1 /**********************************************************************
2
3 thread.c -
4
5 $Author: naruse $
6
7 Copyright (C) 2004-2007 Koichi Sasada
8
9 **********************************************************************/
10
11 /*
12 YARV Thread Design
13
14 model 1: Userlevel Thread
15 Same as traditional ruby thread.
16
17 model 2: Native Thread with Global VM lock
18 Using pthread (or Windows thread) and Ruby threads run concurrent.
19
20 model 3: Native Thread with fine grain lock
21 Using pthread and Ruby threads run concurrent or parallel.
22
23 model 4: M:N User:Native threads with Global VM lock
24 Combination of model 1 and 2
25
26 model 5: M:N User:Native thread with fine grain lock
27 Combination of model 1 and 3
28
29 ------------------------------------------------------------------------
30
  model 2:
    Only the thread that holds the mutex (GVL: Global VM Lock or Giant VM
    Lock) can run.  On thread scheduling, the running thread releases the
    GVL.  If the running thread attempts a blocking operation, it must
    release the GVL so that another thread can continue running.  After
    the blocking operation, the thread must check for interrupts
    (RUBY_VM_CHECK_INTS).
37
38 Every VM can run parallel.
39
40 Ruby threads are scheduled by OS thread scheduler.
41
42 ------------------------------------------------------------------------
43
  model 3:
    All threads run concurrently or in parallel, so exclusive access
    control is needed to access a shared object.  For example, to access
    a String object or an Array object, a fine-grained lock must be
    taken every time.
48 */
49
50
51 /*
52 * FD_SET, FD_CLR and FD_ISSET have a small sanity check when using glibc
53 * 2.15 or later and set _FORTIFY_SOURCE > 0.
54 * However, the implementation is wrong. Even though Linux's select(2)
55 * supports large fd size (>FD_SETSIZE), it wrongly assumes fd is always
56 * less than FD_SETSIZE (i.e. 1024). And then when enabling HAVE_RB_FD_INIT,
57 * it doesn't work correctly and makes program abort. Therefore we need to
58 * disable FORTIFY_SOURCE until glibc fixes it.
59 */
60 #undef _FORTIFY_SOURCE
61 #undef __USE_FORTIFY_LEVEL
62 #define __USE_FORTIFY_LEVEL 0
63
64 /* for model 2 */
65
66 #include "ruby/config.h"
67 #include "ruby/io.h"
68 #include "eval_intern.h"
69 #include "gc.h"
70 #include "timev.h"
71 #include "ruby/thread.h"
72 #include "ruby/thread_native.h"
73 #include "ruby/debug.h"
74 #include "internal.h"
75 #include "iseq.h"
76 #include "vm_core.h"
77 #include "mjit.h"
78 #include "hrtime.h"
79
80 #ifndef USE_NATIVE_THREAD_PRIORITY
81 #define USE_NATIVE_THREAD_PRIORITY 0
82 #define RUBY_THREAD_PRIORITY_MAX 3
83 #define RUBY_THREAD_PRIORITY_MIN -3
84 #endif
85
86 #ifndef THREAD_DEBUG
87 #define THREAD_DEBUG 0
88 #endif
89
90 static VALUE rb_cThreadShield;
91
92 static VALUE sym_immediate;
93 static VALUE sym_on_blocking;
94 static VALUE sym_never;
95 static ID id_locals;
96
/*
 * Bit flags (distinct bits, so they can be OR-ed together).
 * NOTE(review): presumably these are the values accepted by the
 * `unsigned int fl` argument of sleep_hrtime() and sleep_forever()
 * declared below -- confirm against those definitions.
 */
enum SLEEP_FLAGS {
    SLEEP_DEADLOCKABLE = 0x1,
    SLEEP_SPURIOUS_CHECK = 0x2
};
101
102 static void sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl);
103 static void sleep_forever(rb_thread_t *th, unsigned int fl);
104 static void rb_thread_sleep_deadly_allow_spurious_wakeup(void);
105 static int rb_threadptr_dead(rb_thread_t *th);
106 static void rb_check_deadlock(rb_vm_t *vm);
107 static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th);
108 static const char *thread_status_name(rb_thread_t *th, int detail);
109 static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t);
110 NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
111 static int consume_communication_pipe(int fd);
112 static int check_signals_nogvl(rb_thread_t *, int sigwait_fd);
113 void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */
114
115 #define eKillSignal INT2FIX(0)
116 #define eTerminateSignal INT2FIX(1)
117 static volatile int system_working = 1;
118
/*
 * Pairs a thread with a file descriptor.  Instances are linked through
 * wfd_node into the vm.waiting_fds list (see the field comment).
 */
struct waiting_fd {
    struct list_node wfd_node; /* <=> vm.waiting_fds */
    rb_thread_t *th;           /* thread associated with `fd` */
    int fd;                    /* file descriptor associated with `th` */
};
124
125 inline static void
st_delete_wrap(st_table * table,st_data_t key)126 st_delete_wrap(st_table *table, st_data_t key)
127 {
128 st_delete(table, &key, 0);
129 }
130
131 /********************************************************************************/
132
133 #define THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION
134
/*
 * Buffer handed to blocking_region_begin() and blocking_region_end()
 * (see their prototypes below).
 */
struct rb_blocking_region_buffer {
    /* NOTE(review): presumably the thread status to restore when the
     * blocking region ends -- confirm in blocking_region_begin/end. */
    enum rb_thread_status prev_status;
};
138
139 static int unblock_function_set(rb_thread_t *th, rb_unblock_function_t *func, void *arg, int fail_if_interrupted);
140 static void unblock_function_clear(rb_thread_t *th);
141
142 static inline int blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
143 rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted);
144 static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region);
145
146 #ifdef __ia64
147 #define RB_GC_SAVE_MACHINE_REGISTER_STACK(th) \
148 do{(th)->ec->machine.register_stack_end = rb_ia64_bsp();}while(0)
149 #else
150 #define RB_GC_SAVE_MACHINE_REGISTER_STACK(th)
151 #endif
152 #define RB_GC_SAVE_MACHINE_CONTEXT(th) \
153 do { \
154 FLUSH_REGISTER_WINDOWS; \
155 RB_GC_SAVE_MACHINE_REGISTER_STACK(th); \
156 setjmp((th)->ec->machine.regs); \
157 SET_MACHINE_STACK_END(&(th)->ec->machine.stack_end); \
158 } while (0)
159
/*
 * GVL_UNLOCK_BEGIN(th) and GVL_UNLOCK_END(th) must be used as a pair
 * within one function: BEGIN opens a `do {` block that END closes with
 * `} while(0)`.
 *
 * BEGIN saves the machine context of `th` (RB_GC_SAVE_MACHINE_CONTEXT)
 * and releases the GVL; END re-acquires the GVL and makes `th` the
 * current thread again.  Statements written between the two macros
 * therefore run after gvl_release() and before gvl_acquire().
 */
#define GVL_UNLOCK_BEGIN(th) do { \
  RB_GC_SAVE_MACHINE_CONTEXT(th); \
  gvl_release(th->vm);

#define GVL_UNLOCK_END(th) \
  gvl_acquire(th->vm, th); \
  rb_thread_set_current(th); \
} while(0)
168
169 #ifdef __GNUC__
170 #ifdef HAVE_BUILTIN___BUILTIN_CHOOSE_EXPR_CONSTANT_P
171 #define only_if_constant(expr, notconst) __builtin_choose_expr(__builtin_constant_p(expr), (expr), (notconst))
172 #else
173 #define only_if_constant(expr, notconst) (__builtin_constant_p(expr) ? (expr) : (notconst))
174 #endif
175 #else
176 #define only_if_constant(expr, notconst) notconst
177 #endif
/*
 * BLOCKING_REGION(th, exec, ubf, ubfarg, fail_if_interrupted)
 *
 * Runs the statement(s) `exec` between blocking_region_begin() and
 * blocking_region_end() for thread `th`, with `ubf`/`ubfarg` passed to
 * blocking_region_begin().
 *
 * `exec` (and the matching blocking_region_end()) is executed when
 * blocking_region_begin() returns true.  When it returns false, `exec`
 * is skipped unless `fail_if_interrupted` is a compile-time constant
 * false; the only_if_constant() term lets the compiler drop the check
 * in that case.
 */
#define BLOCKING_REGION(th, exec, ubf, ubfarg, fail_if_interrupted) do { \
    struct rb_blocking_region_buffer __region; \
    if (blocking_region_begin(th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \
        /* always return true unless fail_if_interrupted */ \
        !only_if_constant(fail_if_interrupted, TRUE)) { \
        exec; \
        blocking_region_end(th, &__region); \
    }; \
} while(0)
187
188 /*
189 * returns true if this thread was spuriously interrupted, false otherwise
190 * (e.g. hit by Thread#run or ran a Ruby-level Signal.trap handler)
191 */
192 #define RUBY_VM_CHECK_INTS_BLOCKING(ec) vm_check_ints_blocking(ec)
193 static inline int
vm_check_ints_blocking(rb_execution_context_t * ec)194 vm_check_ints_blocking(rb_execution_context_t *ec)
195 {
196 rb_thread_t *th = rb_ec_thread_ptr(ec);
197
198 if (LIKELY(rb_threadptr_pending_interrupt_empty_p(th))) {
199 if (LIKELY(!RUBY_VM_INTERRUPTED_ANY(ec))) return FALSE;
200 }
201 else {
202 th->pending_interrupt_queue_checked = 0;
203 RUBY_VM_SET_INTERRUPT(ec);
204 }
205 return rb_threadptr_execute_interrupts(th, 1);
206 }
207
/* Returns the living_thread_num counter stored in `vm`. */
static int
vm_living_thread_num(const rb_vm_t *vm)
{
    return vm->living_thread_num;
}
213
214 /*
215 * poll() is supported by many OSes, but so far Linux is the only
216 * one we know of that supports using poll() in all places select()
217 * would work.
218 */
219 #if defined(HAVE_POLL)
220 # if defined(__linux__)
221 # define USE_POLL
222 # endif
223 # if defined(__FreeBSD_version) && __FreeBSD_version >= 1100000
224 # define USE_POLL
225 /* FreeBSD does not set POLLOUT when POLLHUP happens */
226 # define POLLERR_SET (POLLHUP | POLLERR)
227 # endif
228 #endif
229
230 static void
timeout_prepare(rb_hrtime_t ** to,rb_hrtime_t * rel,rb_hrtime_t * end,const struct timeval * timeout)231 timeout_prepare(rb_hrtime_t **to, rb_hrtime_t *rel, rb_hrtime_t *end,
232 const struct timeval *timeout)
233 {
234 if (timeout) {
235 *rel = rb_timeval2hrtime(timeout);
236 *end = rb_hrtime_add(rb_hrtime_now(), *rel);
237 *to = rel;
238 }
239 else {
240 *to = 0;
241 }
242 }
243
244 #if THREAD_DEBUG
245 #ifdef HAVE_VA_ARGS_MACRO
246 void rb_thread_debug(const char *file, int line, const char *fmt, ...);
247 #define thread_debug(...) rb_thread_debug(__FILE__, __LINE__, __VA_ARGS__)
248 #define POSITION_FORMAT "%s:%d:"
249 #define POSITION_ARGS ,file, line
250 #else
251 void rb_thread_debug(const char *fmt, ...);
252 #define thread_debug rb_thread_debug
253 #define POSITION_FORMAT
254 #define POSITION_ARGS
255 #endif
256
257 # ifdef NON_SCALAR_THREAD_ID
258 #define fill_thread_id_string ruby_fill_thread_id_string
259 const char *
ruby_fill_thread_id_string(rb_nativethread_id_t thid,rb_thread_id_string_t buf)260 ruby_fill_thread_id_string(rb_nativethread_id_t thid, rb_thread_id_string_t buf)
261 {
262 extern const char ruby_digitmap[];
263 size_t i;
264
265 buf[0] = '0';
266 buf[1] = 'x';
267 for (i = 0; i < sizeof(thid); i++) {
268 # ifdef LITTLE_ENDIAN
269 size_t j = sizeof(thid) - i - 1;
270 # else
271 size_t j = i;
272 # endif
273 unsigned char c = (unsigned char)((char *)&thid)[j];
274 buf[2 + i * 2] = ruby_digitmap[(c >> 4) & 0xf];
275 buf[3 + i * 2] = ruby_digitmap[c & 0xf];
276 }
277 buf[sizeof(rb_thread_id_string_t)-1] = '\0';
278 return buf;
279 }
280 # define fill_thread_id_str(th) fill_thread_id_string((th)->thread_id, (th)->thread_id_string)
281 # define thread_id_str(th) ((th)->thread_id_string)
282 # define PRI_THREAD_ID "s"
283 # endif
284
285 # if THREAD_DEBUG < 0
286 static int rb_thread_debug_enabled;
287
288 /*
289 * call-seq:
290 * Thread.DEBUG -> num
291 *
292 * Returns the thread debug level. Available only if compiled with
293 * THREAD_DEBUG=-1.
294 */
295
static VALUE
rb_thread_s_debug(void)
{
    /* expose the C-level debug level as a Ruby Integer */
    return INT2NUM(rb_thread_debug_enabled);
}
301
302 /*
303 * call-seq:
304 * Thread.DEBUG = num
305 *
306 * Sets the thread debug level. Available only if compiled with
307 * THREAD_DEBUG=-1.
308 */
309
310 static VALUE
rb_thread_s_debug_set(VALUE self,VALUE val)311 rb_thread_s_debug_set(VALUE self, VALUE val)
312 {
313 rb_thread_debug_enabled = RTEST(val) ? NUM2INT(val) : 0;
314 return val;
315 }
316 # else
317 # define rb_thread_debug_enabled THREAD_DEBUG
318 # endif
319 #else
320 #define thread_debug if(0)printf
321 #endif
322
323 #ifndef fill_thread_id_str
324 # define fill_thread_id_string(thid, buf) ((void *)(uintptr_t)(thid))
325 # define fill_thread_id_str(th) (void)0
326 # define thread_id_str(th) ((void *)(uintptr_t)(th)->thread_id)
327 # define PRI_THREAD_ID "p"
328 #endif
329
330 #ifndef __ia64
331 #define thread_start_func_2(th, st, rst) thread_start_func_2(th, st)
332 #endif
333 NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start,
334 VALUE *register_stack_start));
335 static void timer_thread_function(void);
336 void ruby_sigchld_handler(rb_vm_t *); /* signal.c */
337
/*
 * Callback taking an opaque pointer, which it ignores: it only calls
 * rb_thread_wakeup_timer_thread(0).
 * NOTE(review): the name suggests it is the unblock function for
 * threads waiting on the signal pipe -- confirm at its use sites.
 */
static void
ubf_sigwait(void *ignore)
{
    rb_thread_wakeup_timer_thread(0);
}
343
344 #if defined(_WIN32)
345 #include "thread_win32.c"
346
347 #define DEBUG_OUT() \
348 WaitForSingleObject(&debug_mutex, INFINITE); \
349 printf(POSITION_FORMAT"%#lx - %s" POSITION_ARGS, GetCurrentThreadId(), buf); \
350 fflush(stdout); \
351 ReleaseMutex(&debug_mutex);
352
353 #elif defined(HAVE_PTHREAD_H)
354 #include "thread_pthread.c"
355
356 #define DEBUG_OUT() \
357 pthread_mutex_lock(&debug_mutex); \
358 printf(POSITION_FORMAT"%"PRI_THREAD_ID" - %s" POSITION_ARGS, \
359 fill_thread_id_string(pthread_self(), thread_id_string), buf); \
360 fflush(stdout); \
361 pthread_mutex_unlock(&debug_mutex);
362
363 #else
364 #error "unsupported thread type"
365 #endif
366
367 /*
368 * TODO: somebody with win32 knowledge should be able to get rid of
369 * timer-thread by busy-waiting on signals. And it should be possible
370 * to make the GVL in thread_pthread.c be platform-independent.
371 */
372 #ifndef BUSY_WAIT_SIGNALS
373 # define BUSY_WAIT_SIGNALS (0)
374 #endif
375
376 #ifndef USE_EVENTFD
377 # define USE_EVENTFD (0)
378 #endif
379
380 #if THREAD_DEBUG
381 static int debug_mutex_initialized = 1;
382 static rb_nativethread_lock_t debug_mutex;
383
/*
 * Backend of the thread_debug() macro (only built when THREAD_DEBUG is
 * non-zero).
 *
 * file, line: call site; these parameters exist only when
 *             HAVE_VA_ARGS_MACRO is defined.
 * fmt, ...:   printf-style message.
 *
 * Does nothing when rb_thread_debug_enabled is zero.  Otherwise the
 * message is formatted into a BUFSIZ-byte stack buffer with vsnprintf()
 * and printed by DEBUG_OUT(), which takes debug_mutex around the
 * printf()/fflush() pair.
 */
void
rb_thread_debug(
#ifdef HAVE_VA_ARGS_MACRO
    const char *file, int line,
#endif
    const char *fmt, ...)
{
    va_list args;
    char buf[BUFSIZ];
#ifdef NON_SCALAR_THREAD_ID
    /* scratch buffer referenced by the pthread flavour of DEBUG_OUT() */
    rb_thread_id_string_t thread_id_string;
#endif

    if (!rb_thread_debug_enabled) return;

    /* Lazy one-time setup: the flag starts at 1 and is cleared here.
     * NOTE(review): this check-and-clear is not itself synchronized --
     * confirm the first call cannot race with another thread. */
    if (debug_mutex_initialized == 1) {
        debug_mutex_initialized = 0;
        rb_native_mutex_initialize(&debug_mutex);
    }

    va_start(args, fmt);
    vsnprintf(buf, BUFSIZ, fmt, args);
    va_end(args);

    DEBUG_OUT();
}
410 #endif
411
412 #include "thread_sync.c"
413
/*
 * Releases and then destroys the GVL of `vm`.
 *
 * The destruction of vm->waitpid_lock and vm->workqueue_lock is kept in
 * the source but disabled with `if (0)`: as the comment inside notes,
 * those locks may still be held by running threads.
 */
void
rb_vm_gvl_destroy(rb_vm_t *vm)
{
    gvl_release(vm);
    gvl_destroy(vm);
    if (0) {
        /* may be held by running threads */
        rb_native_mutex_destroy(&vm->waitpid_lock);
        rb_native_mutex_destroy(&vm->workqueue_lock);
    }
}
425
/* Initializes `lock`; thin wrapper around rb_native_mutex_initialize(). */
void
rb_nativethread_lock_initialize(rb_nativethread_lock_t *lock)
{
    rb_native_mutex_initialize(lock);
}
431
/* Destroys `lock`; thin wrapper around rb_native_mutex_destroy(). */
void
rb_nativethread_lock_destroy(rb_nativethread_lock_t *lock)
{
    rb_native_mutex_destroy(lock);
}
437
/* Acquires `lock`; thin wrapper around rb_native_mutex_lock(). */
void
rb_nativethread_lock_lock(rb_nativethread_lock_t *lock)
{
    rb_native_mutex_lock(lock);
}
443
/* Releases `lock`; thin wrapper around rb_native_mutex_unlock(). */
void
rb_nativethread_lock_unlock(rb_nativethread_lock_t *lock)
{
    rb_native_mutex_unlock(lock);
}
449
/*
 * Installs `func` (with argument `arg`) as the unblock function of `th`.
 *
 * Pending interrupts are dealt with first:
 *  - fail_if_interrupted != 0: returns FALSE as soon as any interrupt
 *    is flagged, without installing anything;
 *  - fail_if_interrupted == 0: RUBY_VM_CHECK_INTS() is run instead.
 * The check is repeated until, with th->interrupt_lock held, either no
 * interrupt is flagged or th->ec->raised_flag is set.
 *
 * Returns TRUE once func/arg have been stored.
 */
static int
unblock_function_set(rb_thread_t *th, rb_unblock_function_t *func, void *arg, int fail_if_interrupted)
{
    do {
        if (fail_if_interrupted) {
            if (RUBY_VM_INTERRUPTED_ANY(th->ec)) {
                return FALSE;
            }
        }
        else {
            RUBY_VM_CHECK_INTS(th->ec);
        }

        rb_native_mutex_lock(&th->interrupt_lock);
        /*
         * Re-check under the lock.  If an interrupt slipped in (and no
         * exception is being raised), the comma expression drops the
         * lock and the loop starts over; otherwise the loop ends with
         * the lock still held.
         */
    } while (!th->ec->raised_flag && RUBY_VM_INTERRUPTED_ANY(th->ec) &&
             (rb_native_mutex_unlock(&th->interrupt_lock), TRUE));

    VM_ASSERT(th->unblock.func == NULL);

    th->unblock.func = func;
    th->unblock.arg = arg;
    rb_native_mutex_unlock(&th->interrupt_lock);

    return TRUE;
}
475
/*
 * Removes the unblock function of `th` while holding
 * th->interrupt_lock.  Only unblock.func is reset; unblock.arg is left
 * as it was.
 */
static void
unblock_function_clear(rb_thread_t *th)
{
    rb_native_mutex_lock(&th->interrupt_lock);
    th->unblock.func = NULL;
    rb_native_mutex_unlock(&th->interrupt_lock);
}
483
484 static void
rb_threadptr_interrupt_common(rb_thread_t * th,int trap)485 rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
486 {
487 rb_native_mutex_lock(&th->interrupt_lock);
488 if (trap) {
489 RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
490 }
491 else {
492 RUBY_VM_SET_INTERRUPT(th->ec);
493 }
494 if (th->unblock.func != NULL) {
495 (th->unblock.func)(th->unblock.arg);
496 }
497 else {
498 /* none */
499 }
500 rb_native_mutex_unlock(&th->interrupt_lock);
501 }
502
/* Flags an ordinary (non-trap) interrupt on `th`; see
 * rb_threadptr_interrupt_common(). */
void
rb_threadptr_interrupt(rb_thread_t *th)
{
    rb_threadptr_interrupt_common(th, 0);
}
508
/* Flags a trap interrupt on `th`; see rb_threadptr_interrupt_common(). */
static void
threadptr_trap_interrupt(rb_thread_t *th)
{
    rb_threadptr_interrupt_common(th, 1);
}
514
515 static void
terminate_all(rb_vm_t * vm,const rb_thread_t * main_thread)516 terminate_all(rb_vm_t *vm, const rb_thread_t *main_thread)
517 {
518 rb_thread_t *th = 0;
519
520 list_for_each(&vm->living_threads, th, vmlt_node) {
521 if (th != main_thread) {
522 thread_debug("terminate_all: begin (thid: %"PRI_THREAD_ID", status: %s)\n",
523 thread_id_str(th), thread_status_name(th, TRUE));
524 rb_threadptr_pending_interrupt_enque(th, eTerminateSignal);
525 rb_threadptr_interrupt(th);
526 thread_debug("terminate_all: end (thid: %"PRI_THREAD_ID", status: %s)\n",
527 thread_id_str(th), thread_status_name(th, TRUE));
528 }
529 else {
530 thread_debug("terminate_all: main thread (%p)\n", (void *)th);
531 }
532 }
533 }
534
535 void
rb_threadptr_unlock_all_locking_mutexes(rb_thread_t * th)536 rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
537 {
538 const char *err;
539 rb_mutex_t *mutex;
540 rb_mutex_t *mutexes = th->keeping_mutexes;
541
542 while (mutexes) {
543 mutex = mutexes;
544 /* rb_warn("mutex #<%p> remains to be locked by terminated thread",
545 (void *)mutexes); */
546 mutexes = mutex->next_mutex;
547 err = rb_mutex_unlock_th(mutex, th);
548 if (err) rb_bug("invalid keeping_mutexes: %s", err);
549 }
550 }
551
/*
 * Asks every thread other than the caller to terminate and waits until
 * the caller is the only living thread.
 *
 * Must be called on the VM's main thread; calling it from any other
 * thread is reported with rb_bug().  The caller's own mutexes are
 * unlocked first.
 *
 * The locals are volatile because they are read again after a non-local
 * jump back into the EC_EXEC_TAG() frame.
 */
void
rb_thread_terminate_all(void)
{
    rb_thread_t *volatile th = GET_THREAD(); /* main thread */
    rb_execution_context_t * volatile ec = th->ec;
    rb_vm_t *volatile vm = th->vm;
    volatile int sleeping = 0; /* 1 while inside the sleep/check step below */

    if (vm->main_thread != th) {
        rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)",
               (void *)vm->main_thread, (void *)th);
    }

    /* unlock all locking mutexes */
    rb_threadptr_unlock_all_locking_mutexes(th);

    EC_PUSH_TAG(ec);
    if (EC_EXEC_TAG() == TAG_NONE) {
      retry:
        thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th);
        terminate_all(vm, th);

        /* poll in steps of at most one second until only this thread is left */
        while (vm_living_thread_num(vm) > 1) {
            rb_hrtime_t rel = RB_HRTIME_PER_SEC;
            /*
             * The thread exit routine in thread_start_func_2 notifies
             * me when the last sub-thread exits.
             */
            sleeping = 1;
            native_sleep(th, &rel);
            RUBY_VM_CHECK_INTS_BLOCKING(ec);
            sleeping = 0;
        }
    }
    else {
        /*
         * When an exception is caught (e.g. Ctrl+C), broadcast the
         * kill request again to ensure that all threads get killed even
         * if they are blocked on sleep, mutex, etc.
         */
        if (sleeping) {
            sleeping = 0;
            goto retry;
        }
    }
    EC_POP_TAG();
}
599
600 static void
thread_cleanup_func_before_exec(void * th_ptr)601 thread_cleanup_func_before_exec(void *th_ptr)
602 {
603 rb_thread_t *th = th_ptr;
604 th->status = THREAD_KILLED;
605 th->ec->machine.stack_start = th->ec->machine.stack_end = NULL;
606 #ifdef __ia64
607 th->ec->machine.register_stack_start = th->ec->machine.register_stack_end = NULL;
608 #endif
609 }
610
611 static void
thread_cleanup_func(void * th_ptr,int atfork)612 thread_cleanup_func(void *th_ptr, int atfork)
613 {
614 rb_thread_t *th = th_ptr;
615
616 th->locking_mutex = Qfalse;
617 thread_cleanup_func_before_exec(th_ptr);
618
619 /*
620 * Unfortunately, we can't release native threading resource at fork
621 * because libc may have unstable locking state therefore touching
622 * a threading resource may cause a deadlock.
623 *
624 * FIXME: Skipping native_mutex_destroy(pthread_mutex_destroy) is safe
625 * with NPTL, but native_thread_destroy calls pthread_cond_destroy
626 * which calls free(3), so there is a small memory leak atfork, here.
627 */
628 if (atfork)
629 return;
630
631 rb_native_mutex_destroy(&th->interrupt_lock);
632 native_thread_destroy(th);
633 }
634
635 static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *);
636 static VALUE rb_thread_to_s(VALUE thread);
637
/* Non-static entry point that forwards to native_thread_init_stack(). */
void
ruby_thread_init_stack(rb_thread_t *th)
{
    native_thread_init_stack(th);
}
643
644 const VALUE *
rb_vm_proc_local_ep(VALUE proc)645 rb_vm_proc_local_ep(VALUE proc)
646 {
647 const VALUE *ep = vm_proc_ep(proc);
648
649 if (ep) {
650 return rb_vm_ep_local_ep(ep);
651 }
652 else {
653 return NULL;
654 }
655 }
656
/*
 * Runs the payload of thread `th` and stores the result in th->value.
 *
 * For a proc-type thread the proc is invoked with the stored argument
 * array, bracketed by the RUBY_EVENT_THREAD_BEGIN / RUBY_EVENT_THREAD_END
 * event hooks.  Otherwise the stored C function is called with its
 * argument (no event hooks are fired on that path).
 */
static void
thread_do_start(rb_thread_t *th)
{
    native_set_thread_name(th);

    if (th->invoke_type == thread_invoke_type_proc) {
        VALUE args = th->invoke_arg.proc.args;
        long args_len = RARRAY_LEN(args);
        const VALUE *args_ptr;
        VALUE procval = th->invoke_arg.proc.proc;
        rb_proc_t *proc;
        GetProcPtr(procval, proc);

        th->ec->errinfo = Qnil;
        th->ec->root_lep = rb_vm_proc_local_ep(procval);
        th->ec->root_svar = Qfalse;

        EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);

        if (args_len < 8) {
            /* free proc.args if the length is small enough: the arguments
             * are copied onto this stack frame and the thread drops its
             * reference to the array */
            args_ptr = ALLOCA_N(VALUE, args_len);
            MEMCPY((VALUE *)args_ptr, RARRAY_CONST_PTR_TRANSIENT(args), VALUE, args_len);
            th->invoke_arg.proc.args = Qnil;
        }
        else {
            /* longer lists are passed straight from the array */
            args_ptr = RARRAY_CONST_PTR(args);
        }

        th->value = rb_vm_invoke_proc(th->ec, proc,
                                      (int)args_len, args_ptr,
                                      VM_BLOCK_HANDLER_NONE);

        EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
    }
    else {
        th->value = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
    }
}
696
697 void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
698
/*
 * Runs a non-main Ruby thread from start to finish: acquires the GVL,
 * executes the thread body via thread_do_start(), handles the way the
 * body terminated, wakes up joiners, and releases the thread's
 * resources.  Always returns 0.
 *
 * th:                   the thread to run; must not be the VM's main
 *                       thread (rb_bug otherwise).
 * stack_start:          recorded as the machine stack start of `th`.
 * register_stack_start: recorded only on __ia64.  On other platforms
 *                       the thread_start_func_2 macro defined above
 *                       drops this third parameter.
 *
 * NOTE(review): presumably called from the native thread entry point in
 * thread_pthread.c / thread_win32.c -- confirm there.
 */
static int
thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_start)
{
    enum ruby_tag_type state;
    rb_thread_list_t *join_list;
    rb_thread_t *main_th;
    VALUE errinfo = Qnil; /* exception to forward to the main thread, if any */

    if (th == th->vm->main_thread)
        rb_bug("thread_start_func_2 must not be used for main thread");

    ruby_thread_set_native(th);

    th->ec->machine.stack_start = stack_start;
#ifdef __ia64
    th->ec->machine.register_stack_start = register_stack_start;
#endif
    thread_debug("thread start: %p\n", (void *)th);

    gvl_acquire(th->vm, th);
    {
        thread_debug("thread start (get lock): %p\n", (void *)th);
        rb_thread_set_current(th);

        EC_PUSH_TAG(th->ec);
        if ((state = EC_EXEC_TAG()) == TAG_NONE) {
            SAVE_ROOT_JMPBUF(th, thread_do_start(th));
        }
        else {
            /* the thread body was left through a non-local jump */
            errinfo = th->ec->errinfo;
            if (state == TAG_FATAL) {
                /* fatal error within this thread, need to stop whole script */
            }
            else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
                /* exit on main_thread. */
            }
            else {
                if (th->report_on_exception) {
                    VALUE mesg = rb_thread_to_s(th->self);
                    rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n");
                    rb_write_error_str(mesg);
                    rb_ec_error_print(th->ec, errinfo);
                }
                if (th->vm->thread_abort_on_exception ||
                    th->abort_on_exception || RTEST(ruby_debug)) {
                    /* exit on main_thread */
                }
                else {
                    /* the exception stays in this thread; nothing is
                     * forwarded to the main thread */
                    errinfo = Qnil;
                }
            }
            th->value = Qnil;
        }

        th->status = THREAD_KILLED;
        thread_debug("thread end: %p\n", (void *)th);

        main_th = th->vm->main_thread;
        if (main_th == th) {
            ruby_stop(0);
        }
        if (RB_TYPE_P(errinfo, T_OBJECT)) {
            /* treat with normal error object */
            rb_threadptr_raise(main_th, 1, &errinfo);
        }
        EC_POP_TAG();

        rb_ec_clear_current_thread_trace_func(th->ec);

        /* locking_mutex must be Qfalse */
        if (th->locking_mutex != Qfalse) {
            rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")",
                   (void *)th, th->locking_mutex);
        }

        /* delete self other than main thread from living_threads */
        rb_vm_living_threads_remove(th->vm, th);
        if (main_th->status == THREAD_KILLED && rb_thread_alone()) {
            /* I'm last thread. wake up main thread from rb_thread_terminate_all */
            rb_threadptr_interrupt(main_th);
        }

        /* wake up joining threads */
        join_list = th->join_list;
        while (join_list) {
            rb_threadptr_interrupt(join_list->th);
            switch (join_list->th->status) {
              case THREAD_STOPPED: case THREAD_STOPPED_FOREVER:
                join_list->th->status = THREAD_RUNNABLE;
                /* falls through to default, which only breaks */
              default: break;
            }
            join_list = join_list->next;
        }

        rb_threadptr_unlock_all_locking_mutexes(th);
        rb_check_deadlock(th->vm);

        rb_fiber_close(th->ec->fiber_ptr);
    }
    /* destroy the native resources of `th`, then give up the GVL */
    thread_cleanup_func(th, FALSE);
    gvl_release(th->vm);

    return 0;
}
803
804 static VALUE
thread_create_core(VALUE thval,VALUE args,VALUE (* fn)(ANYARGS))805 thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS))
806 {
807 rb_thread_t *th = rb_thread_ptr(thval), *current_th = GET_THREAD();
808 int err;
809
810 if (OBJ_FROZEN(current_th->thgroup)) {
811 rb_raise(rb_eThreadError,
812 "can't start a new thread (frozen ThreadGroup)");
813 }
814
815 if (fn) {
816 th->invoke_type = thread_invoke_type_func;
817 th->invoke_arg.func.func = fn;
818 th->invoke_arg.func.arg = (void *)args;
819 }
820 else {
821 th->invoke_type = thread_invoke_type_proc;
822 th->invoke_arg.proc.proc = rb_block_proc();
823 th->invoke_arg.proc.args = args;
824 }
825
826 th->priority = current_th->priority;
827 th->thgroup = current_th->thgroup;
828
829 th->pending_interrupt_queue = rb_ary_tmp_new(0);
830 th->pending_interrupt_queue_checked = 0;
831 th->pending_interrupt_mask_stack = rb_ary_dup(current_th->pending_interrupt_mask_stack);
832 RBASIC_CLEAR_CLASS(th->pending_interrupt_mask_stack);
833
834 rb_native_mutex_initialize(&th->interrupt_lock);
835
836 /* kick thread */
837 err = native_thread_create(th);
838 if (err) {
839 th->status = THREAD_KILLED;
840 rb_raise(rb_eThreadError, "can't create Thread: %s", strerror(err));
841 }
842 rb_vm_living_threads_insert(th->vm, th);
843 return thval;
844 }
845
846 #define threadptr_initialized(th) ((th)->invoke_type != thread_invoke_type_none)
847
848 /*
849 * call-seq:
850 * Thread.new { ... } -> thread
851 * Thread.new(*args, &proc) -> thread
852 * Thread.new(*args) { |args| ... } -> thread
853 *
854 * Creates a new thread executing the given block.
855 *
856 * Any +args+ given to ::new will be passed to the block:
857 *
858 * arr = []
859 * a, b, c = 1, 2, 3
860 * Thread.new(a,b,c) { |d,e,f| arr << d << e << f }.join
861 * arr #=> [1, 2, 3]
862 *
863 * A ThreadError exception is raised if ::new is called without a block.
864 *
865 * If you're going to subclass Thread, be sure to call super in your
866 * +initialize+ method, otherwise a ThreadError will be raised.
867 */
868 static VALUE
thread_s_new(int argc,VALUE * argv,VALUE klass)869 thread_s_new(int argc, VALUE *argv, VALUE klass)
870 {
871 rb_thread_t *th;
872 VALUE thread = rb_thread_alloc(klass);
873
874 if (GET_VM()->main_thread->status == THREAD_KILLED)
875 rb_raise(rb_eThreadError, "can't alloc thread");
876
877 rb_obj_call_init(thread, argc, argv);
878 th = rb_thread_ptr(thread);
879 if (!threadptr_initialized(th)) {
880 rb_raise(rb_eThreadError, "uninitialized thread - check `%"PRIsVALUE"#initialize'",
881 klass);
882 }
883 return thread;
884 }
885
886 /*
887 * call-seq:
888 * Thread.start([args]*) {|args| block } -> thread
889 * Thread.fork([args]*) {|args| block } -> thread
890 *
891 * Basically the same as ::new. However, if class Thread is subclassed, then
892 * calling +start+ in that subclass will not invoke the subclass's
893 * +initialize+ method.
894 */
895
static VALUE
thread_start(VALUE klass, VALUE args)
{
    /* allocate and start directly: #initialize is not invoked, and a
     * zero `fn` makes thread_create_core() run the given block */
    return thread_create_core(rb_thread_alloc(klass), args, 0);
}
901
902 static VALUE
threadptr_invoke_proc_location(rb_thread_t * th)903 threadptr_invoke_proc_location(rb_thread_t *th)
904 {
905 if (th->invoke_type == thread_invoke_type_proc) {
906 return rb_proc_location(th->invoke_arg.proc.proc);
907 }
908 else {
909 return Qnil;
910 }
911 }
912
913 /* :nodoc: */
914 static VALUE
thread_initialize(VALUE thread,VALUE args)915 thread_initialize(VALUE thread, VALUE args)
916 {
917 rb_thread_t *th = rb_thread_ptr(thread);
918
919 if (!rb_block_given_p()) {
920 rb_raise(rb_eThreadError, "must be called with a block");
921 }
922 else if (th->invoke_type != thread_invoke_type_none) {
923 VALUE loc = threadptr_invoke_proc_location(th);
924 if (!NIL_P(loc)) {
925 rb_raise(rb_eThreadError,
926 "already initialized thread - %"PRIsVALUE":%"PRIsVALUE,
927 RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1));
928 }
929 else {
930 rb_raise(rb_eThreadError, "already initialized thread");
931 }
932 }
933 else {
934 return thread_create_core(thread, args, NULL);
935 }
936 }
937
/*
 * Creates and starts a new Thread (class rb_cThread) that runs the C
 * function `fn`; `arg` is handed to thread_create_core() as the opaque
 * argument for `fn`.  Returns the new thread object.
 */
VALUE
rb_thread_create(VALUE (*fn)(ANYARGS), void *arg)
{
    return thread_create_core(rb_thread_alloc(rb_cThread), (VALUE)arg, fn);
}
943
944
/*
 * Argument bundle passed (cast to VALUE) to thread_join_sleep() and
 * remove_from_join_list() by thread_join().
 */
struct join_arg {
    rb_thread_t *target, *waiting; /* thread being joined / thread doing the join */
    rb_hrtime_t *limit;            /* relative timeout, or NULL to wait without limit */
};
949
950 static VALUE
remove_from_join_list(VALUE arg)951 remove_from_join_list(VALUE arg)
952 {
953 struct join_arg *p = (struct join_arg *)arg;
954 rb_thread_t *target_th = p->target, *th = p->waiting;
955
956 if (target_th->status != THREAD_KILLED) {
957 rb_thread_list_t **p = &target_th->join_list;
958
959 while (*p) {
960 if ((*p)->th == th) {
961 *p = (*p)->next;
962 break;
963 }
964 p = &(*p)->next;
965 }
966 }
967
968 return Qnil;
969 }
970
971 static VALUE
thread_join_sleep(VALUE arg)972 thread_join_sleep(VALUE arg)
973 {
974 struct join_arg *p = (struct join_arg *)arg;
975 rb_thread_t *target_th = p->target, *th = p->waiting;
976 rb_hrtime_t end = 0;
977
978 if (p->limit) {
979 end = rb_hrtime_add(*p->limit, rb_hrtime_now());
980 }
981
982 while (target_th->status != THREAD_KILLED) {
983 if (!p->limit) {
984 th->status = THREAD_STOPPED_FOREVER;
985 th->vm->sleeper++;
986 rb_check_deadlock(th->vm);
987 native_sleep(th, 0);
988 th->vm->sleeper--;
989 }
990 else {
991 if (hrtime_update_expire(p->limit, end)) {
992 thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n",
993 thread_id_str(target_th));
994 return Qfalse;
995 }
996 th->status = THREAD_STOPPED;
997 native_sleep(th, p->limit);
998 }
999 RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
1000 th->status = THREAD_RUNNABLE;
1001 thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID", status: %s)\n",
1002 thread_id_str(target_th), thread_status_name(target_th, TRUE));
1003 }
1004 return Qtrue;
1005 }
1006
1007 static VALUE
thread_join(rb_thread_t * target_th,rb_hrtime_t * rel)1008 thread_join(rb_thread_t *target_th, rb_hrtime_t *rel)
1009 {
1010 rb_thread_t *th = GET_THREAD();
1011 struct join_arg arg;
1012
1013 if (th == target_th) {
1014 rb_raise(rb_eThreadError, "Target thread must not be current thread");
1015 }
1016 if (GET_VM()->main_thread == target_th) {
1017 rb_raise(rb_eThreadError, "Target thread must not be main thread");
1018 }
1019
1020 arg.target = target_th;
1021 arg.waiting = th;
1022 arg.limit = rel;
1023
1024 thread_debug("thread_join (thid: %"PRI_THREAD_ID", status: %s)\n",
1025 thread_id_str(target_th), thread_status_name(target_th, TRUE));
1026
1027 if (target_th->status != THREAD_KILLED) {
1028 rb_thread_list_t list;
1029 list.next = target_th->join_list;
1030 list.th = th;
1031 target_th->join_list = &list;
1032 if (!rb_ensure(thread_join_sleep, (VALUE)&arg,
1033 remove_from_join_list, (VALUE)&arg)) {
1034 return Qnil;
1035 }
1036 }
1037
1038 thread_debug("thread_join: success (thid: %"PRI_THREAD_ID", status: %s)\n",
1039 thread_id_str(target_th), thread_status_name(target_th, TRUE));
1040
1041 if (target_th->ec->errinfo != Qnil) {
1042 VALUE err = target_th->ec->errinfo;
1043
1044 if (FIXNUM_P(err)) {
1045 switch (err) {
1046 case INT2FIX(TAG_FATAL):
1047 thread_debug("thread_join: terminated (thid: %"PRI_THREAD_ID", status: %s)\n",
1048 thread_id_str(target_th), thread_status_name(target_th, TRUE));
1049
1050 /* OK. killed. */
1051 break;
1052 default:
1053 rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err));
1054 }
1055 }
1056 else if (THROW_DATA_P(target_th->ec->errinfo)) {
1057 rb_bug("thread_join: THROW_DATA should not reach here.");
1058 }
1059 else {
1060 /* normal exception */
1061 rb_exc_raise(err);
1062 }
1063 }
1064 return target_th->self;
1065 }
1066
1067 static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double);
1068
1069 /*
1070 * call-seq:
1071 * thr.join -> thr
1072 * thr.join(limit) -> thr
1073 *
1074 * The calling thread will suspend execution and run this +thr+.
1075 *
1076 * Does not return until +thr+ exits or until the given +limit+ seconds have
1077 * passed.
1078 *
1079 * If the time limit expires, +nil+ will be returned, otherwise +thr+ is
1080 * returned.
1081 *
1082 * Any threads not joined will be killed when the main program exits.
1083 *
1084 * If +thr+ had previously raised an exception and the ::abort_on_exception or
1085 * $DEBUG flags are not set, (so the exception has not yet been processed), it
1086 * will be processed at this time.
1087 *
1088 * a = Thread.new { print "a"; sleep(10); print "b"; print "c" }
1089 * x = Thread.new { print "x"; Thread.pass; print "y"; print "z" }
1090 * x.join # Let thread x finish, thread a will be killed on exit.
1091 * #=> "axyz"
1092 *
1093 * The following example illustrates the +limit+ parameter.
1094 *
1095 * y = Thread.new { 4.times { sleep 0.1; puts 'tick... ' }}
1096 * puts "Waiting" until y.join(0.15)
1097 *
1098 * This will produce:
1099 *
1100 * tick...
1101 * Waiting
1102 * tick...
1103 * Waiting
1104 * tick...
1105 * tick...
1106 */
1107
1108 static VALUE
thread_join_m(int argc,VALUE * argv,VALUE self)1109 thread_join_m(int argc, VALUE *argv, VALUE self)
1110 {
1111 VALUE limit;
1112 rb_hrtime_t rel, *to = 0;
1113
1114 /*
1115 * This supports INFINITY and negative values, so we can't use
1116 * rb_time_interval right now...
1117 */
1118 if (!rb_check_arity(argc, 0, 1) || NIL_P(argv[0])) {
1119 /* unlimited */
1120 }
1121 else if (FIXNUM_P(limit = argv[0])) {
1122 rel = rb_sec2hrtime(NUM2TIMET(limit));
1123 to = &rel;
1124 }
1125 else {
1126 to = double2hrtime(&rel, rb_num2dbl(limit));
1127 }
1128
1129 return thread_join(rb_thread_ptr(self), to);
1130 }
1131
1132 /*
1133 * call-seq:
1134 * thr.value -> obj
1135 *
1136 * Waits for +thr+ to complete, using #join, and returns its value or raises
1137 * the exception which terminated the thread.
1138 *
1139 * a = Thread.new { 2 + 2 }
1140 * a.value #=> 4
1141 *
1142 * b = Thread.new { raise 'something went wrong' }
1143 * b.value #=> RuntimeError: something went wrong
1144 */
1145
1146 static VALUE
thread_value(VALUE self)1147 thread_value(VALUE self)
1148 {
1149 rb_thread_t *th = rb_thread_ptr(self);
1150 thread_join(th, 0);
1151 return th->value;
1152 }
1153
1154 /*
1155 * Thread Scheduling
1156 */
1157
1158 /*
1159 * Back when we used "struct timeval", not all platforms implemented
1160 * tv_sec as time_t. Nowadays we use "struct timespec" and tv_sec
1161 * seems to be implemented more consistently across platforms.
 * At least other parts of our code haven't had to deal with non-time_t
 * tv_sec in timespec...
1164 */
1165 #define TIMESPEC_SEC_MAX TIMET_MAX
1166 #define TIMESPEC_SEC_MIN TIMET_MIN
1167
1168 static rb_hrtime_t *
double2hrtime(rb_hrtime_t * hrt,double d)1169 double2hrtime(rb_hrtime_t *hrt, double d)
1170 {
1171 /* assume timespec.tv_sec has same signedness as time_t */
1172 const double TIMESPEC_SEC_MAX_PLUS_ONE = TIMET_MAX_PLUS_ONE;
1173
1174 if (TIMESPEC_SEC_MAX_PLUS_ONE <= d) {
1175 return NULL;
1176 }
1177 else if (d <= 0) {
1178 *hrt = 0;
1179 }
1180 else {
1181 *hrt = (rb_hrtime_t)(d * (double)RB_HRTIME_PER_SEC);
1182 }
1183 return hrt;
1184 }
1185
/*
 * Fill *ts with the current time.  CLOCK_MONOTONIC is used when
 * clock_gettime() is available and succeeds; otherwise the value comes
 * from rb_timespec_now().
 */
static void
getclockofday(struct timespec *ts)
{
#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
    const int rc = clock_gettime(CLOCK_MONOTONIC, ts);

    if (rc == 0) {
        return;
    }
#endif
    rb_timespec_now(ts);
}
1195
1196 /*
1197 * Don't inline this, since library call is already time consuming
1198 * and we don't want "struct timespec" on stack too long for GC
1199 */
1200 NOINLINE(rb_hrtime_t rb_hrtime_now(void));
1201 rb_hrtime_t
rb_hrtime_now(void)1202 rb_hrtime_now(void)
1203 {
1204 struct timespec ts;
1205
1206 getclockofday(&ts);
1207 return rb_timespec2hrtime(&ts);
1208 }
1209
1210 static void
sleep_forever(rb_thread_t * th,unsigned int fl)1211 sleep_forever(rb_thread_t *th, unsigned int fl)
1212 {
1213 enum rb_thread_status prev_status = th->status;
1214 enum rb_thread_status status;
1215 int woke;
1216
1217 status = fl & SLEEP_DEADLOCKABLE ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;
1218 th->status = status;
1219 RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
1220 while (th->status == status) {
1221 if (fl & SLEEP_DEADLOCKABLE) {
1222 th->vm->sleeper++;
1223 rb_check_deadlock(th->vm);
1224 }
1225 native_sleep(th, 0);
1226 if (fl & SLEEP_DEADLOCKABLE) {
1227 th->vm->sleeper--;
1228 }
1229 woke = vm_check_ints_blocking(th->ec);
1230 if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
1231 break;
1232 }
1233 th->status = prev_status;
1234 }
1235
1236 /*
1237 * at least gcc 7.2 and 7.3 complains about "rb_hrtime_t end"
1238 * being uninitialized, maybe other versions, too.
1239 */
1240 COMPILER_WARNING_PUSH
1241 #if defined(__GNUC__) && __GNUC__ == 7 && __GNUC_MINOR__ <= 3
1242 COMPILER_WARNING_IGNORED(-Wmaybe-uninitialized)
1243 #endif
1244 #ifndef PRIu64
1245 #define PRIu64 PRI_64_PREFIX "u"
1246 #endif
1247 /*
1248 * @end is the absolute time when @ts is set to expire
1249 * Returns true if @end has past
1250 * Updates @ts and returns false otherwise
1251 */
1252 static int
hrtime_update_expire(rb_hrtime_t * timeout,const rb_hrtime_t end)1253 hrtime_update_expire(rb_hrtime_t *timeout, const rb_hrtime_t end)
1254 {
1255 rb_hrtime_t now = rb_hrtime_now();
1256
1257 if (now > end) return 1;
1258 thread_debug("hrtime_update_expire: "
1259 "%"PRIu64" > %"PRIu64"\n",
1260 (uint64_t)end, (uint64_t)now);
1261 *timeout = end - now;
1262 return 0;
1263 }
1264 COMPILER_WARNING_POP
1265
1266 static void
sleep_hrtime(rb_thread_t * th,rb_hrtime_t rel,unsigned int fl)1267 sleep_hrtime(rb_thread_t *th, rb_hrtime_t rel, unsigned int fl)
1268 {
1269 enum rb_thread_status prev_status = th->status;
1270 int woke;
1271 rb_hrtime_t end = rb_hrtime_add(rb_hrtime_now(), rel);
1272
1273 th->status = THREAD_STOPPED;
1274 RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
1275 while (th->status == THREAD_STOPPED) {
1276 native_sleep(th, &rel);
1277 woke = vm_check_ints_blocking(th->ec);
1278 if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
1279 break;
1280 if (hrtime_update_expire(&rel, end))
1281 break;
1282 }
1283 th->status = prev_status;
1284 }
1285
1286 void
rb_thread_sleep_forever(void)1287 rb_thread_sleep_forever(void)
1288 {
1289 thread_debug("rb_thread_sleep_forever\n");
1290 sleep_forever(GET_THREAD(), SLEEP_SPURIOUS_CHECK);
1291 }
1292
1293 void
rb_thread_sleep_deadly(void)1294 rb_thread_sleep_deadly(void)
1295 {
1296 thread_debug("rb_thread_sleep_deadly\n");
1297 sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE|SLEEP_SPURIOUS_CHECK);
1298 }
1299
1300 void
rb_thread_sleep_interruptible(void)1301 rb_thread_sleep_interruptible(void)
1302 {
1303 rb_thread_t *th = GET_THREAD();
1304 enum rb_thread_status prev_status = th->status;
1305
1306 th->status = THREAD_STOPPED;
1307 native_sleep(th, 0);
1308 RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
1309 th->status = prev_status;
1310 }
1311
1312 static void
rb_thread_sleep_deadly_allow_spurious_wakeup(void)1313 rb_thread_sleep_deadly_allow_spurious_wakeup(void)
1314 {
1315 thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n");
1316 sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
1317 }
1318
1319 void
rb_thread_wait_for(struct timeval time)1320 rb_thread_wait_for(struct timeval time)
1321 {
1322 rb_thread_t *th = GET_THREAD();
1323
1324 sleep_hrtime(th, rb_timeval2hrtime(&time), SLEEP_SPURIOUS_CHECK);
1325 }
1326
1327 /*
1328 * CAUTION: This function causes thread switching.
 * rb_thread_check_ints() checks Ruby's interrupts.
 * Some interrupts need thread switching, invoking handlers,
 * and so on.
1332 */
1333
1334 void
rb_thread_check_ints(void)1335 rb_thread_check_ints(void)
1336 {
1337 RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
1338 }
1339
1340 /*
1341 * Hidden API for tcl/tk wrapper.
1342 * There is no guarantee to perpetuate it.
1343 */
int
rb_thread_check_trap_pending(void)
{
    /* 1 when rb_signal_buff_size() is non-zero, 0 otherwise */
    return rb_signal_buff_size() ? 1 : 0;
}
1349
1350 /* This function can be called in blocking region. */
1351 int
rb_thread_interrupted(VALUE thval)1352 rb_thread_interrupted(VALUE thval)
1353 {
1354 return (int)RUBY_VM_INTERRUPTED(rb_thread_ptr(thval)->ec);
1355 }
1356
/*
 * Sleep the current thread for sec seconds.
 *
 * The count is boxed with INT2NUM() rather than INT2FIX(): INT2FIX() is
 * only valid for values that fit in a Fixnum, which is narrower than
 * int on platforms where long is 32 bits.
 */
void
rb_thread_sleep(int sec)
{
    rb_thread_wait_for(rb_time_timeval(INT2NUM(sec)));
}
1362
1363 static void
rb_thread_schedule_limits(uint32_t limits_us)1364 rb_thread_schedule_limits(uint32_t limits_us)
1365 {
1366 thread_debug("rb_thread_schedule\n");
1367 if (!rb_thread_alone()) {
1368 rb_thread_t *th = GET_THREAD();
1369
1370 if (th->running_time_us >= limits_us) {
1371 thread_debug("rb_thread_schedule/switch start\n");
1372 RB_GC_SAVE_MACHINE_CONTEXT(th);
1373 gvl_yield(th->vm, th);
1374 rb_thread_set_current(th);
1375 thread_debug("rb_thread_schedule/switch done\n");
1376 }
1377 }
1378 }
1379
1380 void
rb_thread_schedule(void)1381 rb_thread_schedule(void)
1382 {
1383 rb_thread_schedule_limits(0);
1384 RUBY_VM_CHECK_INTS(GET_EC());
1385 }
1386
1387 /* blocking region */
1388
1389 static inline int
blocking_region_begin(rb_thread_t * th,struct rb_blocking_region_buffer * region,rb_unblock_function_t * ubf,void * arg,int fail_if_interrupted)1390 blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
1391 rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted)
1392 {
1393 region->prev_status = th->status;
1394 if (unblock_function_set(th, ubf, arg, fail_if_interrupted)) {
1395 th->blocking_region_buffer = region;
1396 th->status = THREAD_STOPPED;
1397 thread_debug("enter blocking region (%p)\n", (void *)th);
1398 RB_GC_SAVE_MACHINE_CONTEXT(th);
1399 gvl_release(th->vm);
1400 return TRUE;
1401 }
1402 else {
1403 return FALSE;
1404 }
1405 }
1406
1407 static inline void
blocking_region_end(rb_thread_t * th,struct rb_blocking_region_buffer * region)1408 blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
1409 {
1410 /* entry to ubf_list still permitted at this point, make it impossible: */
1411 unblock_function_clear(th);
1412 /* entry to ubf_list impossible at this point, so unregister is safe: */
1413 unregister_ubf_list(th);
1414
1415 gvl_acquire(th->vm, th);
1416 rb_thread_set_current(th);
1417 thread_debug("leave blocking region (%p)\n", (void *)th);
1418 th->blocking_region_buffer = 0;
1419 if (th->status == THREAD_STOPPED) {
1420 th->status = region->prev_status;
1421 }
1422 }
1423
/*
 * Shared implementation of rb_thread_call_without_gvl() and
 * rb_thread_call_without_gvl2().
 *
 * Runs func(data1) inside BLOCKING_REGION with (ubf, data2) as the
 * unblock function and returns what func returned.  The errno value
 * observed right after func is saved and restored just before returning,
 * so the bookkeeping in between does not clobber it.
 *
 * fail_if_interrupted is forwarded to BLOCKING_REGION; when it is zero,
 * interrupts are additionally checked after the region.
 */
static void *
call_without_gvl(void *(*func)(void *), void *data1,
                 rb_unblock_function_t *ubf, void *data2, int fail_if_interrupted)
{
    void *val = 0; /* stays 0 unless the region body runs */
    rb_execution_context_t *ec = GET_EC();
    rb_thread_t *th = rb_ec_thread_ptr(ec);
    int saved_errno = 0;
    VALUE ubf_th = Qfalse;

    if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
        /* both built-in ubfs map to ubf_select with the thread as its argument */
        ubf = ubf_select;
        data2 = th;
    }
    else if (ubf && vm_living_thread_num(th->vm) == 1) {
        /* caller-supplied ubf and this is the only living thread: start a helper thread */
        ubf_th = rb_thread_start_unblock_thread();
    }

    BLOCKING_REGION(th, {
        val = func(data1);
        saved_errno = errno;
    }, ubf, data2, fail_if_interrupted);

    if (!fail_if_interrupted) {
        RUBY_VM_CHECK_INTS_BLOCKING(ec);
    }

    if (ubf_th != Qfalse) {
        /* kill the helper thread started above and wait for it to finish */
        thread_value(rb_thread_kill(ubf_th));
    }

    errno = saved_errno;

    return val;
}
1459
1460 /*
1461 * rb_thread_call_without_gvl - permit concurrent/parallel execution.
1462 * rb_thread_call_without_gvl2 - permit concurrent/parallel execution
1463 * without interrupt process.
1464 *
1465 * rb_thread_call_without_gvl() does:
1466 * (1) Check interrupts.
1467 * (2) release GVL.
1468 * Other Ruby threads may run in parallel.
1469 * (3) call func with data1
1470 * (4) acquire GVL.
1471 * Other Ruby threads can not run in parallel any more.
1472 * (5) Check interrupts.
1473 *
1474 * rb_thread_call_without_gvl2() does:
1475 * (1) Check interrupt and return if interrupted.
1476 * (2) release GVL.
 *   (3) call func with data1.
1478 * (4) acquire GVL.
1479 *
1480 * If another thread interrupts this thread (Thread#kill, signal delivery,
1481 * VM-shutdown request, and so on), `ubf()' is called (`ubf()' means
1482 * "un-blocking function"). `ubf()' should interrupt `func()' execution by
1483 * toggling a cancellation flag, canceling the invocation of a call inside
1484 * `func()' or similar. Note that `ubf()' may not be called with the GVL.
1485 *
1486 * There are built-in ubfs and you can specify these ubfs:
1487 *
1488 * * RUBY_UBF_IO: ubf for IO operation
1489 * * RUBY_UBF_PROCESS: ubf for process operation
1490 *
1491 * However, we can not guarantee our built-in ubfs interrupt your `func()'
1492 * correctly. Be careful to use rb_thread_call_without_gvl(). If you don't
1493 * provide proper ubf(), your program will not stop for Control+C or other
1494 * shutdown events.
1495 *
1496 * "Check interrupts" on above list means checking asynchronous
1497 * interrupt events (such as Thread#kill, signal delivery, VM-shutdown
1498 * request, and so on) and calling corresponding procedures
1499 * (such as `trap' for signals, raise an exception for Thread#raise).
1500 * If `func()' finished and received interrupts, you may skip interrupt
1501 * checking. For example, assume the following func() it reads data from file.
1502 *
1503 * read_func(...) {
1504 * // (a) before read
1505 * read(buffer); // (b) reading
1506 * // (c) after read
1507 * }
1508 *
1509 * If an interrupt occurs at (a) or (b), then `ubf()' cancels this
1510 * `read_func()' and interrupts are checked. However, if an interrupt occurs
1511 * at (c), after *read* operation is completed, checking interrupts is harmful
1512 * because it causes irrevocable side-effect, the read data will vanish. To
1513 * avoid such problem, the `read_func()' should be used with
1514 * `rb_thread_call_without_gvl2()'.
1515 *
 * If `rb_thread_call_without_gvl2()' detects an interrupt, it returns
 * immediately. This function does not tell you when the execution was
 * interrupted. For example, there are 4 possible timings: (a), (b), (c)
 * and before calling read_func(). You need to record the progress of
 * read_func() and check it after `rb_thread_call_without_gvl2()' returns.
 * You may need to call `rb_thread_check_ints()' yourself, or your program
 * cannot run pending procedures such as `trap' handlers.
1523 *
1524 * NOTE: You can not execute most of Ruby C API and touch Ruby
1525 * objects in `func()' and `ubf()', including raising an
1526 * exception, because current thread doesn't acquire GVL
1527 * (it causes synchronization problems). If you need to
1528 * call ruby functions either use rb_thread_call_with_gvl()
1529 * or read source code of C APIs and confirm safety by
1530 * yourself.
1531 *
 * NOTE: In short, this API is difficult to use safely. We recommend
 *       using other means if you have any. We lack experience with
 *       this API, so please report any problems related to it.
1535 *
1536 * NOTE: Releasing GVL and re-acquiring GVL may be expensive operations
1537 * for a short running `func()'. Be sure to benchmark and use this
1538 * mechanism when `func()' consumes enough time.
1539 *
1540 * Safe C API:
1541 * * rb_thread_interrupted() - check interrupt flag
1542 * * ruby_xmalloc(), ruby_xrealloc(), ruby_xfree() -
1543 * they will work without GVL, and may acquire GVL when GC is needed.
1544 */
1545 void *
rb_thread_call_without_gvl2(void * (* func)(void *),void * data1,rb_unblock_function_t * ubf,void * data2)1546 rb_thread_call_without_gvl2(void *(*func)(void *), void *data1,
1547 rb_unblock_function_t *ubf, void *data2)
1548 {
1549 return call_without_gvl(func, data1, ubf, data2, TRUE);
1550 }
1551
1552 void *
rb_thread_call_without_gvl(void * (* func)(void * data),void * data1,rb_unblock_function_t * ubf,void * data2)1553 rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
1554 rb_unblock_function_t *ubf, void *data2)
1555 {
1556 return call_without_gvl(func, data1, ubf, data2, FALSE);
1557 }
1558
/*
 * Run func(data1) in a blocking region on behalf of file descriptor fd.
 *
 * While func runs, a waiting_fd entry for (fd, current thread) is linked
 * into the VM's waiting_fds list.  The region is executed under an EC tag
 * so that the entry is always unlinked before a non-local jump is
 * re-thrown.  After a normal return interrupts are checked and the errno
 * value left by func is restored.
 *
 * Returns the value returned by func.
 */
VALUE
rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
{
    volatile VALUE val = Qundef; /* shouldn't be used */
    rb_execution_context_t * volatile ec = GET_EC();
    volatile int saved_errno = 0;
    enum ruby_tag_type state;
    struct waiting_fd wfd;

    /* wfd lives on this stack frame, so it must be unlinked before leaving */
    wfd.fd = fd;
    wfd.th = rb_ec_thread_ptr(ec);
    list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &wfd.wfd_node);

    EC_PUSH_TAG(ec);
    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
        BLOCKING_REGION(wfd.th, {
            val = func(data1);
            saved_errno = errno;
        }, ubf_select, wfd.th, FALSE);
    }
    EC_POP_TAG();

    /*
     * must be deleted before jump
     * this will delete either from waiting_fds or on-stack LIST_HEAD(busy)
     */
    list_del(&wfd.wfd_node);

    if (state) {
        /* a jump was caught above: continue it now that wfd is unlinked */
        EC_JUMP_TAG(ec, state);
    }
    /* TODO: check func() */
    RUBY_VM_CHECK_INTS_BLOCKING(ec);

    errno = saved_errno;

    return val;
}
1597
1598 /*
1599 * rb_thread_call_with_gvl - re-enter the Ruby world after GVL release.
1600 *
1601 * After releasing GVL using
1602 * rb_thread_call_without_gvl() you can not access Ruby values or invoke
1603 * methods. If you need to access Ruby you must use this function
1604 * rb_thread_call_with_gvl().
1605 *
1606 * This function rb_thread_call_with_gvl() does:
1607 * (1) acquire GVL.
1608 * (2) call passed function `func'.
1609 * (3) release GVL.
1610 * (4) return a value which is returned at (2).
1611 *
1612 * NOTE: You should not return Ruby object at (2) because such Object
1613 * will not be marked.
1614 *
 * NOTE: If an exception is raised in `func', this function DOES NOT
 *       protect (catch) the exception. If you have any resources
 *       which should be freed before the exception propagates, you
 *       need to use rb_protect() in `func' and return a value which
 *       indicates that an exception was raised.
1620 *
1621 * NOTE: This function should not be called by a thread which was not
1622 * created as Ruby thread (created by Thread.new or so). In other
1623 * words, this function *DOES NOT* associate or convert a NON-Ruby
1624 * thread to a Ruby thread.
1625 */
1626 void *
rb_thread_call_with_gvl(void * (* func)(void *),void * data1)1627 rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
1628 {
1629 rb_thread_t *th = ruby_thread_from_native();
1630 struct rb_blocking_region_buffer *brb;
1631 struct rb_unblock_callback prev_unblock;
1632 void *r;
1633
1634 if (th == 0) {
1635 /* Error has occurred, but we can't use rb_bug()
1636 * because this thread is not Ruby's thread.
1637 * What should we do?
1638 */
1639
1640 fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n");
1641 exit(EXIT_FAILURE);
1642 }
1643
1644 brb = (struct rb_blocking_region_buffer *)th->blocking_region_buffer;
1645 prev_unblock = th->unblock;
1646
1647 if (brb == 0) {
1648 rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL.");
1649 }
1650
1651 blocking_region_end(th, brb);
1652 /* enter to Ruby world: You can access Ruby values, methods and so on. */
1653 r = (*func)(data1);
1654 /* leave from Ruby world: You can not access Ruby values, etc. */
1655 blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE);
1656 return r;
1657 }
1658
1659 /*
1660 * ruby_thread_has_gvl_p - check if current native thread has GVL.
1661 *
1662 ***
1663 *** This API is EXPERIMENTAL!
1664 *** We do not guarantee that this API remains in ruby 1.9.2 or later.
1665 ***
1666 */
1667
1668 int
ruby_thread_has_gvl_p(void)1669 ruby_thread_has_gvl_p(void)
1670 {
1671 rb_thread_t *th = ruby_thread_from_native();
1672
1673 if (th && th->blocking_region_buffer == 0) {
1674 return 1;
1675 }
1676 else {
1677 return 0;
1678 }
1679 }
1680
1681 /*
1682 * call-seq:
1683 * Thread.pass -> nil
1684 *
1685 * Give the thread scheduler a hint to pass execution to another thread.
1686 * A running thread may or may not switch, it depends on OS and processor.
1687 */
1688
1689 static VALUE
thread_s_pass(VALUE klass)1690 thread_s_pass(VALUE klass)
1691 {
1692 rb_thread_schedule();
1693 return Qnil;
1694 }
1695
1696 /*****************************************************/
1697
1698 /*
1699 * rb_threadptr_pending_interrupt_* - manage asynchronous error queue
1700 *
1701 * Async events such as an exception thrown by Thread#raise,
1702 * Thread#kill and thread termination (after main thread termination)
1703 * will be queued to th->pending_interrupt_queue.
1704 * - clear: clear the queue.
1705 * - enque: enqueue err object into queue.
1706 * - deque: dequeue err object from queue.
1707 * - active_p: return 1 if the queue should be checked.
1708 *
1709 * All rb_threadptr_pending_interrupt_* functions are called by
1710 * a GVL acquired thread, of course.
1711 * Note that all "rb_" prefix APIs need GVL to call.
1712 */
1713
1714 void
rb_threadptr_pending_interrupt_clear(rb_thread_t * th)1715 rb_threadptr_pending_interrupt_clear(rb_thread_t *th)
1716 {
1717 rb_ary_clear(th->pending_interrupt_queue);
1718 }
1719
1720 void
rb_threadptr_pending_interrupt_enque(rb_thread_t * th,VALUE v)1721 rb_threadptr_pending_interrupt_enque(rb_thread_t *th, VALUE v)
1722 {
1723 rb_ary_push(th->pending_interrupt_queue, v);
1724 th->pending_interrupt_queue_checked = 0;
1725 }
1726
1727 static void
threadptr_check_pending_interrupt_queue(rb_thread_t * th)1728 threadptr_check_pending_interrupt_queue(rb_thread_t *th)
1729 {
1730 if (!th->pending_interrupt_queue) {
1731 rb_raise(rb_eThreadError, "uninitialized thread");
1732 }
1733 }
1734
/*
 * When a queued asynchronous interrupt may be delivered, as decided by
 * the Thread.handle_interrupt masks.
 */
enum handle_interrupt_timing {
    INTERRUPT_NONE,        /* no mask entry matched; dequeued like IMMEDIATE */
    INTERRUPT_IMMEDIATE,   /* mask value :immediate */
    INTERRUPT_ON_BLOCKING, /* mask value :on_blocking */
    INTERRUPT_NEVER        /* mask value :never */
};
1741
1742 static enum handle_interrupt_timing
rb_threadptr_pending_interrupt_check_mask(rb_thread_t * th,VALUE err)1743 rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err)
1744 {
1745 VALUE mask;
1746 long mask_stack_len = RARRAY_LEN(th->pending_interrupt_mask_stack);
1747 const VALUE *mask_stack = RARRAY_CONST_PTR(th->pending_interrupt_mask_stack);
1748 VALUE mod;
1749 long i;
1750
1751 for (i=0; i<mask_stack_len; i++) {
1752 mask = mask_stack[mask_stack_len-(i+1)];
1753
1754 for (mod = err; mod; mod = RCLASS_SUPER(mod)) {
1755 VALUE klass = mod;
1756 VALUE sym;
1757
1758 if (BUILTIN_TYPE(mod) == T_ICLASS) {
1759 klass = RBASIC(mod)->klass;
1760 }
1761 else if (mod != RCLASS_ORIGIN(mod)) {
1762 continue;
1763 }
1764
1765 if ((sym = rb_hash_aref(mask, klass)) != Qnil) {
1766 if (sym == sym_immediate) {
1767 return INTERRUPT_IMMEDIATE;
1768 }
1769 else if (sym == sym_on_blocking) {
1770 return INTERRUPT_ON_BLOCKING;
1771 }
1772 else if (sym == sym_never) {
1773 return INTERRUPT_NEVER;
1774 }
1775 else {
1776 rb_raise(rb_eThreadError, "unknown mask signature");
1777 }
1778 }
1779 }
1780 /* try to next mask */
1781 }
1782 return INTERRUPT_NONE;
1783 }
1784
1785 static int
rb_threadptr_pending_interrupt_empty_p(const rb_thread_t * th)1786 rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th)
1787 {
1788 return RARRAY_LEN(th->pending_interrupt_queue) == 0;
1789 }
1790
1791 static int
rb_threadptr_pending_interrupt_include_p(rb_thread_t * th,VALUE err)1792 rb_threadptr_pending_interrupt_include_p(rb_thread_t *th, VALUE err)
1793 {
1794 int i;
1795 for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) {
1796 VALUE e = RARRAY_AREF(th->pending_interrupt_queue, i);
1797 if (rb_class_inherited_p(e, err)) {
1798 return TRUE;
1799 }
1800 }
1801 return FALSE;
1802 }
1803
1804 static VALUE
rb_threadptr_pending_interrupt_deque(rb_thread_t * th,enum handle_interrupt_timing timing)1805 rb_threadptr_pending_interrupt_deque(rb_thread_t *th, enum handle_interrupt_timing timing)
1806 {
1807 #if 1 /* 1 to enable Thread#handle_interrupt, 0 to ignore it */
1808 int i;
1809
1810 for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) {
1811 VALUE err = RARRAY_AREF(th->pending_interrupt_queue, i);
1812
1813 enum handle_interrupt_timing mask_timing = rb_threadptr_pending_interrupt_check_mask(th, CLASS_OF(err));
1814
1815 switch (mask_timing) {
1816 case INTERRUPT_ON_BLOCKING:
1817 if (timing != INTERRUPT_ON_BLOCKING) {
1818 break;
1819 }
1820 /* fall through */
1821 case INTERRUPT_NONE: /* default: IMMEDIATE */
1822 case INTERRUPT_IMMEDIATE:
1823 rb_ary_delete_at(th->pending_interrupt_queue, i);
1824 return err;
1825 case INTERRUPT_NEVER:
1826 break;
1827 }
1828 }
1829
1830 th->pending_interrupt_queue_checked = 1;
1831 return Qundef;
1832 #else
1833 VALUE err = rb_ary_shift(th->pending_interrupt_queue);
1834 if (rb_threadptr_pending_interrupt_empty_p(th)) {
1835 th->pending_interrupt_queue_checked = 1;
1836 }
1837 return err;
1838 #endif
1839 }
1840
1841 static int
threadptr_pending_interrupt_active_p(rb_thread_t * th)1842 threadptr_pending_interrupt_active_p(rb_thread_t *th)
1843 {
1844 /*
1845 * For optimization, we don't check async errinfo queue
1846 * if the queue and the thread interrupt mask were not changed
1847 * since last check.
1848 */
1849 if (th->pending_interrupt_queue_checked) {
1850 return 0;
1851 }
1852
1853 if (rb_threadptr_pending_interrupt_empty_p(th)) {
1854 return 0;
1855 }
1856
1857 return 1;
1858 }
1859
1860 static int
handle_interrupt_arg_check_i(VALUE key,VALUE val,VALUE args)1861 handle_interrupt_arg_check_i(VALUE key, VALUE val, VALUE args)
1862 {
1863 VALUE *maskp = (VALUE *)args;
1864
1865 if (val != sym_immediate && val != sym_on_blocking && val != sym_never) {
1866 rb_raise(rb_eArgError, "unknown mask signature");
1867 }
1868
1869 if (!*maskp) {
1870 *maskp = rb_ident_hash_new();
1871 }
1872 rb_hash_aset(*maskp, key, val);
1873
1874 return ST_CONTINUE;
1875 }
1876
1877 /*
1878 * call-seq:
1879 * Thread.handle_interrupt(hash) { ... } -> result of the block
1880 *
1881 * Changes asynchronous interrupt timing.
1882 *
1883 * _interrupt_ means asynchronous event and corresponding procedure
1884 * by Thread#raise, Thread#kill, signal trap (not supported yet)
1885 * and main thread termination (if main thread terminates, then all
1886 * other thread will be killed).
1887 *
1888 * The given +hash+ has pairs like <code>ExceptionClass =>
1889 * :TimingSymbol</code>. Where the ExceptionClass is the interrupt handled by
1890 * the given block. The TimingSymbol can be one of the following symbols:
1891 *
1892 * [+:immediate+] Invoke interrupts immediately.
1893 * [+:on_blocking+] Invoke interrupts while _BlockingOperation_.
1894 * [+:never+] Never invoke all interrupts.
1895 *
1896 * _BlockingOperation_ means that the operation will block the calling thread,
1897 * such as read and write. On CRuby implementation, _BlockingOperation_ is any
1898 * operation executed without GVL.
1899 *
1900 * Masked asynchronous interrupts are delayed until they are enabled.
1901 * This method is similar to sigprocmask(3).
1902 *
1903 * === NOTE
1904 *
1905 * Asynchronous interrupts are difficult to use.
1906 *
 * If you need to communicate between threads, please consider using another way such as Queue.
1908 *
1909 * Or use them with deep understanding about this method.
1910 *
1911 * === Usage
1912 *
1913 * In this example, we can guard from Thread#raise exceptions.
1914 *
1915 * Using the +:never+ TimingSymbol the RuntimeError exception will always be
1916 * ignored in the first block of the main thread. In the second
1917 * ::handle_interrupt block we can purposefully handle RuntimeError exceptions.
1918 *
1919 * th = Thread.new do
1920 * Thread.handle_interrupt(RuntimeError => :never) {
1921 * begin
1922 * # You can write resource allocation code safely.
1923 * Thread.handle_interrupt(RuntimeError => :immediate) {
1924 * # ...
1925 * }
1926 * ensure
1927 * # You can write resource deallocation code safely.
1928 * end
1929 * }
1930 * end
1931 * Thread.pass
1932 * # ...
1933 * th.raise "stop"
1934 *
1935 * While we are ignoring the RuntimeError exception, it's safe to write our
1936 * resource allocation code. Then, the ensure block is where we can safely
1937 * deallocate your resources.
1938 *
1939 * ==== Guarding from Timeout::Error
1940 *
1941 * In the next example, we will guard from the Timeout::Error exception. This
1942 * will help prevent from leaking resources when Timeout::Error exceptions occur
1943 * during normal ensure clause. For this example we use the help of the
1944 * standard library Timeout, from lib/timeout.rb
1945 *
1946 * require 'timeout'
1947 * Thread.handle_interrupt(Timeout::Error => :never) {
1948 * timeout(10){
1949 * # Timeout::Error doesn't occur here
1950 * Thread.handle_interrupt(Timeout::Error => :on_blocking) {
1951 * # possible to be killed by Timeout::Error
1952 * # while blocking operation
1953 * }
1954 * # Timeout::Error doesn't occur here
1955 * }
1956 * }
1957 *
1958 * In the first part of the +timeout+ block, we can rely on Timeout::Error being
1959 * ignored. Then in the <code>Timeout::Error => :on_blocking</code> block, any
1960 * operation that will block the calling thread is susceptible to a
1961 * Timeout::Error exception being raised.
1962 *
1963 * ==== Stack control settings
1964 *
1965 * It's possible to stack multiple levels of ::handle_interrupt blocks in order
1966 * to control more than one ExceptionClass and TimingSymbol at a time.
1967 *
1968 * Thread.handle_interrupt(FooError => :never) {
1969 * Thread.handle_interrupt(BarError => :never) {
1970 * # FooError and BarError are prohibited.
1971 * }
1972 * }
1973 *
1974 * ==== Inheritance with ExceptionClass
1975 *
1976 * All exceptions inherited from the ExceptionClass parameter will be considered.
1977 *
1978 * Thread.handle_interrupt(Exception => :never) {
1979 * # all exceptions inherited from Exception are prohibited.
1980 * }
1981 *
1982 */
static VALUE
rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg)
{
    VALUE mask;
    /* volatile: these locals are used again after EC_EXEC_TAG() below */
    rb_execution_context_t * volatile ec = GET_EC();
    rb_thread_t * volatile th = rb_ec_thread_ptr(ec);
    volatile VALUE r = Qnil;
    enum ruby_tag_type state;

    if (!rb_block_given_p()) {
        rb_raise(rb_eArgError, "block is needed.");
    }

    /* validate the hash and copy it; mask stays 0 if the hash is empty */
    mask = 0;
    mask_arg = rb_to_hash_type(mask_arg);
    rb_hash_foreach(mask_arg, handle_interrupt_arg_check_i, (VALUE)&mask);
    if (!mask) {
        /* nothing to mask: just run the block */
        return rb_yield(Qnil);
    }
    OBJ_FREEZE_RAW(mask);
    rb_ary_push(th->pending_interrupt_mask_stack, mask);
    if (!rb_threadptr_pending_interrupt_empty_p(th)) {
        /* the mask changed: queued interrupts must be examined again */
        th->pending_interrupt_queue_checked = 0;
        RUBY_VM_SET_INTERRUPT(th->ec);
    }

    /* run the block under a tag so the mask is popped even on a non-local exit */
    EC_PUSH_TAG(th->ec);
    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
        r = rb_yield(Qnil);
    }
    EC_POP_TAG();

    rb_ary_pop(th->pending_interrupt_mask_stack);
    if (!rb_threadptr_pending_interrupt_empty_p(th)) {
        /* the mask changed back: queued interrupts must be examined again */
        th->pending_interrupt_queue_checked = 0;
        RUBY_VM_SET_INTERRUPT(th->ec);
    }

    RUBY_VM_CHECK_INTS(th->ec);

    if (state) {
        /* continue the jump that was caught while the block was running */
        EC_JUMP_TAG(th->ec, state);
    }

    return r;
}
2029
2030 /*
2031 * call-seq:
2032 * target_thread.pending_interrupt?(error = nil) -> true/false
2033 *
 * Returns +true+ if the asynchronous queue of the target thread is not empty
 * (an interrupt is pending), +false+ otherwise.
2035 *
2036 * If +error+ is given, then check only for +error+ type deferred events.
2037 *
2038 * See ::pending_interrupt? for more information.
2039 */
2040 static VALUE
rb_thread_pending_interrupt_p(int argc,VALUE * argv,VALUE target_thread)2041 rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread)
2042 {
2043 rb_thread_t *target_th = rb_thread_ptr(target_thread);
2044
2045 if (!target_th->pending_interrupt_queue) {
2046 return Qfalse;
2047 }
2048 if (rb_threadptr_pending_interrupt_empty_p(target_th)) {
2049 return Qfalse;
2050 }
2051 if (rb_check_arity(argc, 0, 1)) {
2052 VALUE err = argv[0];
2053 if (!rb_obj_is_kind_of(err, rb_cModule)) {
2054 rb_raise(rb_eTypeError, "class or module required for rescue clause");
2055 }
2056 if (rb_threadptr_pending_interrupt_include_p(target_th, err)) {
2057 return Qtrue;
2058 }
2059 else {
2060 return Qfalse;
2061 }
2062 }
2063 else {
2064 return Qtrue;
2065 }
2066 }
2067
2068 /*
2069 * call-seq:
2070 * Thread.pending_interrupt?(error = nil) -> true/false
2071 *
 * Returns +true+ if the asynchronous queue is not empty (i.e. there are
 * deferred events), +false+ otherwise.
2073 *
2074 * Since Thread::handle_interrupt can be used to defer asynchronous events,
2075 * this method can be used to determine if there are any deferred events.
2076 *
2077 * If you find this method returns true, then you may finish +:never+ blocks.
2078 *
2079 * For example, the following method processes deferred asynchronous events
2080 * immediately.
2081 *
2082 * def Thread.kick_interrupt_immediately
2083 * Thread.handle_interrupt(Object => :immediate) {
2084 * Thread.pass
2085 * }
2086 * end
2087 *
2088 * If +error+ is given, then check only for +error+ type deferred events.
2089 *
2090 * === Usage
2091 *
2092 * th = Thread.new{
2093 * Thread.handle_interrupt(RuntimeError => :on_blocking){
2094 * while true
2095 * ...
2096 * # reach safe point to invoke interrupt
2097 * if Thread.pending_interrupt?
2098 * Thread.handle_interrupt(Object => :immediate){}
2099 * end
2100 * ...
2101 * end
2102 * }
2103 * }
2104 * ...
2105 * th.raise # stop thread
2106 *
2107 * This example can also be written as the following, which you should use to
2108 * avoid asynchronous interrupts.
2109 *
2110 * flag = true
2111 * th = Thread.new{
2112 * Thread.handle_interrupt(RuntimeError => :on_blocking){
2113 * while true
2114 * ...
2115 * # reach safe point to invoke interrupt
2116 * break if flag == false
2117 * ...
2118 * end
2119 * }
2120 * }
2121 * ...
2122 * flag = false # stop thread
2123 */
2124
2125 static VALUE
rb_thread_s_pending_interrupt_p(int argc,VALUE * argv,VALUE self)2126 rb_thread_s_pending_interrupt_p(int argc, VALUE *argv, VALUE self)
2127 {
2128 return rb_thread_pending_interrupt_p(argc, argv, GET_THREAD()->self);
2129 }
2130
2131 NORETURN(static void rb_threadptr_to_kill(rb_thread_t *th));
2132
2133 static void
rb_threadptr_to_kill(rb_thread_t * th)2134 rb_threadptr_to_kill(rb_thread_t *th)
2135 {
2136 rb_threadptr_pending_interrupt_clear(th);
2137 th->status = THREAD_RUNNABLE;
2138 th->to_kill = 1;
2139 th->ec->errinfo = INT2FIX(TAG_FATAL);
2140 EC_JUMP_TAG(th->ec, TAG_FATAL);
2141 }
2142
2143 static inline rb_atomic_t
threadptr_get_interrupts(rb_thread_t * th)2144 threadptr_get_interrupts(rb_thread_t *th)
2145 {
2146 rb_execution_context_t *ec = th->ec;
2147 rb_atomic_t interrupt;
2148 rb_atomic_t old;
2149
2150 do {
2151 interrupt = ec->interrupt_flag;
2152 old = ATOMIC_CAS(ec->interrupt_flag, interrupt, interrupt & ec->interrupt_mask);
2153 } while (old != interrupt);
2154 return interrupt & (rb_atomic_t)~ec->interrupt_mask;
2155 }
2156
/*
 * Drains and handles the interrupt bits currently posted to th.
 *
 * th              - thread whose interrupts are processed
 * blocking_timing - non-zero selects INTERRUPT_ON_BLOCKING when dequeuing a
 *                   pending interrupt, zero selects INTERRUPT_NONE
 *
 * Returns FALSE when nothing was handled (or th->ec->raised_flag is set);
 * otherwise the OR of the rb_signal_exec() results, or TRUE once a pending
 * interrupt has been dequeued.  May not return at all: a dequeued error is
 * raised and a kill/terminate request goes through rb_threadptr_to_kill().
 */
MJIT_FUNC_EXPORTED int
rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
{
    rb_atomic_t interrupt;
    int postponed_job_interrupt = 0;
    int ret = FALSE;

    /* do nothing while the raised flag is set on this execution context */
    if (th->ec->raised_flag) return ret;

    /* keep going until threadptr_get_interrupts() reports no more bits */
    while ((interrupt = threadptr_get_interrupts(th)) != 0) {
        int sig;
        int timer_interrupt;
        int pending_interrupt;
        int trap_interrupt;

        /* split the fetched bits by kind */
        timer_interrupt = interrupt & TIMER_INTERRUPT_MASK;
        pending_interrupt = interrupt & PENDING_INTERRUPT_MASK;
        postponed_job_interrupt = interrupt & POSTPONED_JOB_INTERRUPT_MASK;
        trap_interrupt = interrupt & TRAP_INTERRUPT_MASK;

        if (postponed_job_interrupt) {
            rb_postponed_job_flush(th->vm);
        }

        /* signal handling */
        /* only the VM's main thread runs signal handlers */
        if (trap_interrupt && (th == th->vm->main_thread)) {
            enum rb_thread_status prev_status = th->status;
            int sigwait_fd = rb_sigwait_fd_get(th);

            if (sigwait_fd >= 0) {
                (void)consume_communication_pipe(sigwait_fd);
                ruby_sigchld_handler(th->vm);
                rb_sigwait_fd_put(th, sigwait_fd);
                rb_sigwait_fd_migrate(th->vm);
            }
            /* handlers run with the thread marked runnable; the previous
             * status is restored afterwards */
            th->status = THREAD_RUNNABLE;
            while ((sig = rb_get_next_signal()) != 0) {
                ret |= rb_signal_exec(th, sig);
            }
            th->status = prev_status;
        }

        /* exception from another thread */
        if (pending_interrupt && threadptr_pending_interrupt_active_p(th)) {
            VALUE err = rb_threadptr_pending_interrupt_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);
            thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err);
            ret = TRUE;

            if (err == Qundef) {
                /* no error */
            }
            else if (err == eKillSignal        /* Thread#kill received */   ||
                     err == eTerminateSignal   /* Terminate thread */       ||
                     err == INT2FIX(TAG_FATAL) /* Thread.exit etc. */         ) {
                rb_threadptr_to_kill(th);
            }
            else {
                if (err == th->vm->special_exceptions[ruby_error_stream_closed]) {
                    /* the only special exception to be queued across thread */
                    err = ruby_vm_special_exception_copy(err);
                }
                /* set runnable if th was slept. */
                if (th->status == THREAD_STOPPED ||
                    th->status == THREAD_STOPPED_FOREVER)
                    th->status = THREAD_RUNNABLE;
                rb_exc_raise(err);
            }
        }

        if (timer_interrupt) {
            /* time slice: TIME_QUANTUM_USEC shifted left by a positive
             * priority, or right by the magnitude of a non-positive one */
            uint32_t limits_us = TIME_QUANTUM_USEC;

            if (th->priority > 0)
                limits_us <<= th->priority;
            else
                limits_us >>= -th->priority;

            /* account one quantum of running time only for runnable threads */
            if (th->status == THREAD_RUNNABLE)
                th->running_time_us += TIME_QUANTUM_USEC;

            EXEC_EVENT_HOOK(th->ec, RUBY_INTERNAL_EVENT_SWITCH, th->ec->cfp->self,
                            0, 0, 0, Qundef);

            rb_thread_schedule_limits(limits_us);
        }
    }
    return ret;
}
2245
2246 void
rb_thread_execute_interrupts(VALUE thval)2247 rb_thread_execute_interrupts(VALUE thval)
2248 {
2249 rb_threadptr_execute_interrupts(rb_thread_ptr(thval), 1);
2250 }
2251
2252 static void
rb_threadptr_ready(rb_thread_t * th)2253 rb_threadptr_ready(rb_thread_t *th)
2254 {
2255 rb_threadptr_interrupt(th);
2256 }
2257
2258 static VALUE
rb_threadptr_raise(rb_thread_t * target_th,int argc,VALUE * argv)2259 rb_threadptr_raise(rb_thread_t *target_th, int argc, VALUE *argv)
2260 {
2261 VALUE exc;
2262
2263 if (rb_threadptr_dead(target_th)) {
2264 return Qnil;
2265 }
2266
2267 if (argc == 0) {
2268 exc = rb_exc_new(rb_eRuntimeError, 0, 0);
2269 }
2270 else {
2271 exc = rb_make_exception(argc, argv);
2272 }
2273
2274 /* making an exception object can switch thread,
2275 so we need to check thread deadness again */
2276 if (rb_threadptr_dead(target_th)) {
2277 return Qnil;
2278 }
2279
2280 rb_ec_setup_exception(GET_EC(), exc, Qundef);
2281 rb_threadptr_pending_interrupt_enque(target_th, exc);
2282 rb_threadptr_interrupt(target_th);
2283 return Qnil;
2284 }
2285
2286 void
rb_threadptr_signal_raise(rb_thread_t * th,int sig)2287 rb_threadptr_signal_raise(rb_thread_t *th, int sig)
2288 {
2289 VALUE argv[2];
2290
2291 argv[0] = rb_eSignal;
2292 argv[1] = INT2FIX(sig);
2293 rb_threadptr_raise(th->vm->main_thread, 2, argv);
2294 }
2295
2296 void
rb_threadptr_signal_exit(rb_thread_t * th)2297 rb_threadptr_signal_exit(rb_thread_t *th)
2298 {
2299 VALUE argv[2];
2300
2301 argv[0] = rb_eSystemExit;
2302 argv[1] = rb_str_new2("exit");
2303 rb_threadptr_raise(th->vm->main_thread, 2, argv);
2304 }
2305
2306 int
rb_ec_set_raised(rb_execution_context_t * ec)2307 rb_ec_set_raised(rb_execution_context_t *ec)
2308 {
2309 if (ec->raised_flag & RAISED_EXCEPTION) {
2310 return 1;
2311 }
2312 ec->raised_flag |= RAISED_EXCEPTION;
2313 return 0;
2314 }
2315
2316 int
rb_ec_reset_raised(rb_execution_context_t * ec)2317 rb_ec_reset_raised(rb_execution_context_t *ec)
2318 {
2319 if (!(ec->raised_flag & RAISED_EXCEPTION)) {
2320 return 0;
2321 }
2322 ec->raised_flag &= ~RAISED_EXCEPTION;
2323 return 1;
2324 }
2325
2326 int
rb_notify_fd_close(int fd,struct list_head * busy)2327 rb_notify_fd_close(int fd, struct list_head *busy)
2328 {
2329 rb_vm_t *vm = GET_THREAD()->vm;
2330 struct waiting_fd *wfd = 0, *next;
2331
2332 list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
2333 if (wfd->fd == fd) {
2334 rb_thread_t *th = wfd->th;
2335 VALUE err;
2336
2337 list_del(&wfd->wfd_node);
2338 list_add(busy, &wfd->wfd_node);
2339
2340 err = th->vm->special_exceptions[ruby_error_stream_closed];
2341 rb_threadptr_pending_interrupt_enque(th, err);
2342 rb_threadptr_interrupt(th);
2343 }
2344 }
2345 return !list_empty(busy);
2346 }
2347
2348 void
rb_thread_fd_close(int fd)2349 rb_thread_fd_close(int fd)
2350 {
2351 struct list_head busy;
2352
2353 list_head_init(&busy);
2354 if (rb_notify_fd_close(fd, &busy)) {
2355 do rb_thread_schedule(); while (!list_empty(&busy));
2356 }
2357 }
2358
2359 /*
2360 * call-seq:
2361 * thr.raise
2362 * thr.raise(string)
2363 * thr.raise(exception [, string [, array]])
2364 *
2365 * Raises an exception from the given thread. The caller does not have to be
2366 * +thr+. See Kernel#raise for more information.
2367 *
2368 * Thread.abort_on_exception = true
2369 * a = Thread.new { sleep(200) }
2370 * a.raise("Gotcha")
2371 *
2372 * This will produce:
2373 *
2374 * prog.rb:3: Gotcha (RuntimeError)
2375 * from prog.rb:2:in `initialize'
2376 * from prog.rb:2:in `new'
2377 * from prog.rb:2
2378 */
2379
2380 static VALUE
thread_raise_m(int argc,VALUE * argv,VALUE self)2381 thread_raise_m(int argc, VALUE *argv, VALUE self)
2382 {
2383 rb_thread_t *target_th = rb_thread_ptr(self);
2384 const rb_thread_t *current_th = GET_THREAD();
2385
2386 threadptr_check_pending_interrupt_queue(target_th);
2387 rb_threadptr_raise(target_th, argc, argv);
2388
2389 /* To perform Thread.current.raise as Kernel.raise */
2390 if (current_th == target_th) {
2391 RUBY_VM_CHECK_INTS(target_th->ec);
2392 }
2393 return Qnil;
2394 }
2395
2396
2397 /*
2398 * call-seq:
2399 * thr.exit -> thr or nil
2400 * thr.kill -> thr or nil
2401 * thr.terminate -> thr or nil
2402 *
2403 * Terminates +thr+ and schedules another thread to be run.
2404 *
2405 * If this thread is already marked to be killed, #exit returns the Thread.
2406 *
2407 * If this is the main thread, or the last thread, exits the process.
2408 */
2409
2410 VALUE
rb_thread_kill(VALUE thread)2411 rb_thread_kill(VALUE thread)
2412 {
2413 rb_thread_t *th = rb_thread_ptr(thread);
2414
2415 if (th->to_kill || th->status == THREAD_KILLED) {
2416 return thread;
2417 }
2418 if (th == th->vm->main_thread) {
2419 rb_exit(EXIT_SUCCESS);
2420 }
2421
2422 thread_debug("rb_thread_kill: %p (%"PRI_THREAD_ID")\n", (void *)th, thread_id_str(th));
2423
2424 if (th == GET_THREAD()) {
2425 /* kill myself immediately */
2426 rb_threadptr_to_kill(th);
2427 }
2428 else {
2429 threadptr_check_pending_interrupt_queue(th);
2430 rb_threadptr_pending_interrupt_enque(th, eKillSignal);
2431 rb_threadptr_interrupt(th);
2432 }
2433 return thread;
2434 }
2435
2436 int
rb_thread_to_be_killed(VALUE thread)2437 rb_thread_to_be_killed(VALUE thread)
2438 {
2439 rb_thread_t *th = rb_thread_ptr(thread);
2440
2441 if (th->to_kill || th->status == THREAD_KILLED) {
2442 return TRUE;
2443 }
2444 return FALSE;
2445 }
2446
2447 /*
2448 * call-seq:
2449 * Thread.kill(thread) -> thread
2450 *
2451 * Causes the given +thread+ to exit, see also Thread::exit.
2452 *
2453 * count = 0
2454 * a = Thread.new { loop { count += 1 } }
2455 * sleep(0.1) #=> 0
2456 * Thread.kill(a) #=> #<Thread:0x401b3d30 dead>
2457 * count #=> 93947
2458 * a.alive? #=> false
2459 */
2460
2461 static VALUE
rb_thread_s_kill(VALUE obj,VALUE th)2462 rb_thread_s_kill(VALUE obj, VALUE th)
2463 {
2464 return rb_thread_kill(th);
2465 }
2466
2467
2468 /*
2469 * call-seq:
2470 * Thread.exit -> thread
2471 *
2472 * Terminates the currently running thread and schedules another thread to be
2473 * run.
2474 *
2475 * If this thread is already marked to be killed, ::exit returns the Thread.
2476 *
2477 * If this is the main thread, or the last thread, exit the process.
2478 */
2479
2480 static VALUE
rb_thread_exit(void)2481 rb_thread_exit(void)
2482 {
2483 rb_thread_t *th = GET_THREAD();
2484 return rb_thread_kill(th->self);
2485 }
2486
2487
2488 /*
2489 * call-seq:
2490 * thr.wakeup -> thr
2491 *
2492 * Marks a given thread as eligible for scheduling, however it may still
2493 * remain blocked on I/O.
2494 *
2495 * *Note:* This does not invoke the scheduler, see #run for more information.
2496 *
2497 * c = Thread.new { Thread.stop; puts "hey!" }
2498 * sleep 0.1 while c.status!='sleep'
2499 * c.wakeup
2500 * c.join
2501 * #=> "hey!"
2502 */
2503
2504 VALUE
rb_thread_wakeup(VALUE thread)2505 rb_thread_wakeup(VALUE thread)
2506 {
2507 if (!RTEST(rb_thread_wakeup_alive(thread))) {
2508 rb_raise(rb_eThreadError, "killed thread");
2509 }
2510 return thread;
2511 }
2512
2513 VALUE
rb_thread_wakeup_alive(VALUE thread)2514 rb_thread_wakeup_alive(VALUE thread)
2515 {
2516 rb_thread_t *target_th = rb_thread_ptr(thread);
2517 if (target_th->status == THREAD_KILLED) return Qnil;
2518
2519 rb_threadptr_ready(target_th);
2520
2521 if (target_th->status == THREAD_STOPPED ||
2522 target_th->status == THREAD_STOPPED_FOREVER) {
2523 target_th->status = THREAD_RUNNABLE;
2524 }
2525
2526 return thread;
2527 }
2528
2529
2530 /*
2531 * call-seq:
2532 * thr.run -> thr
2533 *
2534 * Wakes up +thr+, making it eligible for scheduling.
2535 *
2536 * a = Thread.new { puts "a"; Thread.stop; puts "c" }
2537 * sleep 0.1 while a.status!='sleep'
2538 * puts "Got here"
2539 * a.run
2540 * a.join
2541 *
2542 * This will produce:
2543 *
2544 * a
2545 * Got here
2546 * c
2547 *
2548 * See also the instance method #wakeup.
2549 */
2550
2551 VALUE
rb_thread_run(VALUE thread)2552 rb_thread_run(VALUE thread)
2553 {
2554 rb_thread_wakeup(thread);
2555 rb_thread_schedule();
2556 return thread;
2557 }
2558
2559
2560 /*
2561 * call-seq:
2562 * Thread.stop -> nil
2563 *
2564 * Stops execution of the current thread, putting it into a ``sleep'' state,
2565 * and schedules execution of another thread.
2566 *
2567 * a = Thread.new { print "a"; Thread.stop; print "c" }
2568 * sleep 0.1 while a.status!='sleep'
2569 * print "b"
2570 * a.run
2571 * a.join
2572 * #=> "abc"
2573 */
2574
2575 VALUE
rb_thread_stop(void)2576 rb_thread_stop(void)
2577 {
2578 if (rb_thread_alone()) {
2579 rb_raise(rb_eThreadError,
2580 "stopping only thread\n\tnote: use sleep to stop forever");
2581 }
2582 rb_thread_sleep_deadly();
2583 return Qnil;
2584 }
2585
2586 /********************************************************************/
2587
2588 /*
2589 * call-seq:
2590 * Thread.list -> array
2591 *
2592 * Returns an array of Thread objects for all threads that are either runnable
2593 * or stopped.
2594 *
2595 * Thread.new { sleep(200) }
2596 * Thread.new { 1000000.times {|i| i*i } }
2597 * Thread.new { Thread.stop }
2598 * Thread.list.each {|t| p t}
2599 *
2600 * This will produce:
2601 *
2602 * #<Thread:0x401b3e84 sleep>
2603 * #<Thread:0x401b3f38 run>
2604 * #<Thread:0x401b3fb0 sleep>
2605 * #<Thread:0x401bdf4c run>
2606 */
2607
2608 VALUE
rb_thread_list(void)2609 rb_thread_list(void)
2610 {
2611 VALUE ary = rb_ary_new();
2612 rb_vm_t *vm = GET_THREAD()->vm;
2613 rb_thread_t *th = 0;
2614
2615 list_for_each(&vm->living_threads, th, vmlt_node) {
2616 switch (th->status) {
2617 case THREAD_RUNNABLE:
2618 case THREAD_STOPPED:
2619 case THREAD_STOPPED_FOREVER:
2620 rb_ary_push(ary, th->self);
2621 default:
2622 break;
2623 }
2624 }
2625 return ary;
2626 }
2627
2628 VALUE
rb_thread_current(void)2629 rb_thread_current(void)
2630 {
2631 return GET_THREAD()->self;
2632 }
2633
2634 /*
2635 * call-seq:
2636 * Thread.current -> thread
2637 *
2638 * Returns the currently executing thread.
2639 *
2640 * Thread.current #=> #<Thread:0x401bdf4c run>
2641 */
2642
2643 static VALUE
thread_s_current(VALUE klass)2644 thread_s_current(VALUE klass)
2645 {
2646 return rb_thread_current();
2647 }
2648
2649 VALUE
rb_thread_main(void)2650 rb_thread_main(void)
2651 {
2652 return GET_THREAD()->vm->main_thread->self;
2653 }
2654
2655 /*
2656 * call-seq:
2657 * Thread.main -> thread
2658 *
2659 * Returns the main thread.
2660 */
2661
2662 static VALUE
rb_thread_s_main(VALUE klass)2663 rb_thread_s_main(VALUE klass)
2664 {
2665 return rb_thread_main();
2666 }
2667
2668
2669 /*
2670 * call-seq:
2671 * Thread.abort_on_exception -> true or false
2672 *
2673 * Returns the status of the global ``abort on exception'' condition.
2674 *
2675 * The default is +false+.
2676 *
2677 * When set to +true+, if any thread is aborted by an exception, the
2678 * raised exception will be re-raised in the main thread.
2679 *
2680 * Can also be specified by the global $DEBUG flag or command line option
2681 * +-d+.
2682 *
2683 * See also ::abort_on_exception=.
2684 *
2685 * There is also an instance level method to set this for a specific thread,
2686 * see #abort_on_exception.
2687 */
2688
2689 static VALUE
rb_thread_s_abort_exc(void)2690 rb_thread_s_abort_exc(void)
2691 {
2692 return GET_THREAD()->vm->thread_abort_on_exception ? Qtrue : Qfalse;
2693 }
2694
2695
2696 /*
2697 * call-seq:
2698 * Thread.abort_on_exception= boolean -> true or false
2699 *
2700 * When set to +true+, if any thread is aborted by an exception, the
2701 * raised exception will be re-raised in the main thread.
2702 * Returns the new state.
2703 *
2704 * Thread.abort_on_exception = true
2705 * t1 = Thread.new do
2706 * puts "In new thread"
2707 * raise "Exception from thread"
2708 * end
2709 * sleep(1)
2710 * puts "not reached"
2711 *
2712 * This will produce:
2713 *
2714 * In new thread
2715 * prog.rb:4: Exception from thread (RuntimeError)
2716 * from prog.rb:2:in `initialize'
2717 * from prog.rb:2:in `new'
2718 * from prog.rb:2
2719 *
2720 * See also ::abort_on_exception.
2721 *
2722 * There is also an instance level method to set this for a specific thread,
2723 * see #abort_on_exception=.
2724 */
2725
2726 static VALUE
rb_thread_s_abort_exc_set(VALUE self,VALUE val)2727 rb_thread_s_abort_exc_set(VALUE self, VALUE val)
2728 {
2729 GET_THREAD()->vm->thread_abort_on_exception = RTEST(val);
2730 return val;
2731 }
2732
2733
2734 /*
2735 * call-seq:
2736 * thr.abort_on_exception -> true or false
2737 *
2738 * Returns the status of the thread-local ``abort on exception'' condition for
2739 * this +thr+.
2740 *
2741 * The default is +false+.
2742 *
2743 * See also #abort_on_exception=.
2744 *
2745 * There is also a class level method to set this for all threads, see
2746 * ::abort_on_exception.
2747 */
2748
2749 static VALUE
rb_thread_abort_exc(VALUE thread)2750 rb_thread_abort_exc(VALUE thread)
2751 {
2752 return rb_thread_ptr(thread)->abort_on_exception ? Qtrue : Qfalse;
2753 }
2754
2755
2756 /*
2757 * call-seq:
2758 * thr.abort_on_exception= boolean -> true or false
2759 *
2760 * When set to +true+, if this +thr+ is aborted by an exception, the
2761 * raised exception will be re-raised in the main thread.
2762 *
2763 * See also #abort_on_exception.
2764 *
2765 * There is also a class level method to set this for all threads, see
2766 * ::abort_on_exception=.
2767 */
2768
2769 static VALUE
rb_thread_abort_exc_set(VALUE thread,VALUE val)2770 rb_thread_abort_exc_set(VALUE thread, VALUE val)
2771 {
2772 rb_thread_ptr(thread)->abort_on_exception = RTEST(val);
2773 return val;
2774 }
2775
2776
2777 /*
2778 * call-seq:
2779 * Thread.report_on_exception -> true or false
2780 *
2781 * Returns the status of the global ``report on exception'' condition.
2782 *
2783 * The default is +true+ since Ruby 2.5.
2784 *
2785 * All threads created when this flag is true will report
2786 * a message on $stderr if an exception kills the thread.
2787 *
2788 * Thread.new { 1.times { raise } }
2789 *
2790 * will produce this output on $stderr:
2791 *
2792 * #<Thread:...> terminated with exception (report_on_exception is true):
2793 * Traceback (most recent call last):
2794 * 2: from -e:1:in `block in <main>'
2795 * 1: from -e:1:in `times'
2796 *
2797 * This is done to catch errors in threads early.
2798 * In some cases, you might not want this output.
2799 * There are multiple ways to avoid the extra output:
2800 *
2801 * * If the exception is not intended, the best is to fix the cause of
2802 * the exception so it does not happen anymore.
2803 * * If the exception is intended, it might be better to rescue it closer to
 * where it is raised rather than let it kill the Thread.
2805 * * If it is guaranteed the Thread will be joined with Thread#join or
2806 * Thread#value, then it is safe to disable this report with
2807 * <code>Thread.current.report_on_exception = false</code>
2808 * when starting the Thread.
2809 * However, this might handle the exception much later, or not at all
2810 * if the Thread is never joined due to the parent thread being blocked, etc.
2811 *
2812 * See also ::report_on_exception=.
2813 *
2814 * There is also an instance level method to set this for a specific thread,
2815 * see #report_on_exception=.
2816 *
2817 */
2818
2819 static VALUE
rb_thread_s_report_exc(void)2820 rb_thread_s_report_exc(void)
2821 {
2822 return GET_THREAD()->vm->thread_report_on_exception ? Qtrue : Qfalse;
2823 }
2824
2825
2826 /*
2827 * call-seq:
2828 * Thread.report_on_exception= boolean -> true or false
2829 *
2830 * Returns the new state.
2831 * When set to +true+, all threads created afterwards will inherit the
2832 * condition and report a message on $stderr if an exception kills a thread:
2833 *
2834 * Thread.report_on_exception = true
2835 * t1 = Thread.new do
2836 * puts "In new thread"
2837 * raise "Exception from thread"
2838 * end
2839 * sleep(1)
2840 * puts "In the main thread"
2841 *
2842 * This will produce:
2843 *
2844 * In new thread
2845 * #<Thread:...prog.rb:2> terminated with exception (report_on_exception is true):
2846 * Traceback (most recent call last):
2847 * prog.rb:4:in `block in <main>': Exception from thread (RuntimeError)
2848 * In the main thread
2849 *
2850 * See also ::report_on_exception.
2851 *
2852 * There is also an instance level method to set this for a specific thread,
2853 * see #report_on_exception=.
2854 */
2855
2856 static VALUE
rb_thread_s_report_exc_set(VALUE self,VALUE val)2857 rb_thread_s_report_exc_set(VALUE self, VALUE val)
2858 {
2859 GET_THREAD()->vm->thread_report_on_exception = RTEST(val);
2860 return val;
2861 }
2862
2863
2864 /*
2865 * call-seq:
2866 * thr.report_on_exception -> true or false
2867 *
2868 * Returns the status of the thread-local ``report on exception'' condition for
2869 * this +thr+.
2870 *
2871 * The default value when creating a Thread is the value of
2872 * the global flag Thread.report_on_exception.
2873 *
2874 * See also #report_on_exception=.
2875 *
2876 * There is also a class level method to set this for all new threads, see
2877 * ::report_on_exception=.
2878 */
2879
2880 static VALUE
rb_thread_report_exc(VALUE thread)2881 rb_thread_report_exc(VALUE thread)
2882 {
2883 return rb_thread_ptr(thread)->report_on_exception ? Qtrue : Qfalse;
2884 }
2885
2886
2887 /*
2888 * call-seq:
2889 * thr.report_on_exception= boolean -> true or false
2890 *
2891 * When set to +true+, a message is printed on $stderr if an exception
2892 * kills this +thr+. See ::report_on_exception for details.
2893 *
2894 * See also #report_on_exception.
2895 *
2896 * There is also a class level method to set this for all new threads, see
2897 * ::report_on_exception=.
2898 */
2899
2900 static VALUE
rb_thread_report_exc_set(VALUE thread,VALUE val)2901 rb_thread_report_exc_set(VALUE thread, VALUE val)
2902 {
2903 rb_thread_ptr(thread)->report_on_exception = RTEST(val);
2904 return val;
2905 }
2906
2907
2908 /*
2909 * call-seq:
2910 * thr.group -> thgrp or nil
2911 *
2912 * Returns the ThreadGroup which contains the given thread, or returns +nil+
2913 * if +thr+ is not a member of any group.
2914 *
2915 * Thread.main.group #=> #<ThreadGroup:0x4029d914>
2916 */
2917
2918 VALUE
rb_thread_group(VALUE thread)2919 rb_thread_group(VALUE thread)
2920 {
2921 VALUE group = rb_thread_ptr(thread)->thgroup;
2922 return group == 0 ? Qnil : group;
2923 }
2924
2925 static const char *
thread_status_name(rb_thread_t * th,int detail)2926 thread_status_name(rb_thread_t *th, int detail)
2927 {
2928 switch (th->status) {
2929 case THREAD_RUNNABLE:
2930 return th->to_kill ? "aborting" : "run";
2931 case THREAD_STOPPED_FOREVER:
2932 if (detail) return "sleep_forever";
2933 case THREAD_STOPPED:
2934 return "sleep";
2935 case THREAD_KILLED:
2936 return "dead";
2937 default:
2938 return "unknown";
2939 }
2940 }
2941
2942 static int
rb_threadptr_dead(rb_thread_t * th)2943 rb_threadptr_dead(rb_thread_t *th)
2944 {
2945 return th->status == THREAD_KILLED;
2946 }
2947
2948
2949 /*
2950 * call-seq:
2951 * thr.status -> string, false or nil
2952 *
2953 * Returns the status of +thr+.
2954 *
2955 * [<tt>"sleep"</tt>]
2956 * Returned if this thread is sleeping or waiting on I/O
2957 * [<tt>"run"</tt>]
2958 * When this thread is executing
2959 * [<tt>"aborting"</tt>]
2960 * If this thread is aborting
2961 * [+false+]
2962 * When this thread is terminated normally
2963 * [+nil+]
2964 * If terminated with an exception.
2965 *
2966 * a = Thread.new { raise("die now") }
2967 * b = Thread.new { Thread.stop }
2968 * c = Thread.new { Thread.exit }
2969 * d = Thread.new { sleep }
2970 * d.kill #=> #<Thread:0x401b3678 aborting>
2971 * a.status #=> nil
2972 * b.status #=> "sleep"
2973 * c.status #=> false
2974 * d.status #=> "aborting"
2975 * Thread.current.status #=> "run"
2976 *
2977 * See also the instance methods #alive? and #stop?
2978 */
2979
2980 static VALUE
rb_thread_status(VALUE thread)2981 rb_thread_status(VALUE thread)
2982 {
2983 rb_thread_t *target_th = rb_thread_ptr(thread);
2984
2985 if (rb_threadptr_dead(target_th)) {
2986 if (!NIL_P(target_th->ec->errinfo) &&
2987 !FIXNUM_P(target_th->ec->errinfo)) {
2988 return Qnil;
2989 }
2990 else {
2991 return Qfalse;
2992 }
2993 }
2994 else {
2995 return rb_str_new2(thread_status_name(target_th, FALSE));
2996 }
2997 }
2998
2999
3000 /*
3001 * call-seq:
3002 * thr.alive? -> true or false
3003 *
3004 * Returns +true+ if +thr+ is running or sleeping.
3005 *
3006 * thr = Thread.new { }
3007 * thr.join #=> #<Thread:0x401b3fb0 dead>
3008 * Thread.current.alive? #=> true
3009 * thr.alive? #=> false
3010 *
3011 * See also #stop? and #status.
3012 */
3013
3014 static VALUE
rb_thread_alive_p(VALUE thread)3015 rb_thread_alive_p(VALUE thread)
3016 {
3017 if (rb_threadptr_dead(rb_thread_ptr(thread))) {
3018 return Qfalse;
3019 }
3020 else {
3021 return Qtrue;
3022 }
3023 }
3024
3025 /*
3026 * call-seq:
3027 * thr.stop? -> true or false
3028 *
3029 * Returns +true+ if +thr+ is dead or sleeping.
3030 *
3031 * a = Thread.new { Thread.stop }
3032 * b = Thread.current
3033 * a.stop? #=> true
3034 * b.stop? #=> false
3035 *
3036 * See also #alive? and #status.
3037 */
3038
3039 static VALUE
rb_thread_stop_p(VALUE thread)3040 rb_thread_stop_p(VALUE thread)
3041 {
3042 rb_thread_t *th = rb_thread_ptr(thread);
3043
3044 if (rb_threadptr_dead(th)) {
3045 return Qtrue;
3046 }
3047 else if (th->status == THREAD_STOPPED ||
3048 th->status == THREAD_STOPPED_FOREVER) {
3049 return Qtrue;
3050 }
3051 else {
3052 return Qfalse;
3053 }
3054 }
3055
3056 /*
3057 * call-seq:
3058 * thr.safe_level -> integer
3059 *
3060 * Returns the safe level.
3061 *
3062 * This method is obsolete because $SAFE is a process global state.
3063 * Simply check $SAFE.
3064 */
3065
3066 static VALUE
rb_thread_safe_level(VALUE thread)3067 rb_thread_safe_level(VALUE thread)
3068 {
3069 return UINT2NUM(rb_safe_level());
3070 }
3071
3072 /*
3073 * call-seq:
3074 * thr.name -> string
3075 *
 * Returns the name of the thread.
3077 */
3078
3079 static VALUE
rb_thread_getname(VALUE thread)3080 rb_thread_getname(VALUE thread)
3081 {
3082 return rb_thread_ptr(thread)->name;
3083 }
3084
3085 /*
3086 * call-seq:
3087 * thr.name=(name) -> string
3088 *
 * Sets the given name on the Ruby thread.
 * On some platforms, it may also set the name of the pthread and/or the kernel thread.
3091 */
3092
3093 static VALUE
rb_thread_setname(VALUE thread,VALUE name)3094 rb_thread_setname(VALUE thread, VALUE name)
3095 {
3096 rb_thread_t *target_th = rb_thread_ptr(thread);
3097
3098 if (!NIL_P(name)) {
3099 rb_encoding *enc;
3100 StringValueCStr(name);
3101 enc = rb_enc_get(name);
3102 if (!rb_enc_asciicompat(enc)) {
3103 rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)",
3104 rb_enc_name(enc));
3105 }
3106 name = rb_str_new_frozen(name);
3107 }
3108 target_th->name = name;
3109 if (threadptr_initialized(target_th)) {
3110 native_set_another_thread_name(target_th->thread_id, name);
3111 }
3112 return name;
3113 }
3114
3115 /*
3116 * call-seq:
3117 * thr.to_s -> string
3118 *
3119 * Dump the name, id, and status of _thr_ to a string.
3120 */
3121
3122 static VALUE
rb_thread_to_s(VALUE thread)3123 rb_thread_to_s(VALUE thread)
3124 {
3125 VALUE cname = rb_class_path(rb_obj_class(thread));
3126 rb_thread_t *target_th = rb_thread_ptr(thread);
3127 const char *status;
3128 VALUE str, loc;
3129
3130 status = thread_status_name(target_th, TRUE);
3131 str = rb_sprintf("#<%"PRIsVALUE":%p", cname, (void *)thread);
3132 if (!NIL_P(target_th->name)) {
3133 rb_str_catf(str, "@%"PRIsVALUE, target_th->name);
3134 }
3135 if ((loc = threadptr_invoke_proc_location(target_th)) != Qnil) {
3136 rb_str_catf(str, "@%"PRIsVALUE":%"PRIsVALUE,
3137 RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1));
3138 rb_gc_force_recycle(loc);
3139 }
3140 rb_str_catf(str, " %s>", status);
3141 OBJ_INFECT(str, thread);
3142
3143 return str;
3144 }
3145
3146 /* variables for recursive traversals */
3147 static ID recursive_key;
3148
3149 static VALUE
threadptr_local_aref(rb_thread_t * th,ID id)3150 threadptr_local_aref(rb_thread_t *th, ID id)
3151 {
3152 if (id == recursive_key) {
3153 return th->ec->local_storage_recursive_hash;
3154 }
3155 else {
3156 st_data_t val;
3157 st_table *local_storage = th->ec->local_storage;
3158
3159 if (local_storage != NULL && st_lookup(local_storage, id, &val)) {
3160 return (VALUE)val;
3161 }
3162 else {
3163 return Qnil;
3164 }
3165 }
3166 }
3167
3168 VALUE
rb_thread_local_aref(VALUE thread,ID id)3169 rb_thread_local_aref(VALUE thread, ID id)
3170 {
3171 return threadptr_local_aref(rb_thread_ptr(thread), id);
3172 }
3173
3174 /*
3175 * call-seq:
3176 * thr[sym] -> obj or nil
3177 *
3178 * Attribute Reference---Returns the value of a fiber-local variable (current thread's root fiber
3179 * if not explicitly inside a Fiber), using either a symbol or a string name.
3180 * If the specified variable does not exist, returns +nil+.
3181 *
3182 * [
3183 * Thread.new { Thread.current["name"] = "A" },
3184 * Thread.new { Thread.current[:name] = "B" },
3185 * Thread.new { Thread.current["name"] = "C" }
3186 * ].each do |th|
3187 * th.join
3188 * puts "#{th.inspect}: #{th[:name]}"
3189 * end
3190 *
3191 * This will produce:
3192 *
3193 * #<Thread:0x00000002a54220 dead>: A
3194 * #<Thread:0x00000002a541a8 dead>: B
3195 * #<Thread:0x00000002a54130 dead>: C
3196 *
3197 * Thread#[] and Thread#[]= are not thread-local but fiber-local.
3198 * This confusion did not exist in Ruby 1.8 because
3199 * fibers are only available since Ruby 1.9.
 * Ruby 1.9 chose to make these methods fiber-local in order to preserve
 * the following idiom for dynamic scope.
3202 *
3203 * def meth(newvalue)
3204 * begin
3205 * oldvalue = Thread.current[:name]
3206 * Thread.current[:name] = newvalue
3207 * yield
3208 * ensure
3209 * Thread.current[:name] = oldvalue
3210 * end
3211 * end
3212 *
3213 * The idiom may not work as dynamic scope if the methods are thread-local
3214 * and a given block switches fiber.
3215 *
3216 * f = Fiber.new {
3217 * meth(1) {
3218 * Fiber.yield
3219 * }
3220 * }
3221 * meth(2) {
3222 * f.resume
3223 * }
3224 * f.resume
3225 * p Thread.current[:name]
3226 * #=> nil if fiber-local
3227 * #=> 2 if thread-local (The value 2 is leaked to outside of meth method.)
3228 *
3229 * For thread-local variables, please see #thread_variable_get and
3230 * #thread_variable_set.
3231 *
3232 */
3233
3234 static VALUE
rb_thread_aref(VALUE thread,VALUE key)3235 rb_thread_aref(VALUE thread, VALUE key)
3236 {
3237 ID id = rb_check_id(&key);
3238 if (!id) return Qnil;
3239 return rb_thread_local_aref(thread, id);
3240 }
3241
3242 /*
3243 * call-seq:
3244 * thr.fetch(sym) -> obj
3245 * thr.fetch(sym) { } -> obj
3246 * thr.fetch(sym, default) -> obj
3247 *
3248 * Returns a fiber-local for the given key. If the key can't be
3249 * found, there are several options: With no other arguments, it will
3250 * raise a <code>KeyError</code> exception; if <i>default</i> is
3251 * given, then that will be returned; if the optional code block is
3252 * specified, then that will be run and its result returned.
3253 * See Thread#[] and Hash#fetch.
3254 */
3255 static VALUE
rb_thread_fetch(int argc,VALUE * argv,VALUE self)3256 rb_thread_fetch(int argc, VALUE *argv, VALUE self)
3257 {
3258 VALUE key, val;
3259 ID id;
3260 rb_thread_t *target_th = rb_thread_ptr(self);
3261 int block_given;
3262
3263 rb_check_arity(argc, 1, 2);
3264 key = argv[0];
3265
3266 block_given = rb_block_given_p();
3267 if (block_given && argc == 2) {
3268 rb_warn("block supersedes default value argument");
3269 }
3270
3271 id = rb_check_id(&key);
3272
3273 if (id == recursive_key) {
3274 return target_th->ec->local_storage_recursive_hash;
3275 }
3276 else if (id && target_th->ec->local_storage &&
3277 st_lookup(target_th->ec->local_storage, id, &val)) {
3278 return val;
3279 }
3280 else if (block_given) {
3281 return rb_yield(key);
3282 }
3283 else if (argc == 1) {
3284 rb_key_err_raise(rb_sprintf("key not found: %+"PRIsVALUE, key), self, key);
3285 }
3286 else {
3287 return argv[1];
3288 }
3289 }
3290
3291 static VALUE
threadptr_local_aset(rb_thread_t * th,ID id,VALUE val)3292 threadptr_local_aset(rb_thread_t *th, ID id, VALUE val)
3293 {
3294 if (id == recursive_key) {
3295 th->ec->local_storage_recursive_hash = val;
3296 return val;
3297 }
3298 else {
3299 st_table *local_storage = th->ec->local_storage;
3300
3301 if (NIL_P(val)) {
3302 if (!local_storage) return Qnil;
3303 st_delete_wrap(local_storage, id);
3304 return Qnil;
3305 }
3306 else {
3307 if (local_storage == NULL) {
3308 th->ec->local_storage = local_storage = st_init_numtable();
3309 }
3310 st_insert(local_storage, id, val);
3311 return val;
3312 }
3313 }
3314 }
3315
3316 VALUE
rb_thread_local_aset(VALUE thread,ID id,VALUE val)3317 rb_thread_local_aset(VALUE thread, ID id, VALUE val)
3318 {
3319 if (OBJ_FROZEN(thread)) {
3320 rb_error_frozen("thread locals");
3321 }
3322
3323 return threadptr_local_aset(rb_thread_ptr(thread), id, val);
3324 }
3325
3326 /*
3327 * call-seq:
3328 * thr[sym] = obj -> obj
3329 *
3330 * Attribute Assignment---Sets or creates the value of a fiber-local variable,
3331 * using either a symbol or a string.
3332 *
3333 * See also Thread#[].
3334 *
3335 * For thread-local variables, please see #thread_variable_set and
3336 * #thread_variable_get.
3337 */
3338
3339 static VALUE
rb_thread_aset(VALUE self,VALUE id,VALUE val)3340 rb_thread_aset(VALUE self, VALUE id, VALUE val)
3341 {
3342 return rb_thread_local_aset(self, rb_to_id(id), val);
3343 }
3344
3345 /*
3346 * call-seq:
3347 * thr.thread_variable_get(key) -> obj or nil
3348 *
3349 * Returns the value of a thread local variable that has been set. Note that
3350 * these are different than fiber local values. For fiber local values,
3351 * please see Thread#[] and Thread#[]=.
3352 *
3353 * Thread local values are carried along with threads, and do not respect
3354 * fibers. For example:
3355 *
3356 * Thread.new {
3357 * Thread.current.thread_variable_set("foo", "bar") # set a thread local
3358 * Thread.current["foo"] = "bar" # set a fiber local
3359 *
3360 * Fiber.new {
3361 * Fiber.yield [
3362 * Thread.current.thread_variable_get("foo"), # get the thread local
3363 * Thread.current["foo"], # get the fiber local
3364 * ]
3365 * }.resume
3366 * }.join.value # => ['bar', nil]
3367 *
3368 * The value "bar" is returned for the thread local, where nil is returned
3369 * for the fiber local. The fiber is executed in the same thread, so the
3370 * thread local values are available.
3371 */
3372
3373 static VALUE
rb_thread_variable_get(VALUE thread,VALUE key)3374 rb_thread_variable_get(VALUE thread, VALUE key)
3375 {
3376 VALUE locals;
3377
3378 locals = rb_ivar_get(thread, id_locals);
3379 return rb_hash_aref(locals, rb_to_symbol(key));
3380 }
3381
3382 /*
3383 * call-seq:
3384 * thr.thread_variable_set(key, value)
3385 *
3386 * Sets a thread local with +key+ to +value+. Note that these are local to
3387 * threads, and not to fibers. Please see Thread#thread_variable_get and
3388 * Thread#[] for more information.
3389 */
3390
3391 static VALUE
rb_thread_variable_set(VALUE thread,VALUE id,VALUE val)3392 rb_thread_variable_set(VALUE thread, VALUE id, VALUE val)
3393 {
3394 VALUE locals;
3395
3396 if (OBJ_FROZEN(thread)) {
3397 rb_error_frozen("thread locals");
3398 }
3399
3400 locals = rb_ivar_get(thread, id_locals);
3401 return rb_hash_aset(locals, rb_to_symbol(id), val);
3402 }
3403
3404 /*
3405 * call-seq:
3406 * thr.key?(sym) -> true or false
3407 *
3408 * Returns +true+ if the given string (or symbol) exists as a fiber-local
3409 * variable.
3410 *
3411 * me = Thread.current
3412 * me[:oliver] = "a"
3413 * me.key?(:oliver) #=> true
3414 * me.key?(:stanley) #=> false
3415 */
3416
3417 static VALUE
rb_thread_key_p(VALUE self,VALUE key)3418 rb_thread_key_p(VALUE self, VALUE key)
3419 {
3420 ID id = rb_check_id(&key);
3421 st_table *local_storage = rb_thread_ptr(self)->ec->local_storage;
3422
3423 if (!id || local_storage == NULL) {
3424 return Qfalse;
3425 }
3426 else if (st_lookup(local_storage, id, 0)) {
3427 return Qtrue;
3428 }
3429 else {
3430 return Qfalse;
3431 }
3432 }
3433
3434 static int
thread_keys_i(ID key,VALUE value,VALUE ary)3435 thread_keys_i(ID key, VALUE value, VALUE ary)
3436 {
3437 rb_ary_push(ary, ID2SYM(key));
3438 return ST_CONTINUE;
3439 }
3440
3441 int
rb_thread_alone(void)3442 rb_thread_alone(void)
3443 {
3444 return vm_living_thread_num(GET_VM()) == 1;
3445 }
3446
3447 /*
3448 * call-seq:
3449 * thr.keys -> array
3450 *
3451 * Returns an array of the names of the fiber-local variables (as Symbols).
3452 *
3453 * thr = Thread.new do
3454 * Thread.current[:cat] = 'meow'
3455 * Thread.current["dog"] = 'woof'
3456 * end
3457 * thr.join #=> #<Thread:0x401b3f10 dead>
3458 * thr.keys #=> [:dog, :cat]
3459 */
3460
3461 static VALUE
rb_thread_keys(VALUE self)3462 rb_thread_keys(VALUE self)
3463 {
3464 st_table *local_storage = rb_thread_ptr(self)->ec->local_storage;
3465 VALUE ary = rb_ary_new();
3466
3467 if (local_storage) {
3468 st_foreach(local_storage, thread_keys_i, ary);
3469 }
3470 return ary;
3471 }
3472
/*
 * rb_hash_foreach() callback used by rb_thread_variables(): appends each
 * hash key to +ary+ unchanged and asks the iteration to continue.
 * The value of the entry is ignored.
 */
static int
keys_i(VALUE key, VALUE value, VALUE ary)
{
    rb_ary_push(ary, key);
    return ST_CONTINUE;
}
3479
3480 /*
3481 * call-seq:
3482 * thr.thread_variables -> array
3483 *
3484 * Returns an array of the names of the thread-local variables (as Symbols).
3485 *
3486 * thr = Thread.new do
3487 * Thread.current.thread_variable_set(:cat, 'meow')
3488 * Thread.current.thread_variable_set("dog", 'woof')
3489 * end
3490 * thr.join #=> #<Thread:0x401b3f10 dead>
3491 * thr.thread_variables #=> [:dog, :cat]
3492 *
3493 * Note that these are not fiber local variables. Please see Thread#[] and
3494 * Thread#thread_variable_get for more details.
3495 */
3496
3497 static VALUE
rb_thread_variables(VALUE thread)3498 rb_thread_variables(VALUE thread)
3499 {
3500 VALUE locals;
3501 VALUE ary;
3502
3503 locals = rb_ivar_get(thread, id_locals);
3504 ary = rb_ary_new();
3505 rb_hash_foreach(locals, keys_i, ary);
3506
3507 return ary;
3508 }
3509
3510 /*
3511 * call-seq:
3512 * thr.thread_variable?(key) -> true or false
3513 *
3514 * Returns +true+ if the given string (or symbol) exists as a thread-local
3515 * variable.
3516 *
3517 * me = Thread.current
3518 * me.thread_variable_set(:oliver, "a")
3519 * me.thread_variable?(:oliver) #=> true
3520 * me.thread_variable?(:stanley) #=> false
3521 *
3522 * Note that these are not fiber local variables. Please see Thread#[] and
3523 * Thread#thread_variable_get for more details.
3524 */
3525
3526 static VALUE
rb_thread_variable_p(VALUE thread,VALUE key)3527 rb_thread_variable_p(VALUE thread, VALUE key)
3528 {
3529 VALUE locals;
3530 ID id = rb_check_id(&key);
3531
3532 if (!id) return Qfalse;
3533
3534 locals = rb_ivar_get(thread, id_locals);
3535
3536 if (rb_hash_lookup(locals, ID2SYM(id)) != Qnil) {
3537 return Qtrue;
3538 }
3539 else {
3540 return Qfalse;
3541 }
3542
3543 return Qfalse;
3544 }
3545
3546 /*
3547 * call-seq:
3548 * thr.priority -> integer
3549 *
 *  Returns the priority of <i>thr</i>. The default is inherited from the
 *  thread that created the new thread, or is zero for the
 *  initial main thread; higher-priority threads will run more frequently
 *  than lower-priority threads (but lower-priority threads can also run).
 *
 *  This is just a hint for the Ruby thread scheduler. It may be ignored on
 *  some platforms.
3557 *
3558 * Thread.current.priority #=> 0
3559 */
3560
3561 static VALUE
rb_thread_priority(VALUE thread)3562 rb_thread_priority(VALUE thread)
3563 {
3564 return INT2NUM(rb_thread_ptr(thread)->priority);
3565 }
3566
3567
3568 /*
3569 * call-seq:
3570 * thr.priority= integer -> thr
3571 *
3572 * Sets the priority of <i>thr</i> to <i>integer</i>. Higher-priority threads
3573 * will run more frequently than lower-priority threads (but lower-priority
3574 * threads can also run).
3575 *
 *  This is just a hint for the Ruby thread scheduler. It may be ignored on
 *  some platforms.
3578 *
3579 * count1 = count2 = 0
3580 * a = Thread.new do
3581 * loop { count1 += 1 }
3582 * end
3583 * a.priority = -1
3584 *
3585 * b = Thread.new do
3586 * loop { count2 += 1 }
3587 * end
3588 * b.priority = -2
3589 * sleep 1 #=> 1
3590 * count1 #=> 622504
3591 * count2 #=> 5832
3592 */
3593
3594 static VALUE
rb_thread_priority_set(VALUE thread,VALUE prio)3595 rb_thread_priority_set(VALUE thread, VALUE prio)
3596 {
3597 rb_thread_t *target_th = rb_thread_ptr(thread);
3598 int priority;
3599
3600 #if USE_NATIVE_THREAD_PRIORITY
3601 target_th->priority = NUM2INT(prio);
3602 native_thread_apply_priority(th);
3603 #else
3604 priority = NUM2INT(prio);
3605 if (priority > RUBY_THREAD_PRIORITY_MAX) {
3606 priority = RUBY_THREAD_PRIORITY_MAX;
3607 }
3608 else if (priority < RUBY_THREAD_PRIORITY_MIN) {
3609 priority = RUBY_THREAD_PRIORITY_MIN;
3610 }
3611 target_th->priority = (int8_t)priority;
3612 #endif
3613 return INT2NUM(target_th->priority);
3614 }
3615
3616 /* for IO */
3617
3618 #if defined(NFDBITS) && defined(HAVE_RB_FD_INIT)
3619
3620 /*
3621 * several Unix platforms support file descriptors bigger than FD_SETSIZE
3622 * in select(2) system call.
3623 *
3624 * - Linux 2.2.12 (?)
3625 * - NetBSD 1.2 (src/sys/kern/sys_generic.c:1.25)
3626 * select(2) documents how to allocate fd_set dynamically.
3627 * http://netbsd.gw.com/cgi-bin/man-cgi?select++NetBSD-4.0
3628 * - FreeBSD 2.2 (src/sys/kern/sys_generic.c:1.19)
3629 * - OpenBSD 2.0 (src/sys/kern/sys_generic.c:1.4)
3630 * select(2) documents how to allocate fd_set dynamically.
3631 * http://www.openbsd.org/cgi-bin/man.cgi?query=select&manpath=OpenBSD+4.4
3632 * - HP-UX documents how to allocate fd_set dynamically.
3633 * http://docs.hp.com/en/B2355-60105/select.2.html
3634 * - Solaris 8 has select_large_fdset
3635 * - Mac OS X 10.7 (Lion)
3636 * select(2) returns EINVAL if nfds is greater than FD_SET_SIZE and
3637 * _DARWIN_UNLIMITED_SELECT (or _DARWIN_C_SOURCE) isn't defined.
3638 * http://developer.apple.com/library/mac/#releasenotes/Darwin/SymbolVariantsRelNotes/_index.html
3639 *
3640 * When fd_set is not big enough to hold big file descriptors,
3641 * it should be allocated dynamically.
3642 * Note that this assumes fd_set is structured as bitmap.
3643 *
 * rb_fd_init allocates the memory.
 * rb_fd_term frees the memory.
 * rb_fd_set may reallocate the bitmap.
3647 *
3648 * So rb_fd_set doesn't reject file descriptors bigger than FD_SETSIZE.
3649 */
3650
3651 void
rb_fd_init(rb_fdset_t * fds)3652 rb_fd_init(rb_fdset_t *fds)
3653 {
3654 fds->maxfd = 0;
3655 fds->fdset = ALLOC(fd_set);
3656 FD_ZERO(fds->fdset);
3657 }
3658
3659 void
rb_fd_init_copy(rb_fdset_t * dst,rb_fdset_t * src)3660 rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
3661 {
3662 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask);
3663
3664 if (size < sizeof(fd_set))
3665 size = sizeof(fd_set);
3666 dst->maxfd = src->maxfd;
3667 dst->fdset = xmalloc(size);
3668 memcpy(dst->fdset, src->fdset, size);
3669 }
3670
3671 void
rb_fd_term(rb_fdset_t * fds)3672 rb_fd_term(rb_fdset_t *fds)
3673 {
3674 if (fds->fdset) xfree(fds->fdset);
3675 fds->maxfd = 0;
3676 fds->fdset = 0;
3677 }
3678
3679 void
rb_fd_zero(rb_fdset_t * fds)3680 rb_fd_zero(rb_fdset_t *fds)
3681 {
3682 if (fds->fdset)
3683 MEMZERO(fds->fdset, fd_mask, howmany(fds->maxfd, NFDBITS));
3684 }
3685
3686 static void
rb_fd_resize(int n,rb_fdset_t * fds)3687 rb_fd_resize(int n, rb_fdset_t *fds)
3688 {
3689 size_t m = howmany(n + 1, NFDBITS) * sizeof(fd_mask);
3690 size_t o = howmany(fds->maxfd, NFDBITS) * sizeof(fd_mask);
3691
3692 if (m < sizeof(fd_set)) m = sizeof(fd_set);
3693 if (o < sizeof(fd_set)) o = sizeof(fd_set);
3694
3695 if (m > o) {
3696 fds->fdset = xrealloc(fds->fdset, m);
3697 memset((char *)fds->fdset + o, 0, m - o);
3698 }
3699 if (n >= fds->maxfd) fds->maxfd = n + 1;
3700 }
3701
3702 void
rb_fd_set(int n,rb_fdset_t * fds)3703 rb_fd_set(int n, rb_fdset_t *fds)
3704 {
3705 rb_fd_resize(n, fds);
3706 FD_SET(n, fds->fdset);
3707 }
3708
3709 void
rb_fd_clr(int n,rb_fdset_t * fds)3710 rb_fd_clr(int n, rb_fdset_t *fds)
3711 {
3712 if (n >= fds->maxfd) return;
3713 FD_CLR(n, fds->fdset);
3714 }
3715
3716 int
rb_fd_isset(int n,const rb_fdset_t * fds)3717 rb_fd_isset(int n, const rb_fdset_t *fds)
3718 {
3719 if (n >= fds->maxfd) return 0;
3720 return FD_ISSET(n, fds->fdset) != 0; /* "!= 0" avoids FreeBSD PR 91421 */
3721 }
3722
3723 void
rb_fd_copy(rb_fdset_t * dst,const fd_set * src,int max)3724 rb_fd_copy(rb_fdset_t *dst, const fd_set *src, int max)
3725 {
3726 size_t size = howmany(max, NFDBITS) * sizeof(fd_mask);
3727
3728 if (size < sizeof(fd_set)) size = sizeof(fd_set);
3729 dst->maxfd = max;
3730 dst->fdset = xrealloc(dst->fdset, size);
3731 memcpy(dst->fdset, src, size);
3732 }
3733
3734 void
rb_fd_dup(rb_fdset_t * dst,const rb_fdset_t * src)3735 rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src)
3736 {
3737 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask);
3738
3739 if (size < sizeof(fd_set))
3740 size = sizeof(fd_set);
3741 dst->maxfd = src->maxfd;
3742 dst->fdset = xrealloc(dst->fdset, size);
3743 memcpy(dst->fdset, src->fdset, size);
3744 }
3745
3746 int
rb_fd_select(int n,rb_fdset_t * readfds,rb_fdset_t * writefds,rb_fdset_t * exceptfds,struct timeval * timeout)3747 rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout)
3748 {
3749 fd_set *r = NULL, *w = NULL, *e = NULL;
3750 if (readfds) {
3751 rb_fd_resize(n - 1, readfds);
3752 r = rb_fd_ptr(readfds);
3753 }
3754 if (writefds) {
3755 rb_fd_resize(n - 1, writefds);
3756 w = rb_fd_ptr(writefds);
3757 }
3758 if (exceptfds) {
3759 rb_fd_resize(n - 1, exceptfds);
3760 e = rb_fd_ptr(exceptfds);
3761 }
3762 return select(n, r, w, e, timeout);
3763 }
3764
/* Marks an rb_fdset_t as owning no bitmap and covering no descriptors. */
#define rb_fd_no_init(fds) ((void)((fds)->fdset = 0), (void)((fds)->maxfd = 0))

/*
 * From here on the FD_* names operate on rb_fdset_t through the rb_fd_*
 * functions instead of the system macros.
 */
#undef FD_ZERO
#define FD_ZERO(f) rb_fd_zero(f)

#undef FD_SET
#define FD_SET(i, f) rb_fd_set((i), (f))

#undef FD_CLR
#define FD_CLR(i, f) rb_fd_clr((i), (f))

#undef FD_ISSET
#define FD_ISSET(i, f) rb_fd_isset((i), (f))
3776
3777 #elif defined(_WIN32)
3778
3779 void
rb_fd_init(rb_fdset_t * set)3780 rb_fd_init(rb_fdset_t *set)
3781 {
3782 set->capa = FD_SETSIZE;
3783 set->fdset = ALLOC(fd_set);
3784 FD_ZERO(set->fdset);
3785 }
3786
/*
 * Initializes +dst+ with rb_fd_init() and then fills it from +src+ with
 * rb_fd_dup().
 */
void
rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
{
    rb_fd_init(dst);
    rb_fd_dup(dst, src);
}
3793
3794 void
rb_fd_term(rb_fdset_t * set)3795 rb_fd_term(rb_fdset_t *set)
3796 {
3797 xfree(set->fdset);
3798 set->fdset = NULL;
3799 set->capa = 0;
3800 }
3801
3802 void
rb_fd_set(int fd,rb_fdset_t * set)3803 rb_fd_set(int fd, rb_fdset_t *set)
3804 {
3805 unsigned int i;
3806 SOCKET s = rb_w32_get_osfhandle(fd);
3807
3808 for (i = 0; i < set->fdset->fd_count; i++) {
3809 if (set->fdset->fd_array[i] == s) {
3810 return;
3811 }
3812 }
3813 if (set->fdset->fd_count >= (unsigned)set->capa) {
3814 set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE;
3815 set->fdset = xrealloc(set->fdset, sizeof(unsigned int) + sizeof(SOCKET) * set->capa);
3816 }
3817 set->fdset->fd_array[set->fdset->fd_count++] = s;
3818 }
3819
/*
 * From here on the FD_* names operate on rb_fdset_t through the rb_fd_*
 * functions instead of the system macros.
 */
#undef FD_ZERO
#define FD_ZERO(f) rb_fd_zero(f)

#undef FD_SET
#define FD_SET(i, f) rb_fd_set((i), (f))

#undef FD_CLR
#define FD_CLR(i, f) rb_fd_clr((i), (f))

#undef FD_ISSET
#define FD_ISSET(i, f) rb_fd_isset((i), (f))

/* Marks an rb_fdset_t as owning no fd_set buffer. */
#define rb_fd_no_init(fds) (void)((fds)->fdset = 0)
3831
3832 #endif
3833
#if !defined(rb_fd_no_init)
/* no definition was provided above: just evaluate the argument */
#define rb_fd_no_init(fds) (void)(fds)
#endif
3837
3838 static int
wait_retryable(int * result,int errnum,rb_hrtime_t * rel,rb_hrtime_t end)3839 wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
3840 {
3841 if (*result < 0) {
3842 switch (errnum) {
3843 case EINTR:
3844 #ifdef ERESTART
3845 case ERESTART:
3846 #endif
3847 *result = 0;
3848 if (rel && hrtime_update_expire(rel, end)) {
3849 *rel = 0;
3850 }
3851 return TRUE;
3852 }
3853 return FALSE;
3854 }
3855 else if (*result == 0) {
3856 /* check for spurious wakeup */
3857 if (rel) {
3858 return !hrtime_update_expire(rel, end);
3859 }
3860 return TRUE;
3861 }
3862 return FALSE;
3863 }
3864
/*
 * State shared by rb_thread_fd_select(), do_select() and select_set_free().
 * A pointer to it is passed through rb_ensure() cast to VALUE.
 */
struct select_set {
    int max;                 /* first argument given to native_fd_select() */
    int sigwait_fd;          /* value of rb_sigwait_fd_get(); acted on only when >= 0 */
    rb_thread_t *th;         /* thread doing the select (from GET_THREAD()) */
    rb_fdset_t *rset;        /* read set handed to native_fd_select(); may be NULL */
    rb_fdset_t *wset;        /* write set; may be NULL */
    rb_fdset_t *eset;        /* except set; may be NULL */
    rb_fdset_t orig_rset;    /* copy of rset, duplicated back into it before a retry */
    rb_fdset_t orig_wset;    /* same for wset */
    rb_fdset_t orig_eset;    /* same for eset */
    struct timeval *timeout; /* caller's timeout, given to timeout_prepare() */
};
3877
3878 static VALUE
select_set_free(VALUE p)3879 select_set_free(VALUE p)
3880 {
3881 struct select_set *set = (struct select_set *)p;
3882
3883 if (set->sigwait_fd >= 0) {
3884 rb_sigwait_fd_put(set->th, set->sigwait_fd);
3885 rb_sigwait_fd_migrate(set->th->vm);
3886 }
3887
3888 rb_fd_term(&set->orig_rset);
3889 rb_fd_term(&set->orig_wset);
3890 rb_fd_term(&set->orig_eset);
3891
3892 return Qfalse;
3893 }
3894
3895 static const rb_hrtime_t *
sigwait_timeout(rb_thread_t * th,int sigwait_fd,const rb_hrtime_t * orig,int * drained_p)3896 sigwait_timeout(rb_thread_t *th, int sigwait_fd, const rb_hrtime_t *orig,
3897 int *drained_p)
3898 {
3899 static const rb_hrtime_t quantum = TIME_QUANTUM_USEC * 1000;
3900
3901 if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) {
3902 *drained_p = check_signals_nogvl(th, sigwait_fd);
3903 if (!orig || *orig > quantum)
3904 return &quantum;
3905 }
3906
3907 return orig;
3908 }
3909
/*
 * Body function run under rb_ensure() by rb_thread_fd_select().
 * +p+ is a struct select_set * cast to VALUE.
 *
 * Repeats native_fd_select() inside a BLOCKING_REGION for as long as
 * wait_retryable() asks for a retry; before each retry the fd sets are
 * restored from their saved copies. Returns the final result cast to VALUE;
 * when that result is negative, errno is set to the value saved right after
 * the failing call.
 */
static VALUE
do_select(VALUE p)
{
    struct select_set *set = (struct select_set *)p;
    int result = 0;
    int lerrno;
    rb_hrtime_t *to, rel, end = 0;

    timeout_prepare(&to, &rel, &end, set->timeout);
    /* copy a saved set back into the live one; skipped for sets that are NULL */
#define restore_fdset(dst, src) \
    ((dst) ? rb_fd_dup(dst, src) : (void)0)
    /* restore all three sets; always evaluates to TRUE so it can sit in the loop condition */
#define do_select_update() \
    (restore_fdset(set->rset, &set->orig_rset), \
     restore_fdset(set->wset, &set->orig_wset), \
     restore_fdset(set->eset, &set->orig_eset), \
     TRUE)

    do {
        int drained;
        lerrno = 0;

        BLOCKING_REGION(set->th, {
            const rb_hrtime_t *sto;
            struct timeval tv;

            /* may substitute a shorter timeout, see sigwait_timeout() */
            sto = sigwait_timeout(set->th, set->sigwait_fd, to, &drained);
            if (!RUBY_VM_INTERRUPTED(set->th->ec)) {
                result = native_fd_select(set->max, set->rset, set->wset,
                                          set->eset,
                                          rb_hrtime2timeval(&tv, sto), set->th);
                /* save errno immediately; later calls may overwrite it */
                if (result < 0) lerrno = errno;
            }
        }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, TRUE);

        if (set->sigwait_fd >= 0) {
            /* the sigwait fd was added by rb_thread_fd_select(): do not count it */
            if (result > 0 && rb_fd_isset(set->sigwait_fd, set->rset))
                result--;
            (void)check_signals_nogvl(set->th, set->sigwait_fd);
        }

        RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
    } while (wait_retryable(&result, lerrno, to, end) && do_select_update());

    if (result < 0) {
        errno = lerrno;
    }

    return (VALUE)result;
}
3959
3960 static void
rb_thread_wait_fd_rw(int fd,int read)3961 rb_thread_wait_fd_rw(int fd, int read)
3962 {
3963 int result = 0;
3964 int events = read ? RB_WAITFD_IN : RB_WAITFD_OUT;
3965
3966 thread_debug("rb_thread_wait_fd_rw(%d, %s)\n", fd, read ? "read" : "write");
3967
3968 if (fd < 0) {
3969 rb_raise(rb_eIOError, "closed stream");
3970 }
3971
3972 result = rb_wait_for_single_fd(fd, events, NULL);
3973 if (result < 0) {
3974 rb_sys_fail(0);
3975 }
3976
3977 thread_debug("rb_thread_wait_fd_rw(%d, %s): done\n", fd, read ? "read" : "write");
3978 }
3979
/*
 * Waits until +fd+ is ready for reading; see rb_thread_wait_fd_rw() for the
 * errors that can be raised.
 */
void
rb_thread_wait_fd(int fd)
{
    rb_thread_wait_fd_rw(fd, 1);
}
3985
/*
 * Waits until +fd+ is ready for writing; see rb_thread_wait_fd_rw() for the
 * errors that can be raised. Always returns TRUE when it returns at all.
 */
int
rb_thread_fd_writable(int fd)
{
    rb_thread_wait_fd_rw(fd, 0);
    return TRUE;
}
3992
3993 static rb_fdset_t *
init_set_fd(int fd,rb_fdset_t * fds)3994 init_set_fd(int fd, rb_fdset_t *fds)
3995 {
3996 if (fd < 0) {
3997 return 0;
3998 }
3999 rb_fd_init(fds);
4000 rb_fd_set(fd, fds);
4001
4002 return fds;
4003 }
4004
/*
 * select() for Ruby threads.
 *
 * With no fd sets at all it only sleeps: forever when +timeout+ is NULL,
 * otherwise for *timeout; 0 is returned in both cases.
 *
 * Otherwise the sigwait fd (if rb_sigwait_fd_get() provides one) is added to
 * the read set, copies of the given sets are saved for retries, and
 * do_select() is run under rb_ensure() with select_set_free() as cleanup.
 * The return value is do_select()'s result converted to int.
 */
int
rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
                    struct timeval *timeout)
{
    struct select_set set;

    set.th = GET_THREAD();
    RUBY_VM_CHECK_INTS_BLOCKING(set.th->ec);
    set.max = max;
    set.rset = read;
    set.wset = write;
    set.eset = except;
    set.timeout = timeout;

    /* nothing to watch: this is just a sleep */
    if (!set.rset && !set.wset && !set.eset) {
        if (!timeout) {
            rb_thread_sleep_forever();
            return 0;
        }
        rb_thread_wait_for(*timeout);
        return 0;
    }

    set.sigwait_fd = rb_sigwait_fd_get(set.th);
    if (set.sigwait_fd >= 0) {
        if (set.rset)
            rb_fd_set(set.sigwait_fd, set.rset);
        else
            /* no read set from the caller: orig_rset itself becomes the read set */
            set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset);
        if (set.sigwait_fd >= set.max) {
            set.max = set.sigwait_fd + 1;
        }
    }
    /*
     * For each of rset/wset/eset: resize the live set to cover set.max - 1
     * and save a copy in orig_<name>, except when the live set already is
     * orig_<name> (the sigwait-only read set created above). Sets that are
     * absent get their orig_<name> marked with rb_fd_no_init().
     */
#define fd_init_copy(f) do { \
        if (set.f) { \
            rb_fd_resize(set.max - 1, set.f); \
            if (&set.orig_##f != set.f) { /* sigwait_fd */ \
                rb_fd_init_copy(&set.orig_##f, set.f); \
            } \
        } \
        else { \
            rb_fd_no_init(&set.orig_##f); \
        } \
    } while (0)
    fd_init_copy(rset);
    fd_init_copy(wset);
    fd_init_copy(eset);
#undef fd_init_copy

    return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
}
4056
4057 #ifdef USE_POLL
4058
/*
 * poll(2) revents bits that correspond to select(2)'s read, write and except
 * sets. The same as the Linux kernel.
 * TODO: make a platform-independent definition.
 */
#define POLLIN_SET \
    (POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR)
#define POLLOUT_SET \
    (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR)
#define POLLEX_SET \
    (POLLPRI)

#if !defined(POLLERR_SET) /* defined for FreeBSD for now */
# define POLLERR_SET (0)
#endif
4067
/*
 * Waits for +events+ on the single descriptor +fd+ using ppoll().
 *
 * Returns a mask of RB_WAITFD_* events built from the reported revents
 * (0 when none of the mapped bits is set), or -1 with errno set when the
 * wait failed or poll reported POLLNVAL for +fd+ (errno = EBADF).
 *
 * +timeout+ is handed to timeout_prepare(); rb_thread_wait_fd_rw() passes
 * NULL here (presumably meaning "no time limit" -- confirm against
 * timeout_prepare()).
 */
int
rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
    struct pollfd fds[2];
    int result = 0, lerrno;
    rb_hrtime_t *to, rel, end = 0;
    int drained;
    nfds_t nfds;
    rb_unblock_function_t *ubf;
    struct waiting_fd wfd;
    int state;

    /* register this wait in the VM's waiting_fds list for the duration of the call */
    wfd.th = GET_THREAD();
    wfd.fd = fd;
    list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
    EC_PUSH_TAG(wfd.th->ec);
    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
        RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
        timeout_prepare(&to, &rel, &end, timeout);
        fds[0].fd = fd;
        fds[0].events = (short)events;
        fds[0].revents = 0;
        do {
            /* slot 1 is the sigwait fd, polled only when one could be obtained */
            fds[1].fd = rb_sigwait_fd_get(wfd.th);

            if (fds[1].fd >= 0) {
                fds[1].events = POLLIN;
                fds[1].revents = 0;
                nfds = 2;
                ubf = ubf_sigwait;
            }
            else {
                nfds = 1;
                ubf = ubf_select;
            }

            lerrno = 0;
            BLOCKING_REGION(wfd.th, {
                const rb_hrtime_t *sto;
                struct timespec ts;

                sto = sigwait_timeout(wfd.th, fds[1].fd, to, &drained);
                if (!RUBY_VM_INTERRUPTED(wfd.th->ec)) {
                    result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, sto), 0);
                    /* save errno immediately; later calls may overwrite it */
                    if (result < 0) lerrno = errno;
                }
            }, ubf, wfd.th, TRUE);

            if (fds[1].fd >= 0) {
                /* do not count the sigwait fd among the ready descriptors */
                if (result > 0 && fds[1].revents) {
                    result--;
                }
                (void)check_signals_nogvl(wfd.th, fds[1].fd);
                rb_sigwait_fd_put(wfd.th, fds[1].fd);
                rb_sigwait_fd_migrate(wfd.th->vm);
            }
            RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
        } while (wait_retryable(&result, lerrno, to, end));
    }
    EC_POP_TAG();
    /* unregister before re-raising, so the list entry never outlives this frame */
    list_del(&wfd.wfd_node);
    if (state) {
        EC_JUMP_TAG(wfd.th->ec, state);
    }

    if (result < 0) {
        errno = lerrno;
        return -1;
    }

    if (fds[0].revents & POLLNVAL) {
        errno = EBADF;
        return -1;
    }

    /*
     * POLLIN and POLLOUT have different meanings from select(2)'s read/write
     * bits. Therefore we need to fix them up.
     */
    result = 0;
    if (fds[0].revents & POLLIN_SET)
        result |= RB_WAITFD_IN;
    if (fds[0].revents & POLLOUT_SET)
        result |= RB_WAITFD_OUT;
    if (fds[0].revents & POLLEX_SET)
        result |= RB_WAITFD_PRI;

    /* all requested events are ready if there is an error */
    if (fds[0].revents & POLLERR_SET)
        result |= events;

    return result;
}
4164 #else /* ! USE_POLL - implement rb_io_poll_fd() using select() */
/*
 * Arguments shared by rb_wait_for_single_fd(), select_single() and
 * select_single_cleanup() in the select()-based implementation.
 */
struct select_args {
    union {
        int fd;             /* in: descriptor to wait on */
        int error;          /* out: errno saved by select_single() when select returned -1 */
    } as;                   /* the two members share storage */
    rb_fdset_t *read;       /* set containing fd when RB_WAITFD_IN was requested, else NULL */
    rb_fdset_t *write;      /* same for RB_WAITFD_OUT */
    rb_fdset_t *except;     /* same for RB_WAITFD_PRI */
    struct waiting_fd wfd;  /* node linked into the VM's waiting_fds list */
    struct timeval *tv;     /* timeout passed on to rb_thread_fd_select() */
};
4176
4177 static VALUE
select_single(VALUE ptr)4178 select_single(VALUE ptr)
4179 {
4180 struct select_args *args = (struct select_args *)ptr;
4181 int r;
4182
4183 r = rb_thread_fd_select(args->as.fd + 1,
4184 args->read, args->write, args->except, args->tv);
4185 if (r == -1)
4186 args->as.error = errno;
4187 if (r > 0) {
4188 r = 0;
4189 if (args->read && rb_fd_isset(args->as.fd, args->read))
4190 r |= RB_WAITFD_IN;
4191 if (args->write && rb_fd_isset(args->as.fd, args->write))
4192 r |= RB_WAITFD_OUT;
4193 if (args->except && rb_fd_isset(args->as.fd, args->except))
4194 r |= RB_WAITFD_PRI;
4195 }
4196 return (VALUE)r;
4197 }
4198
4199 static VALUE
select_single_cleanup(VALUE ptr)4200 select_single_cleanup(VALUE ptr)
4201 {
4202 struct select_args *args = (struct select_args *)ptr;
4203
4204 list_del(&args->wfd.wfd_node);
4205 if (args->read) rb_fd_term(args->read);
4206 if (args->write) rb_fd_term(args->write);
4207 if (args->except) rb_fd_term(args->except);
4208
4209 return (VALUE)-1;
4210 }
4211
4212 int
rb_wait_for_single_fd(int fd,int events,struct timeval * tv)4213 rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
4214 {
4215 rb_fdset_t rfds, wfds, efds;
4216 struct select_args args;
4217 int r;
4218 VALUE ptr = (VALUE)&args;
4219
4220 args.as.fd = fd;
4221 args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
4222 args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
4223 args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
4224 args.tv = tv;
4225 args.wfd.fd = fd;
4226 args.wfd.th = GET_THREAD();
4227
4228 list_add(&args.wfd.th->vm->waiting_fds, &args.wfd.wfd_node);
4229 r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
4230 if (r == -1)
4231 errno = args.as.error;
4232
4233 return r;
4234 }
4235 #endif /* ! USE_POLL */
4236
4237 /*
4238 * for GC
4239 */
4240
4241 #ifdef USE_CONSERVATIVE_STACK_END
4242 void
rb_gc_set_stack_end(VALUE ** stack_end_p)4243 rb_gc_set_stack_end(VALUE **stack_end_p)
4244 {
4245 VALUE stack_end;
4246 *stack_end_p = &stack_end;
4247 }
4248 #endif
4249
4250 /*
4251 *
4252 */
4253
4254 void
rb_threadptr_check_signal(rb_thread_t * mth)4255 rb_threadptr_check_signal(rb_thread_t *mth)
4256 {
4257 /* mth must be main_thread */
4258 if (rb_signal_buff_size() > 0) {
4259 /* wakeup main thread */
4260 threadptr_trap_interrupt(mth);
4261 }
4262 }
4263
4264 static void
timer_thread_function(void)4265 timer_thread_function(void)
4266 {
4267 volatile rb_execution_context_t *ec;
4268
4269 /* for time slice */
4270 ec = ACCESS_ONCE(rb_execution_context_t *,
4271 ruby_current_execution_context_ptr);
4272 if (ec) RUBY_VM_SET_TIMER_INTERRUPT(ec);
4273 }
4274
/*
 * Reports a bug through rb_async_bug_errno() with +mesg+ followed by
 * "(<fd>)". The message is assembled in a 64-byte stack buffer; the fd
 * suffix is appended only when the copied message leaves more than three
 * bytes free.
 */
static void
async_bug_fd(const char *mesg, int errno_arg, int fd)
{
    char buff[64];
    const size_t len = strlcpy(buff, mesg, sizeof(buff));
    const int has_room = len < sizeof(buff) - 3;

    if (has_room) {
        ruby_snprintf(buff + len, sizeof(buff) - len, "(%d)", fd);
    }
    rb_async_bug_errno(buff, errno_arg);
}
4285
4286 /* VM-dependent API is not available for this function */
4287 static int
consume_communication_pipe(int fd)4288 consume_communication_pipe(int fd)
4289 {
4290 #if USE_EVENTFD
4291 uint64_t buff[1];
4292 #else
4293 /* buffer can be shared because no one refers to them. */
4294 static char buff[1024];
4295 #endif
4296 ssize_t result;
4297 int ret = FALSE; /* for rb_sigwait_sleep */
4298
4299 /*
4300 * disarm UBF_TIMER before we read, because it can become
4301 * re-armed at any time via sighandler and the pipe will refill
4302 * We can disarm it because this thread is now processing signals
4303 * and we do not want unnecessary SIGVTALRM
4304 */
4305 ubf_timer_disarm();
4306
4307 while (1) {
4308 result = read(fd, buff, sizeof(buff));
4309 if (result > 0) {
4310 ret = TRUE;
4311 if (USE_EVENTFD || result < (ssize_t)sizeof(buff)) {
4312 return ret;
4313 }
4314 }
4315 else if (result == 0) {
4316 return ret;
4317 }
4318 else if (result < 0) {
4319 int e = errno;
4320 switch (e) {
4321 case EINTR:
4322 continue; /* retry */
4323 case EAGAIN:
4324 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
4325 case EWOULDBLOCK:
4326 #endif
4327 return ret;
4328 default:
4329 async_bug_fd("consume_communication_pipe: read", e, fd);
4330 }
4331 }
4332 }
4333 }
4334
4335 static int
check_signals_nogvl(rb_thread_t * th,int sigwait_fd)4336 check_signals_nogvl(rb_thread_t *th, int sigwait_fd)
4337 {
4338 rb_vm_t *vm = GET_VM(); /* th may be 0 */
4339 int ret = consume_communication_pipe(sigwait_fd);
4340 ubf_wakeup_all_threads();
4341 ruby_sigchld_handler(vm);
4342 if (rb_signal_buff_size()) {
4343 if (th == vm->main_thread)
4344 /* no need to lock + wakeup if already in main thread */
4345 RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
4346 else
4347 threadptr_trap_interrupt(vm->main_thread);
4348 ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */
4349 }
4350 return ret;
4351 }
4352
/*
 * Stops the timer thread if one was created; when
 * native_stop_timer_thread() returns non-zero, the timer thread state is
 * reset as well.
 */
void
rb_thread_stop_timer_thread(void)
{
    if (!TIMER_THREAD_CREATED_P()) {
        return;
    }
    if (native_stop_timer_thread()) {
        native_reset_timer_thread();
    }
}
4360
/*
 * Resets the timer thread state by calling native_reset_timer_thread().
 */
void
rb_thread_reset_timer_thread(void)
{
    native_reset_timer_thread();
}
4366
4367 void
rb_thread_start_timer_thread(void)4368 rb_thread_start_timer_thread(void)
4369 {
4370 system_working = 1;
4371 rb_thread_create_timer_thread();
4372 }
4373
4374 static int
clear_coverage_i(st_data_t key,st_data_t val,st_data_t dummy)4375 clear_coverage_i(st_data_t key, st_data_t val, st_data_t dummy)
4376 {
4377 int i;
4378 VALUE coverage = (VALUE)val;
4379 VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES);
4380 VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES);
4381
4382 if (lines) {
4383 if (GET_VM()->coverage_mode & COVERAGE_TARGET_ONESHOT_LINES) {
4384 rb_ary_clear(lines);
4385 }
4386 else {
4387 int i;
4388 for (i = 0; i < RARRAY_LEN(lines); i++) {
4389 if (RARRAY_AREF(lines, i) != Qnil)
4390 RARRAY_ASET(lines, i, INT2FIX(0));
4391 }
4392 }
4393 }
4394 if (branches) {
4395 VALUE counters = RARRAY_AREF(branches, 1);
4396 for (i = 0; i < RARRAY_LEN(counters); i++) {
4397 RARRAY_ASET(counters, i, INT2FIX(0));
4398 }
4399 }
4400
4401 return ST_CONTINUE;
4402 }
4403
4404 void
rb_clear_coverages(void)4405 rb_clear_coverages(void)
4406 {
4407 VALUE coverages = rb_get_coverages();
4408 if (RTEST(coverages)) {
4409 rb_hash_foreach(coverages, clear_coverage_i, 0);
4410 }
4411 }
4412
4413 #if defined(HAVE_WORKING_FORK)
4414 static void
rb_thread_atfork_internal(rb_thread_t * th,void (* atfork)(rb_thread_t *,const rb_thread_t *))4415 rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const rb_thread_t *))
4416 {
4417 rb_thread_t *i = 0;
4418 rb_vm_t *vm = th->vm;
4419 vm->main_thread = th;
4420
4421 gvl_atfork(th->vm);
4422 ubf_list_atfork();
4423
4424 list_for_each(&vm->living_threads, i, vmlt_node) {
4425 atfork(i, th);
4426 }
4427 rb_vm_living_threads_init(vm);
4428 rb_vm_living_threads_insert(vm, th);
4429
4430 /* may be held by MJIT threads in parent */
4431 rb_native_mutex_initialize(&vm->waitpid_lock);
4432 rb_native_mutex_initialize(&vm->workqueue_lock);
4433
4434 /* may be held by any thread in parent */
4435 rb_native_mutex_initialize(&th->interrupt_lock);
4436
4437 vm->fork_gen++;
4438
4439 vm->sleeper = 0;
4440 rb_clear_coverages();
4441 }
4442
4443 static void
terminate_atfork_i(rb_thread_t * th,const rb_thread_t * current_th)4444 terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th)
4445 {
4446 if (th != current_th) {
4447 rb_mutex_abandon_keeping_mutexes(th);
4448 rb_mutex_abandon_locking_mutex(th);
4449 thread_cleanup_func(th, TRUE);
4450 }
4451 }
4452
4453 void rb_fiber_atfork(rb_thread_t *);
4454 void
rb_thread_atfork(void)4455 rb_thread_atfork(void)
4456 {
4457 rb_thread_t *th = GET_THREAD();
4458 rb_thread_atfork_internal(th, terminate_atfork_i);
4459 th->join_list = NULL;
4460 rb_fiber_atfork(th);
4461
4462 /* We don't want reproduce CVE-2003-0900. */
4463 rb_reset_random_seed();
4464
4465 /* For child, starting MJIT worker thread in this place which is safer than immediately after `after_fork_ruby`. */
4466 mjit_child_after_fork();
4467 }
4468
4469 static void
terminate_atfork_before_exec_i(rb_thread_t * th,const rb_thread_t * current_th)4470 terminate_atfork_before_exec_i(rb_thread_t *th, const rb_thread_t *current_th)
4471 {
4472 if (th != current_th) {
4473 thread_cleanup_func_before_exec(th);
4474 }
4475 }
4476
4477 void
rb_thread_atfork_before_exec(void)4478 rb_thread_atfork_before_exec(void)
4479 {
4480 rb_thread_t *th = GET_THREAD();
4481 rb_thread_atfork_internal(th, terminate_atfork_before_exec_i);
4482 }
4483 #else
/* No-op variant compiled when HAVE_WORKING_FORK is not defined. */
void
rb_thread_atfork(void)
{
    /* nothing to do */
}
4488
/* No-op variant compiled when HAVE_WORKING_FORK is not defined. */
void
rb_thread_atfork_before_exec(void)
{
    /* nothing to do */
}
4493 #endif
4494
4495 struct thgroup {
4496 int enclosed;
4497 VALUE group;
4498 };
4499
4500 static size_t
thgroup_memsize(const void * ptr)4501 thgroup_memsize(const void *ptr)
4502 {
4503 return sizeof(struct thgroup);
4504 }
4505
4506 static const rb_data_type_t thgroup_data_type = {
4507 "thgroup",
4508 {NULL, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,},
4509 0, 0, RUBY_TYPED_FREE_IMMEDIATELY
4510 };
4511
4512 /*
4513 * Document-class: ThreadGroup
4514 *
4515 * ThreadGroup provides a means of keeping track of a number of threads as a
4516 * group.
4517 *
4518 * A given Thread object can only belong to one ThreadGroup at a time; adding
4519 * a thread to a new group will remove it from any previous group.
4520 *
4521 * Newly created threads belong to the same group as the thread from which they
4522 * were created.
4523 */
4524
4525 /*
4526 * Document-const: Default
4527 *
4528 * The default ThreadGroup created when Ruby starts; all Threads belong to it
4529 * by default.
4530 */
4531 static VALUE
thgroup_s_alloc(VALUE klass)4532 thgroup_s_alloc(VALUE klass)
4533 {
4534 VALUE group;
4535 struct thgroup *data;
4536
4537 group = TypedData_Make_Struct(klass, struct thgroup, &thgroup_data_type, data);
4538 data->enclosed = 0;
4539 data->group = group;
4540
4541 return group;
4542 }
4543
4544 /*
4545 * call-seq:
4546 * thgrp.list -> array
4547 *
4548 * Returns an array of all existing Thread objects that belong to this group.
4549 *
4550 * ThreadGroup::Default.list #=> [#<Thread:0x401bdf4c run>]
4551 */
4552
4553 static VALUE
thgroup_list(VALUE group)4554 thgroup_list(VALUE group)
4555 {
4556 VALUE ary = rb_ary_new();
4557 rb_vm_t *vm = GET_THREAD()->vm;
4558 rb_thread_t *th = 0;
4559
4560 list_for_each(&vm->living_threads, th, vmlt_node) {
4561 if (th->thgroup == group) {
4562 rb_ary_push(ary, th->self);
4563 }
4564 }
4565 return ary;
4566 }
4567
4568
4569 /*
4570 * call-seq:
4571 * thgrp.enclose -> thgrp
4572 *
4573 * Prevents threads from being added to or removed from the receiving
4574 * ThreadGroup.
4575 *
4576 * New threads can still be started in an enclosed ThreadGroup.
4577 *
4578 * ThreadGroup::Default.enclose #=> #<ThreadGroup:0x4029d914>
4579 * thr = Thread.new { Thread.stop } #=> #<Thread:0x402a7210 sleep>
4580 * tg = ThreadGroup.new #=> #<ThreadGroup:0x402752d4>
4581 * tg.add thr
4582 * #=> ThreadError: can't move from the enclosed thread group
4583 */
4584
4585 static VALUE
thgroup_enclose(VALUE group)4586 thgroup_enclose(VALUE group)
4587 {
4588 struct thgroup *data;
4589
4590 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
4591 data->enclosed = 1;
4592
4593 return group;
4594 }
4595
4596
4597 /*
4598 * call-seq:
4599 * thgrp.enclosed? -> true or false
4600 *
4601 * Returns +true+ if the +thgrp+ is enclosed. See also ThreadGroup#enclose.
4602 */
4603
4604 static VALUE
thgroup_enclosed_p(VALUE group)4605 thgroup_enclosed_p(VALUE group)
4606 {
4607 struct thgroup *data;
4608
4609 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
4610 if (data->enclosed)
4611 return Qtrue;
4612 return Qfalse;
4613 }
4614
4615
4616 /*
4617 * call-seq:
4618 * thgrp.add(thread) -> thgrp
4619 *
4620 * Adds the given +thread+ to this group, removing it from any other
4621 * group to which it may have previously been a member.
4622 *
4623 * puts "Initial group is #{ThreadGroup::Default.list}"
4624 * tg = ThreadGroup.new
4625 * t1 = Thread.new { sleep }
4626 * t2 = Thread.new { sleep }
4627 * puts "t1 is #{t1}"
4628 * puts "t2 is #{t2}"
4629 * tg.add(t1)
4630 * puts "Initial group now #{ThreadGroup::Default.list}"
4631 * puts "tg group now #{tg.list}"
4632 *
4633 * This will produce:
4634 *
4635 * Initial group is #<Thread:0x401bdf4c>
4636 * t1 is #<Thread:0x401b3c90>
4637 * t2 is #<Thread:0x401b3c18>
4638 * Initial group now #<Thread:0x401b3c18>#<Thread:0x401bdf4c>
4639 * tg group now #<Thread:0x401b3c90>
4640 */
4641
4642 static VALUE
thgroup_add(VALUE group,VALUE thread)4643 thgroup_add(VALUE group, VALUE thread)
4644 {
4645 rb_thread_t *target_th = rb_thread_ptr(thread);
4646 struct thgroup *data;
4647
4648 if (OBJ_FROZEN(group)) {
4649 rb_raise(rb_eThreadError, "can't move to the frozen thread group");
4650 }
4651 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
4652 if (data->enclosed) {
4653 rb_raise(rb_eThreadError, "can't move to the enclosed thread group");
4654 }
4655
4656 if (!target_th->thgroup) {
4657 return Qnil;
4658 }
4659
4660 if (OBJ_FROZEN(target_th->thgroup)) {
4661 rb_raise(rb_eThreadError, "can't move from the frozen thread group");
4662 }
4663 TypedData_Get_Struct(target_th->thgroup, struct thgroup, &thgroup_data_type, data);
4664 if (data->enclosed) {
4665 rb_raise(rb_eThreadError,
4666 "can't move from the enclosed thread group");
4667 }
4668
4669 target_th->thgroup = group;
4670 return group;
4671 }
4672
4673 /*
4674 * Document-class: ThreadShield
4675 */
4676 static void
thread_shield_mark(void * ptr)4677 thread_shield_mark(void *ptr)
4678 {
4679 rb_gc_mark((VALUE)ptr);
4680 }
4681
4682 static const rb_data_type_t thread_shield_data_type = {
4683 "thread_shield",
4684 {thread_shield_mark, 0, 0,},
4685 0, 0, RUBY_TYPED_FREE_IMMEDIATELY
4686 };
4687
4688 static VALUE
thread_shield_alloc(VALUE klass)4689 thread_shield_alloc(VALUE klass)
4690 {
4691 return TypedData_Wrap_Struct(klass, &thread_shield_data_type, (void *)mutex_alloc(0));
4692 }
4693
/* Type-checks obj as a thread shield and yields its wrapped pointer as a VALUE. */
#define GetThreadShieldPtr(obj) \
    ((VALUE)rb_check_typeddata((obj), &thread_shield_data_type))

/* The waiting count lives in flag bits FL_USER0..FL_USER19 of the shield object. */
#define THREAD_SHIELD_WAITING_MASK \
    (FL_USER0|FL_USER1|FL_USER2|FL_USER3|FL_USER4| \
     FL_USER5|FL_USER6|FL_USER7|FL_USER8|FL_USER9| \
     FL_USER10|FL_USER11|FL_USER12|FL_USER13|FL_USER14| \
     FL_USER15|FL_USER16|FL_USER17|FL_USER18|FL_USER19)
#define THREAD_SHIELD_WAITING_SHIFT (FL_USHIFT)

/* Reads the waiting count stored in b's flags. */
#define rb_thread_shield_waiting(b) \
    (int)((RBASIC(b)->flags&THREAD_SHIELD_WAITING_MASK)>>THREAD_SHIELD_WAITING_SHIFT)
4698
4699 static inline void
rb_thread_shield_waiting_inc(VALUE b)4700 rb_thread_shield_waiting_inc(VALUE b)
4701 {
4702 unsigned int w = rb_thread_shield_waiting(b);
4703 w++;
4704 if (w > (unsigned int)(THREAD_SHIELD_WAITING_MASK>>THREAD_SHIELD_WAITING_SHIFT))
4705 rb_raise(rb_eRuntimeError, "waiting count overflow");
4706 RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK;
4707 RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT);
4708 }
4709
4710 static inline void
rb_thread_shield_waiting_dec(VALUE b)4711 rb_thread_shield_waiting_dec(VALUE b)
4712 {
4713 unsigned int w = rb_thread_shield_waiting(b);
4714 if (!w) rb_raise(rb_eRuntimeError, "waiting count underflow");
4715 w--;
4716 RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK;
4717 RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT);
4718 }
4719
4720 VALUE
rb_thread_shield_new(void)4721 rb_thread_shield_new(void)
4722 {
4723 VALUE thread_shield = thread_shield_alloc(rb_cThreadShield);
4724 rb_mutex_lock((VALUE)DATA_PTR(thread_shield));
4725 return thread_shield;
4726 }
4727
4728 /*
4729 * Wait a thread shield.
4730 *
4731 * Returns
4732 * true: acquired the thread shield
4733 * false: the thread shield was destroyed and no other threads waiting
4734 * nil: the thread shield was destroyed but still in use
4735 */
4736 VALUE
rb_thread_shield_wait(VALUE self)4737 rb_thread_shield_wait(VALUE self)
4738 {
4739 VALUE mutex = GetThreadShieldPtr(self);
4740 rb_mutex_t *m;
4741
4742 if (!mutex) return Qfalse;
4743 m = mutex_ptr(mutex);
4744 if (m->th == GET_THREAD()) return Qnil;
4745 rb_thread_shield_waiting_inc(self);
4746 rb_mutex_lock(mutex);
4747 rb_thread_shield_waiting_dec(self);
4748 if (DATA_PTR(self)) return Qtrue;
4749 rb_mutex_unlock(mutex);
4750 return rb_thread_shield_waiting(self) > 0 ? Qnil : Qfalse;
4751 }
4752
4753 static VALUE
thread_shield_get_mutex(VALUE self)4754 thread_shield_get_mutex(VALUE self)
4755 {
4756 VALUE mutex = GetThreadShieldPtr(self);
4757 if (!mutex)
4758 rb_raise(rb_eThreadError, "destroyed thread shield - %p", (void *)self);
4759 return mutex;
4760 }
4761
4762 /*
4763 * Release a thread shield, and return true if it has waiting threads.
4764 */
4765 VALUE
rb_thread_shield_release(VALUE self)4766 rb_thread_shield_release(VALUE self)
4767 {
4768 VALUE mutex = thread_shield_get_mutex(self);
4769 rb_mutex_unlock(mutex);
4770 return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse;
4771 }
4772
4773 /*
4774 * Release and destroy a thread shield, and return true if it has waiting threads.
4775 */
4776 VALUE
rb_thread_shield_destroy(VALUE self)4777 rb_thread_shield_destroy(VALUE self)
4778 {
4779 VALUE mutex = thread_shield_get_mutex(self);
4780 DATA_PTR(self) = 0;
4781 rb_mutex_unlock(mutex);
4782 return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse;
4783 }
4784
4785 static VALUE
threadptr_recursive_hash(rb_thread_t * th)4786 threadptr_recursive_hash(rb_thread_t *th)
4787 {
4788 return th->ec->local_storage_recursive_hash;
4789 }
4790
4791 static void
threadptr_recursive_hash_set(rb_thread_t * th,VALUE hash)4792 threadptr_recursive_hash_set(rb_thread_t *th, VALUE hash)
4793 {
4794 th->ec->local_storage_recursive_hash = hash;
4795 }
4796
4797 ID rb_frame_last_func(void);
4798
4799 /*
4800 * Returns the current "recursive list" used to detect recursion.
4801 * This list is a hash table, unique for the current thread and for
4802 * the current __callee__.
4803 */
4804
4805 static VALUE
recursive_list_access(VALUE sym)4806 recursive_list_access(VALUE sym)
4807 {
4808 rb_thread_t *th = GET_THREAD();
4809 VALUE hash = threadptr_recursive_hash(th);
4810 VALUE list;
4811 if (NIL_P(hash) || !RB_TYPE_P(hash, T_HASH)) {
4812 hash = rb_ident_hash_new();
4813 threadptr_recursive_hash_set(th, hash);
4814 list = Qnil;
4815 }
4816 else {
4817 list = rb_hash_aref(hash, sym);
4818 }
4819 if (NIL_P(list) || !RB_TYPE_P(list, T_HASH)) {
4820 list = rb_hash_new();
4821 rb_hash_aset(hash, sym, list);
4822 }
4823 return list;
4824 }
4825
4826 /*
4827 * Returns Qtrue iff obj_id (or the pair <obj, paired_obj>) is already
4828 * in the recursion list.
4829 * Assumes the recursion list is valid.
4830 */
4831
4832 static VALUE
recursive_check(VALUE list,VALUE obj_id,VALUE paired_obj_id)4833 recursive_check(VALUE list, VALUE obj_id, VALUE paired_obj_id)
4834 {
4835 #if SIZEOF_LONG == SIZEOF_VOIDP
4836 #define OBJ_ID_EQL(obj_id, other) ((obj_id) == (other))
4837 #elif SIZEOF_LONG_LONG == SIZEOF_VOIDP
4838 #define OBJ_ID_EQL(obj_id, other) (RB_TYPE_P((obj_id), T_BIGNUM) ? \
4839 rb_big_eql((obj_id), (other)) : ((obj_id) == (other)))
4840 #endif
4841
4842 VALUE pair_list = rb_hash_lookup2(list, obj_id, Qundef);
4843 if (pair_list == Qundef)
4844 return Qfalse;
4845 if (paired_obj_id) {
4846 if (!RB_TYPE_P(pair_list, T_HASH)) {
4847 if (!OBJ_ID_EQL(paired_obj_id, pair_list))
4848 return Qfalse;
4849 }
4850 else {
4851 if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id)))
4852 return Qfalse;
4853 }
4854 }
4855 return Qtrue;
4856 }
4857
4858 /*
4859 * Pushes obj_id (or the pair <obj_id, paired_obj_id>) in the recursion list.
4860 * For a single obj_id, it sets list[obj_id] to Qtrue.
4861 * For a pair, it sets list[obj_id] to paired_obj_id if possible,
4862 * otherwise list[obj_id] becomes a hash like:
4863 * {paired_obj_id_1 => true, paired_obj_id_2 => true, ... }
4864 * Assumes the recursion list is valid.
4865 */
4866
4867 static void
recursive_push(VALUE list,VALUE obj,VALUE paired_obj)4868 recursive_push(VALUE list, VALUE obj, VALUE paired_obj)
4869 {
4870 VALUE pair_list;
4871
4872 if (!paired_obj) {
4873 rb_hash_aset(list, obj, Qtrue);
4874 }
4875 else if ((pair_list = rb_hash_lookup2(list, obj, Qundef)) == Qundef) {
4876 rb_hash_aset(list, obj, paired_obj);
4877 }
4878 else {
4879 if (!RB_TYPE_P(pair_list, T_HASH)){
4880 VALUE other_paired_obj = pair_list;
4881 pair_list = rb_hash_new();
4882 rb_hash_aset(pair_list, other_paired_obj, Qtrue);
4883 rb_hash_aset(list, obj, pair_list);
4884 }
4885 rb_hash_aset(pair_list, paired_obj, Qtrue);
4886 }
4887 }
4888
4889 /*
4890 * Pops obj_id (or the pair <obj_id, paired_obj_id>) from the recursion list.
4891 * For a pair, if list[obj_id] is a hash, then paired_obj_id is
4892 * removed from the hash and no attempt is made to simplify
4893 * list[obj_id] from {only_one_paired_id => true} to only_one_paired_id
4894 * Assumes the recursion list is valid.
4895 */
4896
4897 static int
recursive_pop(VALUE list,VALUE obj,VALUE paired_obj)4898 recursive_pop(VALUE list, VALUE obj, VALUE paired_obj)
4899 {
4900 if (paired_obj) {
4901 VALUE pair_list = rb_hash_lookup2(list, obj, Qundef);
4902 if (pair_list == Qundef) {
4903 return 0;
4904 }
4905 if (RB_TYPE_P(pair_list, T_HASH)) {
4906 rb_hash_delete_entry(pair_list, paired_obj);
4907 if (!RHASH_EMPTY_P(pair_list)) {
4908 return 1; /* keep hash until is empty */
4909 }
4910 }
4911 }
4912 rb_hash_delete_entry(list, obj);
4913 return 1;
4914 }
4915
4916 struct exec_recursive_params {
4917 VALUE (*func) (VALUE, VALUE, int);
4918 VALUE list;
4919 VALUE obj;
4920 VALUE objid;
4921 VALUE pairid;
4922 VALUE arg;
4923 };
4924
4925 static VALUE
exec_recursive_i(RB_BLOCK_CALL_FUNC_ARGLIST (tag,data))4926 exec_recursive_i(RB_BLOCK_CALL_FUNC_ARGLIST(tag, data))
4927 {
4928 struct exec_recursive_params *p = (void *)data;
4929 return (*p->func)(p->obj, p->arg, FALSE);
4930 }
4931
4932 /*
4933 * Calls func(obj, arg, recursive), where recursive is non-zero if the
4934 * current method is called recursively on obj, or on the pair <obj, pairid>
4935 * If outer is 0, then the innermost func will be called with recursive set
4936 * to Qtrue, otherwise the outermost func will be called. In the latter case,
4937 * all inner func are short-circuited by throw.
4938 * Implementation details: the value thrown is the recursive list which is
4939 * proper to the current method and unlikely to be caught anywhere else.
4940 * list[recursive_key] is used as a flag for the outermost call.
4941 */
4942
4943 static VALUE
exec_recursive(VALUE (* func)(VALUE,VALUE,int),VALUE obj,VALUE pairid,VALUE arg,int outer)4944 exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE arg, int outer)
4945 {
4946 VALUE result = Qundef;
4947 const ID mid = rb_frame_last_func();
4948 const VALUE sym = mid ? ID2SYM(mid) : ID2SYM(idNULL);
4949 struct exec_recursive_params p;
4950 int outermost;
4951 p.list = recursive_list_access(sym);
4952 p.objid = rb_obj_id(obj);
4953 p.obj = obj;
4954 p.pairid = pairid;
4955 p.arg = arg;
4956 outermost = outer && !recursive_check(p.list, ID2SYM(recursive_key), 0);
4957
4958 if (recursive_check(p.list, p.objid, pairid)) {
4959 if (outer && !outermost) {
4960 rb_throw_obj(p.list, p.list);
4961 }
4962 return (*func)(obj, arg, TRUE);
4963 }
4964 else {
4965 enum ruby_tag_type state;
4966
4967 p.func = func;
4968
4969 if (outermost) {
4970 recursive_push(p.list, ID2SYM(recursive_key), 0);
4971 recursive_push(p.list, p.objid, p.pairid);
4972 result = rb_catch_protect(p.list, exec_recursive_i, (VALUE)&p, &state);
4973 if (!recursive_pop(p.list, p.objid, p.pairid)) goto invalid;
4974 if (!recursive_pop(p.list, ID2SYM(recursive_key), 0)) goto invalid;
4975 if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state);
4976 if (result == p.list) {
4977 result = (*func)(obj, arg, TRUE);
4978 }
4979 }
4980 else {
4981 volatile VALUE ret = Qundef;
4982 recursive_push(p.list, p.objid, p.pairid);
4983 EC_PUSH_TAG(GET_EC());
4984 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
4985 ret = (*func)(obj, arg, FALSE);
4986 }
4987 EC_POP_TAG();
4988 if (!recursive_pop(p.list, p.objid, p.pairid)) {
4989 invalid:
4990 rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list "
4991 "for %+"PRIsVALUE" in %+"PRIsVALUE,
4992 sym, rb_thread_current());
4993 }
4994 if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state);
4995 result = ret;
4996 }
4997 }
4998 *(volatile struct exec_recursive_params *)&p;
4999 return result;
5000 }
5001
5002 /*
5003 * Calls func(obj, arg, recursive), where recursive is non-zero if the
5004 * current method is called recursively on obj
5005 */
5006
5007 VALUE
rb_exec_recursive(VALUE (* func)(VALUE,VALUE,int),VALUE obj,VALUE arg)5008 rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
5009 {
5010 return exec_recursive(func, obj, 0, arg, 0);
5011 }
5012
5013 /*
5014 * Calls func(obj, arg, recursive), where recursive is non-zero if the
5015 * current method is called recursively on the ordered pair <obj, paired_obj>
5016 */
5017
5018 VALUE
rb_exec_recursive_paired(VALUE (* func)(VALUE,VALUE,int),VALUE obj,VALUE paired_obj,VALUE arg)5019 rb_exec_recursive_paired(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg)
5020 {
5021 return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 0);
5022 }
5023
5024 /*
5025 * If recursion is detected on the current method and obj, the outermost
5026 * func will be called with (obj, arg, Qtrue). All inner func will be
5027 * short-circuited using throw.
5028 */
5029
5030 VALUE
rb_exec_recursive_outer(VALUE (* func)(VALUE,VALUE,int),VALUE obj,VALUE arg)5031 rb_exec_recursive_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
5032 {
5033 return exec_recursive(func, obj, 0, arg, 1);
5034 }
5035
5036 /*
5037 * If recursion is detected on the current method, obj and paired_obj,
5038 * the outermost func will be called with (obj, arg, Qtrue). All inner
5039 * func will be short-circuited using throw.
5040 */
5041
5042 VALUE
rb_exec_recursive_paired_outer(VALUE (* func)(VALUE,VALUE,int),VALUE obj,VALUE paired_obj,VALUE arg)5043 rb_exec_recursive_paired_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg)
5044 {
5045 return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 1);
5046 }
5047
5048 /*
5049 * call-seq:
5050 * thread.backtrace -> array
5051 *
5052 * Returns the current backtrace of the target thread.
5053 *
5054 */
5055
5056 static VALUE
rb_thread_backtrace_m(int argc,VALUE * argv,VALUE thval)5057 rb_thread_backtrace_m(int argc, VALUE *argv, VALUE thval)
5058 {
5059 return rb_vm_thread_backtrace(argc, argv, thval);
5060 }
5061
5062 /* call-seq:
5063 * thread.backtrace_locations(*args) -> array or nil
5064 *
5065 * Returns the execution stack for the target thread---an array containing
5066 * backtrace location objects.
5067 *
5068 * See Thread::Backtrace::Location for more information.
5069 *
5070 * This method behaves similarly to Kernel#caller_locations except it applies
5071 * to a specific thread.
5072 */
5073 static VALUE
rb_thread_backtrace_locations_m(int argc,VALUE * argv,VALUE thval)5074 rb_thread_backtrace_locations_m(int argc, VALUE *argv, VALUE thval)
5075 {
5076 return rb_vm_thread_backtrace_locations(argc, argv, thval);
5077 }
5078
5079 /*
5080 * Document-class: ThreadError
5081 *
5082 * Raised when an invalid operation is attempted on a thread.
5083 *
5084 * For example, when no other thread has been started:
5085 *
5086 * Thread.stop
5087 *
 *  This will raise the following exception:
5089 *
5090 * ThreadError: stopping only thread
5091 * note: use sleep to stop forever
5092 */
5093
5094 void
Init_Thread(void)5095 Init_Thread(void)
5096 {
5097 #undef rb_intern
5098 #define rb_intern(str) rb_intern_const(str)
5099
5100 VALUE cThGroup;
5101 rb_thread_t *th = GET_THREAD();
5102
5103 sym_never = ID2SYM(rb_intern("never"));
5104 sym_immediate = ID2SYM(rb_intern("immediate"));
5105 sym_on_blocking = ID2SYM(rb_intern("on_blocking"));
5106 id_locals = rb_intern("locals");
5107
5108 rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1);
5109 rb_define_singleton_method(rb_cThread, "start", thread_start, -2);
5110 rb_define_singleton_method(rb_cThread, "fork", thread_start, -2);
5111 rb_define_singleton_method(rb_cThread, "main", rb_thread_s_main, 0);
5112 rb_define_singleton_method(rb_cThread, "current", thread_s_current, 0);
5113 rb_define_singleton_method(rb_cThread, "stop", rb_thread_stop, 0);
5114 rb_define_singleton_method(rb_cThread, "kill", rb_thread_s_kill, 1);
5115 rb_define_singleton_method(rb_cThread, "exit", rb_thread_exit, 0);
5116 rb_define_singleton_method(rb_cThread, "pass", thread_s_pass, 0);
5117 rb_define_singleton_method(rb_cThread, "list", rb_thread_list, 0);
5118 rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0);
5119 rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1);
5120 rb_define_singleton_method(rb_cThread, "report_on_exception", rb_thread_s_report_exc, 0);
5121 rb_define_singleton_method(rb_cThread, "report_on_exception=", rb_thread_s_report_exc_set, 1);
5122 #if THREAD_DEBUG < 0
5123 rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0);
5124 rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1);
5125 #endif
5126 rb_define_singleton_method(rb_cThread, "handle_interrupt", rb_thread_s_handle_interrupt, 1);
5127 rb_define_singleton_method(rb_cThread, "pending_interrupt?", rb_thread_s_pending_interrupt_p, -1);
5128 rb_define_method(rb_cThread, "pending_interrupt?", rb_thread_pending_interrupt_p, -1);
5129
5130 rb_define_method(rb_cThread, "initialize", thread_initialize, -2);
5131 rb_define_method(rb_cThread, "raise", thread_raise_m, -1);
5132 rb_define_method(rb_cThread, "join", thread_join_m, -1);
5133 rb_define_method(rb_cThread, "value", thread_value, 0);
5134 rb_define_method(rb_cThread, "kill", rb_thread_kill, 0);
5135 rb_define_method(rb_cThread, "terminate", rb_thread_kill, 0);
5136 rb_define_method(rb_cThread, "exit", rb_thread_kill, 0);
5137 rb_define_method(rb_cThread, "run", rb_thread_run, 0);
5138 rb_define_method(rb_cThread, "wakeup", rb_thread_wakeup, 0);
5139 rb_define_method(rb_cThread, "[]", rb_thread_aref, 1);
5140 rb_define_method(rb_cThread, "[]=", rb_thread_aset, 2);
5141 rb_define_method(rb_cThread, "fetch", rb_thread_fetch, -1);
5142 rb_define_method(rb_cThread, "key?", rb_thread_key_p, 1);
5143 rb_define_method(rb_cThread, "keys", rb_thread_keys, 0);
5144 rb_define_method(rb_cThread, "priority", rb_thread_priority, 0);
5145 rb_define_method(rb_cThread, "priority=", rb_thread_priority_set, 1);
5146 rb_define_method(rb_cThread, "status", rb_thread_status, 0);
5147 rb_define_method(rb_cThread, "thread_variable_get", rb_thread_variable_get, 1);
5148 rb_define_method(rb_cThread, "thread_variable_set", rb_thread_variable_set, 2);
5149 rb_define_method(rb_cThread, "thread_variables", rb_thread_variables, 0);
5150 rb_define_method(rb_cThread, "thread_variable?", rb_thread_variable_p, 1);
5151 rb_define_method(rb_cThread, "alive?", rb_thread_alive_p, 0);
5152 rb_define_method(rb_cThread, "stop?", rb_thread_stop_p, 0);
5153 rb_define_method(rb_cThread, "abort_on_exception", rb_thread_abort_exc, 0);
5154 rb_define_method(rb_cThread, "abort_on_exception=", rb_thread_abort_exc_set, 1);
5155 rb_define_method(rb_cThread, "report_on_exception", rb_thread_report_exc, 0);
5156 rb_define_method(rb_cThread, "report_on_exception=", rb_thread_report_exc_set, 1);
5157 rb_define_method(rb_cThread, "safe_level", rb_thread_safe_level, 0);
5158 rb_define_method(rb_cThread, "group", rb_thread_group, 0);
5159 rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, -1);
5160 rb_define_method(rb_cThread, "backtrace_locations", rb_thread_backtrace_locations_m, -1);
5161
5162 rb_define_method(rb_cThread, "name", rb_thread_getname, 0);
5163 rb_define_method(rb_cThread, "name=", rb_thread_setname, 1);
5164 rb_define_method(rb_cThread, "to_s", rb_thread_to_s, 0);
5165 rb_define_alias(rb_cThread, "inspect", "to_s");
5166
5167 rb_vm_register_special_exception(ruby_error_stream_closed, rb_eIOError,
5168 "stream closed in another thread");
5169
5170 cThGroup = rb_define_class("ThreadGroup", rb_cObject);
5171 rb_define_alloc_func(cThGroup, thgroup_s_alloc);
5172 rb_define_method(cThGroup, "list", thgroup_list, 0);
5173 rb_define_method(cThGroup, "enclose", thgroup_enclose, 0);
5174 rb_define_method(cThGroup, "enclosed?", thgroup_enclosed_p, 0);
5175 rb_define_method(cThGroup, "add", thgroup_add, 1);
5176
5177 {
5178 th->thgroup = th->vm->thgroup_default = rb_obj_alloc(cThGroup);
5179 rb_define_const(cThGroup, "Default", th->thgroup);
5180 }
5181
5182 recursive_key = rb_intern("__recursive_key__");
5183 rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError);
5184
5185 /* init thread core */
5186 {
5187 /* main thread setting */
5188 {
5189 /* acquire global vm lock */
5190 gvl_init(th->vm);
5191 gvl_acquire(th->vm, th);
5192 rb_native_mutex_initialize(&th->vm->waitpid_lock);
5193 rb_native_mutex_initialize(&th->vm->workqueue_lock);
5194 rb_native_mutex_initialize(&th->interrupt_lock);
5195
5196 th->pending_interrupt_queue = rb_ary_tmp_new(0);
5197 th->pending_interrupt_queue_checked = 0;
5198 th->pending_interrupt_mask_stack = rb_ary_tmp_new(0);
5199 }
5200 }
5201
5202 rb_thread_create_timer_thread();
5203
5204 /* suppress warnings on cygwin, mingw and mswin.*/
5205 (void)native_mutex_trylock;
5206
5207 Init_thread_sync();
5208 }
5209
/*
 * Returns non-zero when ruby_thread_from_native() finds a Ruby thread for
 * the calling native thread, and 0 otherwise.
 */
int
ruby_native_thread_p(void)
{
    return ruby_thread_from_native() != 0;
}
5217
5218 static void
debug_deadlock_check(rb_vm_t * vm,VALUE msg)5219 debug_deadlock_check(rb_vm_t *vm, VALUE msg)
5220 {
5221 rb_thread_t *th = 0;
5222 VALUE sep = rb_str_new_cstr("\n ");
5223
5224 rb_str_catf(msg, "\n%d threads, %d sleeps current:%p main thread:%p\n",
5225 vm_living_thread_num(vm), vm->sleeper, (void *)GET_THREAD(), (void *)vm->main_thread);
5226 list_for_each(&vm->living_threads, th, vmlt_node) {
5227 rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p "
5228 "native:%"PRI_THREAD_ID" int:%u",
5229 th->self, (void *)th, thread_id_str(th), th->ec->interrupt_flag);
5230 if (th->locking_mutex) {
5231 rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
5232 rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE,
5233 (void *)mutex->th, rb_mutex_num_waiting(mutex));
5234 }
5235 {
5236 rb_thread_list_t *list = th->join_list;
5237 while (list) {
5238 rb_str_catf(msg, "\n depended by: tb_thread_id:%p", (void *)list->th);
5239 list = list->next;
5240 }
5241 }
5242 rb_str_catf(msg, "\n ");
5243 rb_str_concat(msg, rb_ary_join(rb_ec_backtrace_str_ary(th->ec, 0, 0), sep));
5244 rb_str_catf(msg, "\n");
5245 }
5246 }
5247
5248 static void
rb_check_deadlock(rb_vm_t * vm)5249 rb_check_deadlock(rb_vm_t *vm)
5250 {
5251 int found = 0;
5252 rb_thread_t *th = 0;
5253
5254 if (vm_living_thread_num(vm) > vm->sleeper) return;
5255 if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
5256 if (patrol_thread && patrol_thread != GET_THREAD()) return;
5257
5258 list_for_each(&vm->living_threads, th, vmlt_node) {
5259 if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) {
5260 found = 1;
5261 }
5262 else if (th->locking_mutex) {
5263 rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
5264
5265 if (mutex->th == th || (!mutex->th && !list_empty(&mutex->waitq))) {
5266 found = 1;
5267 }
5268 }
5269 if (found)
5270 break;
5271 }
5272
5273 if (!found) {
5274 VALUE argv[2];
5275 argv[0] = rb_eFatal;
5276 argv[1] = rb_str_new2("No live threads left. Deadlock?");
5277 debug_deadlock_check(vm, argv[1]);
5278 vm->sleeper--;
5279 rb_threadptr_raise(vm->main_thread, 2, argv);
5280 }
5281 }
5282
5283 static void
update_line_coverage(VALUE data,const rb_trace_arg_t * trace_arg)5284 update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
5285 {
5286 const rb_control_frame_t *cfp = GET_EC()->cfp;
5287 VALUE coverage = rb_iseq_coverage(cfp->iseq);
5288 if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) {
5289 VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES);
5290 if (lines) {
5291 long line = rb_sourceline() - 1;
5292 long count;
5293 VALUE num;
5294 void rb_iseq_clear_event_flags(const rb_iseq_t *iseq, size_t pos, rb_event_flag_t reset);
5295 if (GET_VM()->coverage_mode & COVERAGE_TARGET_ONESHOT_LINES) {
5296 rb_iseq_clear_event_flags(cfp->iseq, cfp->pc - cfp->iseq->body->iseq_encoded - 1, RUBY_EVENT_COVERAGE_LINE);
5297 rb_ary_push(lines, LONG2FIX(line + 1));
5298 return;
5299 }
5300 if (line >= RARRAY_LEN(lines)) { /* no longer tracked */
5301 return;
5302 }
5303 num = RARRAY_AREF(lines, line);
5304 if (!FIXNUM_P(num)) return;
5305 count = FIX2LONG(num) + 1;
5306 if (POSFIXABLE(count)) {
5307 RARRAY_ASET(lines, line, LONG2FIX(count));
5308 }
5309 }
5310 }
5311 }
5312
5313 static void
update_branch_coverage(VALUE data,const rb_trace_arg_t * trace_arg)5314 update_branch_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
5315 {
5316 const rb_control_frame_t *cfp = GET_EC()->cfp;
5317 VALUE coverage = rb_iseq_coverage(cfp->iseq);
5318 if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) {
5319 VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES);
5320 if (branches) {
5321 long pc = cfp->pc - cfp->iseq->body->iseq_encoded - 1;
5322 long idx = FIX2INT(RARRAY_AREF(ISEQ_PC2BRANCHINDEX(cfp->iseq), pc)), count;
5323 VALUE counters = RARRAY_AREF(branches, 1);
5324 VALUE num = RARRAY_AREF(counters, idx);
5325 count = FIX2LONG(num) + 1;
5326 if (POSFIXABLE(count)) {
5327 RARRAY_ASET(counters, idx, LONG2FIX(count));
5328 }
5329 }
5330 }
5331 }
5332
5333 const rb_method_entry_t *
rb_resolve_me_location(const rb_method_entry_t * me,VALUE resolved_location[5])5334 rb_resolve_me_location(const rb_method_entry_t *me, VALUE resolved_location[5])
5335 {
5336 VALUE path, beg_pos_lineno, beg_pos_column, end_pos_lineno, end_pos_column;
5337
5338 retry:
5339 switch (me->def->type) {
5340 case VM_METHOD_TYPE_ISEQ: {
5341 const rb_iseq_t *iseq = me->def->body.iseq.iseqptr;
5342 rb_iseq_location_t *loc = &iseq->body->location;
5343 path = rb_iseq_path(iseq);
5344 beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno);
5345 beg_pos_column = INT2FIX(loc->code_location.beg_pos.column);
5346 end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno);
5347 end_pos_column = INT2FIX(loc->code_location.end_pos.column);
5348 break;
5349 }
5350 case VM_METHOD_TYPE_BMETHOD: {
5351 const rb_iseq_t *iseq = rb_proc_get_iseq(me->def->body.bmethod.proc, 0);
5352 if (iseq) {
5353 rb_iseq_location_t *loc;
5354 rb_iseq_check(iseq);
5355 path = rb_iseq_path(iseq);
5356 loc = &iseq->body->location;
5357 beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno);
5358 beg_pos_column = INT2FIX(loc->code_location.beg_pos.column);
5359 end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno);
5360 end_pos_column = INT2FIX(loc->code_location.end_pos.column);
5361 break;
5362 }
5363 return NULL;
5364 }
5365 case VM_METHOD_TYPE_ALIAS:
5366 me = me->def->body.alias.original_me;
5367 goto retry;
5368 case VM_METHOD_TYPE_REFINED:
5369 me = me->def->body.refined.orig_me;
5370 if (!me) return NULL;
5371 goto retry;
5372 default:
5373 return NULL;
5374 }
5375
5376 /* found */
5377 if (RB_TYPE_P(path, T_ARRAY)) {
5378 path = rb_ary_entry(path, 1);
5379 if (!RB_TYPE_P(path, T_STRING)) return NULL; /* just for the case... */
5380 }
5381 if (resolved_location) {
5382 resolved_location[0] = path;
5383 resolved_location[1] = beg_pos_lineno;
5384 resolved_location[2] = beg_pos_column;
5385 resolved_location[3] = end_pos_lineno;
5386 resolved_location[4] = end_pos_column;
5387 }
5388 return me;
5389 }
5390
5391 static void
update_method_coverage(VALUE me2counter,rb_trace_arg_t * trace_arg)5392 update_method_coverage(VALUE me2counter, rb_trace_arg_t *trace_arg)
5393 {
5394 const rb_control_frame_t *cfp = GET_EC()->cfp;
5395 const rb_callable_method_entry_t *cme = rb_vm_frame_method_entry(cfp);
5396 const rb_method_entry_t *me = (const rb_method_entry_t *)cme;
5397 VALUE rcount;
5398 long count;
5399
5400 me = rb_resolve_me_location(me, 0);
5401 if (!me) return;
5402
5403 rcount = rb_hash_aref(me2counter, (VALUE) me);
5404 count = FIXNUM_P(rcount) ? FIX2LONG(rcount) + 1 : 1;
5405 if (POSFIXABLE(count)) {
5406 rb_hash_aset(me2counter, (VALUE) me, LONG2FIX(count));
5407 }
5408 }
5409
5410 VALUE
rb_get_coverages(void)5411 rb_get_coverages(void)
5412 {
5413 return GET_VM()->coverages;
5414 }
5415
5416 int
rb_get_coverage_mode(void)5417 rb_get_coverage_mode(void)
5418 {
5419 return GET_VM()->coverage_mode;
5420 }
5421
5422 void
rb_set_coverages(VALUE coverages,int mode,VALUE me2counter)5423 rb_set_coverages(VALUE coverages, int mode, VALUE me2counter)
5424 {
5425 GET_VM()->coverages = coverages;
5426 GET_VM()->coverage_mode = mode;
5427 rb_add_event_hook2((rb_event_hook_func_t) update_line_coverage, RUBY_EVENT_COVERAGE_LINE, Qnil, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
5428 if (mode & COVERAGE_TARGET_BRANCHES) {
5429 rb_add_event_hook2((rb_event_hook_func_t) update_branch_coverage, RUBY_EVENT_COVERAGE_BRANCH, Qnil, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
5430 }
5431 if (mode & COVERAGE_TARGET_METHODS) {
5432 rb_add_event_hook2((rb_event_hook_func_t) update_method_coverage, RUBY_EVENT_CALL, me2counter, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
5433 }
5434 }
5435
5436 /* Make coverage arrays empty so old covered files are no longer tracked. */
5437 void
rb_reset_coverages(void)5438 rb_reset_coverages(void)
5439 {
5440 rb_clear_coverages();
5441 rb_iseq_remove_coverage_all();
5442 GET_VM()->coverages = Qfalse;
5443 rb_remove_event_hook((rb_event_hook_func_t) update_line_coverage);
5444 if (GET_VM()->coverage_mode & COVERAGE_TARGET_BRANCHES) {
5445 rb_remove_event_hook((rb_event_hook_func_t) update_branch_coverage);
5446 }
5447 if (GET_VM()->coverage_mode & COVERAGE_TARGET_METHODS) {
5448 rb_remove_event_hook((rb_event_hook_func_t) update_method_coverage);
5449 }
5450 }
5451
5452 VALUE
rb_default_coverage(int n)5453 rb_default_coverage(int n)
5454 {
5455 VALUE coverage = rb_ary_tmp_new_fill(3);
5456 VALUE lines = Qfalse, branches = Qfalse;
5457 int mode = GET_VM()->coverage_mode;
5458
5459 if (mode & COVERAGE_TARGET_LINES) {
5460 lines = n > 0 ? rb_ary_tmp_new_fill(n) : rb_ary_tmp_new(0);
5461 }
5462 RARRAY_ASET(coverage, COVERAGE_INDEX_LINES, lines);
5463
5464 if (mode & COVERAGE_TARGET_BRANCHES) {
5465 branches = rb_ary_tmp_new_fill(2);
5466 /* internal data structures for branch coverage:
5467 *
5468 * [[base_type, base_first_lineno, base_first_column, base_last_lineno, base_last_column,
5469 * target_type_1, target_first_lineno_1, target_first_column_1, target_last_lineno_1, target_last_column_1, target_counter_index_1,
5470 * target_type_2, target_first_lineno_2, target_first_column_2, target_last_lineno_2, target_last_column_2, target_counter_index_2, ...],
5471 * ...]
5472 *
5473 * Example: [[:case, 1, 0, 4, 3,
5474 * :when, 2, 8, 2, 9, 0,
5475 * :when, 3, 8, 3, 9, 1, ...],
5476 * ...]
5477 */
5478 RARRAY_ASET(branches, 0, rb_ary_tmp_new(0));
5479 /* branch execution counters */
5480 RARRAY_ASET(branches, 1, rb_ary_tmp_new(0));
5481 }
5482 RARRAY_ASET(coverage, COVERAGE_INDEX_BRANCHES, branches);
5483
5484 return coverage;
5485 }
5486
5487 VALUE
rb_uninterruptible(VALUE (* b_proc)(ANYARGS),VALUE data)5488 rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data)
5489 {
5490 VALUE interrupt_mask = rb_ident_hash_new();
5491 rb_thread_t *cur_th = GET_THREAD();
5492
5493 rb_hash_aset(interrupt_mask, rb_cObject, sym_never);
5494 OBJ_FREEZE_RAW(interrupt_mask);
5495 rb_ary_push(cur_th->pending_interrupt_mask_stack, interrupt_mask);
5496
5497 return rb_ensure(b_proc, data, rb_ary_pop, cur_th->pending_interrupt_mask_stack);
5498 }
5499