1 /* included by thread.c */
2 #include "ccan/list/list.h"
3 
4 static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
5 static VALUE rb_eClosedQueueError;
6 
7 /* sync_waiter is always on-stack */
8 struct sync_waiter {
9     rb_thread_t *th;
10     struct list_node node;
11 };
12 
13 #define MUTEX_ALLOW_TRAP FL_USER1
14 
15 static void
sync_wakeup(struct list_head * head,long max)16 sync_wakeup(struct list_head *head, long max)
17 {
18     struct sync_waiter *cur = 0, *next;
19 
20     list_for_each_safe(head, cur, next, node) {
21 	list_del_init(&cur->node);
22 	if (cur->th->status != THREAD_KILLED) {
23 	    rb_threadptr_interrupt(cur->th);
24 	    cur->th->status = THREAD_RUNNABLE;
25 	    if (--max == 0) return;
26 	}
27     }
28 }
29 
30 static void
wakeup_one(struct list_head * head)31 wakeup_one(struct list_head *head)
32 {
33     sync_wakeup(head, 1);
34 }
35 
36 static void
wakeup_all(struct list_head * head)37 wakeup_all(struct list_head *head)
38 {
39     sync_wakeup(head, LONG_MAX);
40 }
41 
42 /* Mutex */
43 
44 typedef struct rb_mutex_struct {
45     rb_thread_t *th;
46     struct rb_mutex_struct *next_mutex;
47     struct list_head waitq; /* protected by GVL */
48 } rb_mutex_t;
49 
50 #if defined(HAVE_WORKING_FORK)
51 static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
52 static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
53 static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
54 #endif
55 static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th);
56 
57 /*
58  *  Document-class: Mutex
59  *
60  *  Mutex implements a simple semaphore that can be used to coordinate access to
61  *  shared data from multiple concurrent threads.
62  *
63  *  Example:
64  *
65  *    semaphore = Mutex.new
66  *
67  *    a = Thread.new {
68  *      semaphore.synchronize {
69  *        # access shared resource
70  *      }
71  *    }
72  *
73  *    b = Thread.new {
74  *      semaphore.synchronize {
75  *        # access shared resource
76  *      }
77  *    }
78  *
79  */
80 
81 #define mutex_mark NULL
82 
83 static size_t
rb_mutex_num_waiting(rb_mutex_t * mutex)84 rb_mutex_num_waiting(rb_mutex_t *mutex)
85 {
86     struct sync_waiter *w = 0;
87     size_t n = 0;
88 
89     list_for_each(&mutex->waitq, w, node) {
90 	n++;
91     }
92 
93     return n;
94 }
95 
96 static void
mutex_free(void * ptr)97 mutex_free(void *ptr)
98 {
99     rb_mutex_t *mutex = ptr;
100     if (mutex->th) {
101 	/* rb_warn("free locked mutex"); */
102 	const char *err = rb_mutex_unlock_th(mutex, mutex->th);
103 	if (err) rb_bug("%s", err);
104     }
105     ruby_xfree(ptr);
106 }
107 
108 static size_t
mutex_memsize(const void * ptr)109 mutex_memsize(const void *ptr)
110 {
111     return sizeof(rb_mutex_t);
112 }
113 
114 static const rb_data_type_t mutex_data_type = {
115     "mutex",
116     {mutex_mark, mutex_free, mutex_memsize,},
117     0, 0, RUBY_TYPED_FREE_IMMEDIATELY
118 };
119 
120 static rb_mutex_t *
mutex_ptr(VALUE obj)121 mutex_ptr(VALUE obj)
122 {
123     rb_mutex_t *mutex;
124 
125     TypedData_Get_Struct(obj, rb_mutex_t, &mutex_data_type, mutex);
126 
127     return mutex;
128 }
129 
130 VALUE
rb_obj_is_mutex(VALUE obj)131 rb_obj_is_mutex(VALUE obj)
132 {
133     if (rb_typeddata_is_kind_of(obj, &mutex_data_type)) {
134 	return Qtrue;
135     }
136     else {
137 	return Qfalse;
138     }
139 }
140 
141 static VALUE
mutex_alloc(VALUE klass)142 mutex_alloc(VALUE klass)
143 {
144     VALUE obj;
145     rb_mutex_t *mutex;
146 
147     obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
148     list_head_init(&mutex->waitq);
149     return obj;
150 }
151 
152 /*
153  *  call-seq:
154  *     Mutex.new   -> mutex
155  *
156  *  Creates a new Mutex
157  */
158 static VALUE
mutex_initialize(VALUE self)159 mutex_initialize(VALUE self)
160 {
161     return self;
162 }
163 
164 VALUE
rb_mutex_new(void)165 rb_mutex_new(void)
166 {
167     return mutex_alloc(rb_cMutex);
168 }
169 
170 /*
171  * call-seq:
172  *    mutex.locked?  -> true or false
173  *
174  * Returns +true+ if this lock is currently held by some thread.
175  */
176 VALUE
rb_mutex_locked_p(VALUE self)177 rb_mutex_locked_p(VALUE self)
178 {
179     rb_mutex_t *mutex = mutex_ptr(self);
180 
181     return mutex->th ? Qtrue : Qfalse;
182 }
183 
184 static void
mutex_locked(rb_thread_t * th,VALUE self)185 mutex_locked(rb_thread_t *th, VALUE self)
186 {
187     rb_mutex_t *mutex = mutex_ptr(self);
188 
189     if (th->keeping_mutexes) {
190 	mutex->next_mutex = th->keeping_mutexes;
191     }
192     th->keeping_mutexes = mutex;
193 }
194 
195 /*
196  * call-seq:
197  *    mutex.try_lock  -> true or false
198  *
199  * Attempts to obtain the lock and returns immediately. Returns +true+ if the
200  * lock was granted.
201  */
202 VALUE
rb_mutex_trylock(VALUE self)203 rb_mutex_trylock(VALUE self)
204 {
205     rb_mutex_t *mutex = mutex_ptr(self);
206     VALUE locked = Qfalse;
207 
208     if (mutex->th == 0) {
209 	rb_thread_t *th = GET_THREAD();
210 	mutex->th = th;
211 	locked = Qtrue;
212 
213 	mutex_locked(th, self);
214     }
215 
216     return locked;
217 }
218 
219 /*
220  * At maximum, only one thread can use cond_timedwait and watch deadlock
221  * periodically. Multiple polling thread (i.e. concurrent deadlock check)
222  * introduces new race conditions. [Bug #6278] [ruby-core:44275]
223  */
224 static const rb_thread_t *patrol_thread = NULL;
225 
226 static VALUE
do_mutex_lock(VALUE self,int interruptible_p)227 do_mutex_lock(VALUE self, int interruptible_p)
228 {
229     rb_thread_t *th = GET_THREAD();
230     rb_mutex_t *mutex = mutex_ptr(self);
231 
232     /* When running trap handler */
233     if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
234 	th->ec->interrupt_mask & TRAP_INTERRUPT_MASK) {
235 	rb_raise(rb_eThreadError, "can't be called from trap context");
236     }
237 
238     if (rb_mutex_trylock(self) == Qfalse) {
239 	struct sync_waiter w;
240 
241 	if (mutex->th == th) {
242 	    rb_raise(rb_eThreadError, "deadlock; recursive locking");
243 	}
244 
245 	w.th = th;
246 
247 	while (mutex->th != th) {
248 	    enum rb_thread_status prev_status = th->status;
249 	    rb_hrtime_t *timeout = 0;
250 	    rb_hrtime_t rel = rb_msec2hrtime(100);
251 
252 	    th->status = THREAD_STOPPED_FOREVER;
253 	    th->locking_mutex = self;
254 	    th->vm->sleeper++;
255 	    /*
256 	     * Carefully! while some contended threads are in native_sleep(),
257 	     * vm->sleeper is unstable value. we have to avoid both deadlock
258 	     * and busy loop.
259 	     */
260 	    if ((vm_living_thread_num(th->vm) == th->vm->sleeper) &&
261 		!patrol_thread) {
262 		timeout = &rel;
263 		patrol_thread = th;
264 	    }
265 
266 	    list_add_tail(&mutex->waitq, &w.node);
267 	    native_sleep(th, timeout); /* release GVL */
268 	    list_del(&w.node);
269 
270 	    if (!mutex->th) {
271 		mutex->th = th;
272 	    }
273 
274 	    if (patrol_thread == th)
275 		patrol_thread = NULL;
276 
277 	    th->locking_mutex = Qfalse;
278 	    if (mutex->th && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
279 		rb_check_deadlock(th->vm);
280 	    }
281 	    if (th->status == THREAD_STOPPED_FOREVER) {
282 		th->status = prev_status;
283 	    }
284 	    th->vm->sleeper--;
285 
286             if (interruptible_p) {
287                 /* release mutex before checking for interrupts...as interrupt checking
288                  * code might call rb_raise() */
289                 if (mutex->th == th) mutex->th = 0;
290                 RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
291                 if (!mutex->th) {
292                     mutex->th = th;
293                     mutex_locked(th, self);
294                 }
295             } else {
296                 if (mutex->th == th) mutex_locked(th, self);
297             }
298 	}
299     }
300     return self;
301 }
302 
303 static VALUE
mutex_lock_uninterruptible(VALUE self)304 mutex_lock_uninterruptible(VALUE self)
305 {
306     return do_mutex_lock(self, 0);
307 }
308 
309 /*
310  * call-seq:
311  *    mutex.lock  -> self
312  *
313  * Attempts to grab the lock and waits if it isn't available.
314  * Raises +ThreadError+ if +mutex+ was locked by the current thread.
315  */
316 VALUE
rb_mutex_lock(VALUE self)317 rb_mutex_lock(VALUE self)
318 {
319     return do_mutex_lock(self, 1);
320 }
321 
322 /*
323  * call-seq:
324  *    mutex.owned?  -> true or false
325  *
326  * Returns +true+ if this lock is currently held by current thread.
327  */
328 VALUE
rb_mutex_owned_p(VALUE self)329 rb_mutex_owned_p(VALUE self)
330 {
331     VALUE owned = Qfalse;
332     rb_thread_t *th = GET_THREAD();
333     rb_mutex_t *mutex = mutex_ptr(self);
334 
335     if (mutex->th == th)
336 	owned = Qtrue;
337 
338     return owned;
339 }
340 
341 static const char *
rb_mutex_unlock_th(rb_mutex_t * mutex,rb_thread_t * th)342 rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th)
343 {
344     const char *err = NULL;
345 
346     if (mutex->th == 0) {
347 	err = "Attempt to unlock a mutex which is not locked";
348     }
349     else if (mutex->th != th) {
350 	err = "Attempt to unlock a mutex which is locked by another thread";
351     }
352     else {
353 	struct sync_waiter *cur = 0, *next;
354 	rb_mutex_t **th_mutex = &th->keeping_mutexes;
355 
356 	mutex->th = 0;
357 	list_for_each_safe(&mutex->waitq, cur, next, node) {
358 	    list_del_init(&cur->node);
359 	    switch (cur->th->status) {
360 	      case THREAD_RUNNABLE: /* from someone else calling Thread#run */
361 	      case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
362 		rb_threadptr_interrupt(cur->th);
363 		goto found;
364 	      case THREAD_STOPPED: /* probably impossible */
365 		rb_bug("unexpected THREAD_STOPPED");
366 	      case THREAD_KILLED:
367                 /* not sure about this, possible in exit GC? */
368 		rb_bug("unexpected THREAD_KILLED");
369 		continue;
370 	    }
371 	}
372       found:
373 	while (*th_mutex != mutex) {
374 	    th_mutex = &(*th_mutex)->next_mutex;
375 	}
376 	*th_mutex = mutex->next_mutex;
377 	mutex->next_mutex = NULL;
378     }
379 
380     return err;
381 }
382 
383 /*
384  * call-seq:
385  *    mutex.unlock    -> self
386  *
387  * Releases the lock.
388  * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
389  */
390 VALUE
rb_mutex_unlock(VALUE self)391 rb_mutex_unlock(VALUE self)
392 {
393     const char *err;
394     rb_mutex_t *mutex = mutex_ptr(self);
395 
396     err = rb_mutex_unlock_th(mutex, GET_THREAD());
397     if (err) rb_raise(rb_eThreadError, "%s", err);
398 
399     return self;
400 }
401 
402 #if defined(HAVE_WORKING_FORK)
403 static void
rb_mutex_abandon_keeping_mutexes(rb_thread_t * th)404 rb_mutex_abandon_keeping_mutexes(rb_thread_t *th)
405 {
406     rb_mutex_abandon_all(th->keeping_mutexes);
407     th->keeping_mutexes = NULL;
408 }
409 
410 static void
rb_mutex_abandon_locking_mutex(rb_thread_t * th)411 rb_mutex_abandon_locking_mutex(rb_thread_t *th)
412 {
413     if (th->locking_mutex) {
414         rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
415 
416         list_head_init(&mutex->waitq);
417         th->locking_mutex = Qfalse;
418     }
419 }
420 
421 static void
rb_mutex_abandon_all(rb_mutex_t * mutexes)422 rb_mutex_abandon_all(rb_mutex_t *mutexes)
423 {
424     rb_mutex_t *mutex;
425 
426     while (mutexes) {
427 	mutex = mutexes;
428 	mutexes = mutex->next_mutex;
429 	mutex->th = 0;
430 	mutex->next_mutex = 0;
431 	list_head_init(&mutex->waitq);
432     }
433 }
434 #endif
435 
436 static VALUE
rb_mutex_sleep_forever(VALUE time)437 rb_mutex_sleep_forever(VALUE time)
438 {
439     rb_thread_sleep_deadly_allow_spurious_wakeup();
440     return Qnil;
441 }
442 
443 static VALUE
rb_mutex_wait_for(VALUE time)444 rb_mutex_wait_for(VALUE time)
445 {
446     rb_hrtime_t *rel = (rb_hrtime_t *)time;
447     /* permit spurious check */
448     sleep_hrtime(GET_THREAD(), *rel, 0);
449     return Qnil;
450 }
451 
452 VALUE
rb_mutex_sleep(VALUE self,VALUE timeout)453 rb_mutex_sleep(VALUE self, VALUE timeout)
454 {
455     time_t beg, end;
456     struct timeval t;
457 
458     if (!NIL_P(timeout)) {
459         t = rb_time_interval(timeout);
460     }
461 
462     rb_mutex_unlock(self);
463     beg = time(0);
464     if (NIL_P(timeout)) {
465 	rb_ensure(rb_mutex_sleep_forever, Qnil, mutex_lock_uninterruptible, self);
466     }
467     else {
468         rb_hrtime_t rel = rb_timeval2hrtime(&t);
469 
470         rb_ensure(rb_mutex_wait_for, (VALUE)&rel,
471                   mutex_lock_uninterruptible, self);
472     }
473     RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
474     end = time(0) - beg;
475     return INT2FIX(end);
476 }
477 
478 /*
479  * call-seq:
480  *    mutex.sleep(timeout = nil)    -> number
481  *
482  * Releases the lock and sleeps +timeout+ seconds if it is given and
483  * non-nil or forever.  Raises +ThreadError+ if +mutex+ wasn't locked by
484  * the current thread.
485  *
486  * When the thread is next woken up, it will attempt to reacquire
487  * the lock.
488  *
489  * Note that this method can wakeup without explicit Thread#wakeup call.
490  * For example, receiving signal and so on.
491  */
492 static VALUE
mutex_sleep(int argc,VALUE * argv,VALUE self)493 mutex_sleep(int argc, VALUE *argv, VALUE self)
494 {
495     VALUE timeout;
496 
497     timeout = rb_check_arity(argc, 0, 1) ? argv[0] : Qnil;
498     return rb_mutex_sleep(self, timeout);
499 }
500 
501 /*
502  * call-seq:
503  *    mutex.synchronize { ... }    -> result of the block
504  *
505  * Obtains a lock, runs the block, and releases the lock when the block
506  * completes.  See the example under +Mutex+.
507  */
508 
509 VALUE
rb_mutex_synchronize(VALUE mutex,VALUE (* func)(VALUE arg),VALUE arg)510 rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg)
511 {
512     rb_mutex_lock(mutex);
513     return rb_ensure(func, arg, rb_mutex_unlock, mutex);
514 }
515 
516 /*
517  * call-seq:
518  *    mutex.synchronize { ... }    -> result of the block
519  *
520  * Obtains a lock, runs the block, and releases the lock when the block
521  * completes.  See the example under +Mutex+.
522  */
523 static VALUE
rb_mutex_synchronize_m(VALUE self,VALUE args)524 rb_mutex_synchronize_m(VALUE self, VALUE args)
525 {
526     if (!rb_block_given_p()) {
527 	rb_raise(rb_eThreadError, "must be called with a block");
528     }
529 
530     return rb_mutex_synchronize(self, rb_yield, Qundef);
531 }
532 
rb_mutex_allow_trap(VALUE self,int val)533 void rb_mutex_allow_trap(VALUE self, int val)
534 {
535     Check_TypedStruct(self, &mutex_data_type);
536 
537     if (val)
538 	FL_SET_RAW(self, MUTEX_ALLOW_TRAP);
539     else
540 	FL_UNSET_RAW(self, MUTEX_ALLOW_TRAP);
541 }
542 
543 /* Queue */
544 
545 #define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
546 PACKED_STRUCT_UNALIGNED(struct rb_queue {
547     struct list_head waitq;
548     rb_serial_t fork_gen;
549     const VALUE que;
550     int num_waiting;
551 });
552 
553 #define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
554 #define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
555 PACKED_STRUCT_UNALIGNED(struct rb_szqueue {
556     struct rb_queue q;
557     int num_waiting_push;
558     struct list_head pushq;
559     long max;
560 });
561 
562 static void
queue_mark(void * ptr)563 queue_mark(void *ptr)
564 {
565     struct rb_queue *q = ptr;
566 
567     /* no need to mark threads in waitq, they are on stack */
568     rb_gc_mark(q->que);
569 }
570 
571 static size_t
queue_memsize(const void * ptr)572 queue_memsize(const void *ptr)
573 {
574     return sizeof(struct rb_queue);
575 }
576 
577 static const rb_data_type_t queue_data_type = {
578     "queue",
579     {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,},
580     0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
581 };
582 
583 static VALUE
queue_alloc(VALUE klass)584 queue_alloc(VALUE klass)
585 {
586     VALUE obj;
587     struct rb_queue *q;
588 
589     obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
590     list_head_init(queue_waitq(q));
591     return obj;
592 }
593 
594 static int
queue_fork_check(struct rb_queue * q)595 queue_fork_check(struct rb_queue *q)
596 {
597     rb_serial_t fork_gen = GET_VM()->fork_gen;
598 
599     if (q->fork_gen == fork_gen) {
600         return 0;
601     }
602     /* forked children can't reach into parent thread stacks */
603     q->fork_gen = fork_gen;
604     list_head_init(queue_waitq(q));
605     q->num_waiting = 0;
606     return 1;
607 }
608 
609 static struct rb_queue *
queue_ptr(VALUE obj)610 queue_ptr(VALUE obj)
611 {
612     struct rb_queue *q;
613 
614     TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q);
615     queue_fork_check(q);
616 
617     return q;
618 }
619 
620 #define QUEUE_CLOSED          FL_USER5
621 
622 static void
szqueue_mark(void * ptr)623 szqueue_mark(void *ptr)
624 {
625     struct rb_szqueue *sq = ptr;
626 
627     queue_mark(&sq->q);
628 }
629 
630 static size_t
szqueue_memsize(const void * ptr)631 szqueue_memsize(const void *ptr)
632 {
633     return sizeof(struct rb_szqueue);
634 }
635 
636 static const rb_data_type_t szqueue_data_type = {
637     "sized_queue",
638     {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,},
639     0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
640 };
641 
642 static VALUE
szqueue_alloc(VALUE klass)643 szqueue_alloc(VALUE klass)
644 {
645     struct rb_szqueue *sq;
646     VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
647 					&szqueue_data_type, sq);
648     list_head_init(szqueue_waitq(sq));
649     list_head_init(szqueue_pushq(sq));
650     return obj;
651 }
652 
653 static struct rb_szqueue *
szqueue_ptr(VALUE obj)654 szqueue_ptr(VALUE obj)
655 {
656     struct rb_szqueue *sq;
657 
658     TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
659     if (queue_fork_check(&sq->q)) {
660         list_head_init(szqueue_pushq(sq));
661         sq->num_waiting_push = 0;
662     }
663 
664     return sq;
665 }
666 
667 static VALUE
ary_buf_new(void)668 ary_buf_new(void)
669 {
670     return rb_ary_tmp_new(1);
671 }
672 
673 static VALUE
check_array(VALUE obj,VALUE ary)674 check_array(VALUE obj, VALUE ary)
675 {
676     if (!RB_TYPE_P(ary, T_ARRAY)) {
677 	rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
678     }
679     return ary;
680 }
681 
682 static long
queue_length(VALUE self,struct rb_queue * q)683 queue_length(VALUE self, struct rb_queue *q)
684 {
685     return RARRAY_LEN(check_array(self, q->que));
686 }
687 
688 static int
queue_closed_p(VALUE self)689 queue_closed_p(VALUE self)
690 {
691     return FL_TEST_RAW(self, QUEUE_CLOSED) != 0;
692 }
693 
694 /*
695  *  Document-class: ClosedQueueError
696  *
697  *  The exception class which will be raised when pushing into a closed
698  *  Queue.  See Queue#close and SizedQueue#close.
699  */
700 
701 NORETURN(static void raise_closed_queue_error(VALUE self));
702 
703 static void
raise_closed_queue_error(VALUE self)704 raise_closed_queue_error(VALUE self)
705 {
706     rb_raise(rb_eClosedQueueError, "queue closed");
707 }
708 
709 static VALUE
queue_closed_result(VALUE self,struct rb_queue * q)710 queue_closed_result(VALUE self, struct rb_queue *q)
711 {
712     assert(queue_length(self, q) == 0);
713     return Qnil;
714 }
715 
716 /*
717  *  Document-class: Queue
718  *
719  *  The Queue class implements multi-producer, multi-consumer queues.
720  *  It is especially useful in threaded programming when information
721  *  must be exchanged safely between multiple threads. The Queue class
722  *  implements all the required locking semantics.
723  *
724  *  The class implements FIFO type of queue. In a FIFO queue, the first
725  *  tasks added are the first retrieved.
726  *
727  *  Example:
728  *
729  *	queue = Queue.new
730  *
731  *	producer = Thread.new do
732  *	  5.times do |i|
733  *	     sleep rand(i) # simulate expense
734  *	     queue << i
735  *	     puts "#{i} produced"
736  *	  end
737  *	end
738  *
739  *	consumer = Thread.new do
740  *	  5.times do |i|
741  *	     value = queue.pop
742  *	     sleep rand(i/2) # simulate expense
743  *	     puts "consumed #{value}"
744  *	  end
745  *	end
746  *
747  *	consumer.join
748  *
749  */
750 
751 /*
752  * Document-method: Queue::new
753  *
754  * Creates a new queue instance.
755  */
756 
757 static VALUE
rb_queue_initialize(VALUE self)758 rb_queue_initialize(VALUE self)
759 {
760     struct rb_queue *q = queue_ptr(self);
761     RB_OBJ_WRITE(self, &q->que, ary_buf_new());
762     list_head_init(queue_waitq(q));
763     return self;
764 }
765 
766 static VALUE
queue_do_push(VALUE self,struct rb_queue * q,VALUE obj)767 queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
768 {
769     if (queue_closed_p(self)) {
770 	raise_closed_queue_error(self);
771     }
772     rb_ary_push(check_array(self, q->que), obj);
773     wakeup_one(queue_waitq(q));
774     return self;
775 }
776 
777 /*
778  * Document-method: Queue#close
779  * call-seq:
780  *   close
781  *
782  * Closes the queue. A closed queue cannot be re-opened.
783  *
784  * After the call to close completes, the following are true:
785  *
786  * - +closed?+ will return true
787  *
788  * - +close+ will be ignored.
789  *
790  * - calling enq/push/<< will raise a +ClosedQueueError+.
791  *
792  * - when +empty?+ is false, calling deq/pop/shift will return an object
793  *   from the queue as usual.
794  * - when +empty?+ is true, deq(false) will not suspend the thread and will return nil.
795  *   deq(true) will raise a +ThreadError+.
796  *
797  * ClosedQueueError is inherited from StopIteration, so that you can break loop block.
798  *
799  *  Example:
800  *
801  *    	q = Queue.new
802  *      Thread.new{
803  *        while e = q.deq # wait for nil to break loop
804  *          # ...
805  *        end
806  *      }
807  *      q.close
808  */
809 
810 static VALUE
rb_queue_close(VALUE self)811 rb_queue_close(VALUE self)
812 {
813     struct rb_queue *q = queue_ptr(self);
814 
815     if (!queue_closed_p(self)) {
816 	FL_SET(self, QUEUE_CLOSED);
817 
818 	wakeup_all(queue_waitq(q));
819     }
820 
821     return self;
822 }
823 
824 /*
825  * Document-method: Queue#closed?
826  * call-seq: closed?
827  *
828  * Returns +true+ if the queue is closed.
829  */
830 
831 static VALUE
rb_queue_closed_p(VALUE self)832 rb_queue_closed_p(VALUE self)
833 {
834     return queue_closed_p(self) ? Qtrue : Qfalse;
835 }
836 
837 /*
838  * Document-method: Queue#push
839  * call-seq:
840  *   push(object)
841  *   enq(object)
842  *   <<(object)
843  *
844  * Pushes the given +object+ to the queue.
845  */
846 
847 static VALUE
rb_queue_push(VALUE self,VALUE obj)848 rb_queue_push(VALUE self, VALUE obj)
849 {
850     return queue_do_push(self, queue_ptr(self), obj);
851 }
852 
853 static VALUE
queue_sleep(VALUE arg)854 queue_sleep(VALUE arg)
855 {
856     rb_thread_sleep_deadly_allow_spurious_wakeup();
857     return Qnil;
858 }
859 
860 struct queue_waiter {
861     struct sync_waiter w;
862     union {
863 	struct rb_queue *q;
864 	struct rb_szqueue *sq;
865     } as;
866 };
867 
868 static VALUE
queue_sleep_done(VALUE p)869 queue_sleep_done(VALUE p)
870 {
871     struct queue_waiter *qw = (struct queue_waiter *)p;
872 
873     list_del(&qw->w.node);
874     qw->as.q->num_waiting--;
875 
876     return Qfalse;
877 }
878 
879 static VALUE
szqueue_sleep_done(VALUE p)880 szqueue_sleep_done(VALUE p)
881 {
882     struct queue_waiter *qw = (struct queue_waiter *)p;
883 
884     list_del(&qw->w.node);
885     qw->as.sq->num_waiting_push--;
886 
887     return Qfalse;
888 }
889 
890 static VALUE
queue_do_pop(VALUE self,struct rb_queue * q,int should_block)891 queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
892 {
893     check_array(self, q->que);
894 
895     while (RARRAY_LEN(q->que) == 0) {
896 	if (!should_block) {
897 	    rb_raise(rb_eThreadError, "queue empty");
898 	}
899 	else if (queue_closed_p(self)) {
900 	    return queue_closed_result(self, q);
901 	}
902 	else {
903 	    struct queue_waiter qw;
904 
905 	    assert(RARRAY_LEN(q->que) == 0);
906 	    assert(queue_closed_p(self) == 0);
907 
908 	    qw.w.th = GET_THREAD();
909 	    qw.as.q = q;
910 	    list_add_tail(&qw.as.q->waitq, &qw.w.node);
911 	    qw.as.q->num_waiting++;
912 
913 	    rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw);
914 	}
915     }
916 
917     return rb_ary_shift(q->que);
918 }
919 
920 static int
queue_pop_should_block(int argc,const VALUE * argv)921 queue_pop_should_block(int argc, const VALUE *argv)
922 {
923     int should_block = 1;
924     rb_check_arity(argc, 0, 1);
925     if (argc > 0) {
926 	should_block = !RTEST(argv[0]);
927     }
928     return should_block;
929 }
930 
931 /*
932  * Document-method: Queue#pop
933  * call-seq:
934  *   pop(non_block=false)
935  *   deq(non_block=false)
936  *   shift(non_block=false)
937  *
938  * Retrieves data from the queue.
939  *
940  * If the queue is empty, the calling thread is suspended until data is pushed
941  * onto the queue. If +non_block+ is true, the thread isn't suspended, and
942  * +ThreadError+ is raised.
943  */
944 
945 static VALUE
rb_queue_pop(int argc,VALUE * argv,VALUE self)946 rb_queue_pop(int argc, VALUE *argv, VALUE self)
947 {
948     int should_block = queue_pop_should_block(argc, argv);
949     return queue_do_pop(self, queue_ptr(self), should_block);
950 }
951 
952 /*
953  * Document-method: Queue#empty?
954  * call-seq: empty?
955  *
956  * Returns +true+ if the queue is empty.
957  */
958 
959 static VALUE
rb_queue_empty_p(VALUE self)960 rb_queue_empty_p(VALUE self)
961 {
962     return queue_length(self, queue_ptr(self)) == 0 ? Qtrue : Qfalse;
963 }
964 
965 /*
966  * Document-method: Queue#clear
967  *
968  * Removes all objects from the queue.
969  */
970 
971 static VALUE
rb_queue_clear(VALUE self)972 rb_queue_clear(VALUE self)
973 {
974     struct rb_queue *q = queue_ptr(self);
975 
976     rb_ary_clear(check_array(self, q->que));
977     return self;
978 }
979 
980 /*
981  * Document-method: Queue#length
982  * call-seq:
983  *   length
984  *   size
985  *
986  * Returns the length of the queue.
987  */
988 
989 static VALUE
rb_queue_length(VALUE self)990 rb_queue_length(VALUE self)
991 {
992     return LONG2NUM(queue_length(self, queue_ptr(self)));
993 }
994 
995 /*
996  * Document-method: Queue#num_waiting
997  *
998  * Returns the number of threads waiting on the queue.
999  */
1000 
1001 static VALUE
rb_queue_num_waiting(VALUE self)1002 rb_queue_num_waiting(VALUE self)
1003 {
1004     struct rb_queue *q = queue_ptr(self);
1005 
1006     return INT2NUM(q->num_waiting);
1007 }
1008 
1009 /*
1010  *  Document-class: SizedQueue
1011  *
1012  * This class represents queues of specified size capacity.  The push operation
1013  * may be blocked if the capacity is full.
1014  *
1015  * See Queue for an example of how a SizedQueue works.
1016  */
1017 
1018 /*
1019  * Document-method: SizedQueue::new
1020  * call-seq: new(max)
1021  *
1022  * Creates a fixed-length queue with a maximum size of +max+.
1023  */
1024 
1025 static VALUE
rb_szqueue_initialize(VALUE self,VALUE vmax)1026 rb_szqueue_initialize(VALUE self, VALUE vmax)
1027 {
1028     long max;
1029     struct rb_szqueue *sq = szqueue_ptr(self);
1030 
1031     max = NUM2LONG(vmax);
1032     if (max <= 0) {
1033 	rb_raise(rb_eArgError, "queue size must be positive");
1034     }
1035 
1036     RB_OBJ_WRITE(self, &sq->q.que, ary_buf_new());
1037     list_head_init(szqueue_waitq(sq));
1038     list_head_init(szqueue_pushq(sq));
1039     sq->max = max;
1040 
1041     return self;
1042 }
1043 
1044 /*
1045  * Document-method: SizedQueue#close
1046  * call-seq:
1047  *   close
1048  *
1049  * Similar to Queue#close.
1050  *
1051  * The difference is behavior with waiting enqueuing threads.
1052  *
1053  * If there are waiting enqueuing threads, they are interrupted by
1054  * raising ClosedQueueError('queue closed').
1055  */
1056 static VALUE
rb_szqueue_close(VALUE self)1057 rb_szqueue_close(VALUE self)
1058 {
1059     if (!queue_closed_p(self)) {
1060 	struct rb_szqueue *sq = szqueue_ptr(self);
1061 
1062 	FL_SET(self, QUEUE_CLOSED);
1063 	wakeup_all(szqueue_waitq(sq));
1064 	wakeup_all(szqueue_pushq(sq));
1065     }
1066     return self;
1067 }
1068 
1069 /*
1070  * Document-method: SizedQueue#max
1071  *
1072  * Returns the maximum size of the queue.
1073  */
1074 
1075 static VALUE
rb_szqueue_max_get(VALUE self)1076 rb_szqueue_max_get(VALUE self)
1077 {
1078     return LONG2NUM(szqueue_ptr(self)->max);
1079 }
1080 
1081 /*
1082  * Document-method: SizedQueue#max=
1083  * call-seq: max=(number)
1084  *
1085  * Sets the maximum size of the queue to the given +number+.
1086  */
1087 
1088 static VALUE
rb_szqueue_max_set(VALUE self,VALUE vmax)1089 rb_szqueue_max_set(VALUE self, VALUE vmax)
1090 {
1091     long max = NUM2LONG(vmax);
1092     long diff = 0;
1093     struct rb_szqueue *sq = szqueue_ptr(self);
1094 
1095     if (max <= 0) {
1096 	rb_raise(rb_eArgError, "queue size must be positive");
1097     }
1098     if (max > sq->max) {
1099 	diff = max - sq->max;
1100     }
1101     sq->max = max;
1102     sync_wakeup(szqueue_pushq(sq), diff);
1103     return vmax;
1104 }
1105 
1106 static int
szqueue_push_should_block(int argc,const VALUE * argv)1107 szqueue_push_should_block(int argc, const VALUE *argv)
1108 {
1109     int should_block = 1;
1110     rb_check_arity(argc, 1, 2);
1111     if (argc > 1) {
1112 	should_block = !RTEST(argv[1]);
1113     }
1114     return should_block;
1115 }
1116 
1117 /*
1118  * Document-method: SizedQueue#push
1119  * call-seq:
1120  *   push(object, non_block=false)
1121  *   enq(object, non_block=false)
1122  *   <<(object)
1123  *
1124  * Pushes +object+ to the queue.
1125  *
1126  * If there is no space left in the queue, waits until space becomes
1127  * available, unless +non_block+ is true.  If +non_block+ is true, the
1128  * thread isn't suspended, and +ThreadError+ is raised.
1129  */
1130 
1131 static VALUE
rb_szqueue_push(int argc,VALUE * argv,VALUE self)1132 rb_szqueue_push(int argc, VALUE *argv, VALUE self)
1133 {
1134     struct rb_szqueue *sq = szqueue_ptr(self);
1135     int should_block = szqueue_push_should_block(argc, argv);
1136 
1137     while (queue_length(self, &sq->q) >= sq->max) {
1138 	if (!should_block) {
1139 	    rb_raise(rb_eThreadError, "queue full");
1140 	}
1141 	else if (queue_closed_p(self)) {
1142 	    goto closed;
1143 	}
1144 	else {
1145 	    struct queue_waiter qw;
1146 	    struct list_head *pushq = szqueue_pushq(sq);
1147 
1148 	    qw.w.th = GET_THREAD();
1149 	    qw.as.sq = sq;
1150 	    list_add_tail(pushq, &qw.w.node);
1151 	    sq->num_waiting_push++;
1152 
1153 	    rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
1154 	}
1155     }
1156 
1157     if (queue_closed_p(self)) {
1158       closed:
1159 	raise_closed_queue_error(self);
1160     }
1161 
1162     return queue_do_push(self, &sq->q, argv[0]);
1163 }
1164 
1165 static VALUE
szqueue_do_pop(VALUE self,int should_block)1166 szqueue_do_pop(VALUE self, int should_block)
1167 {
1168     struct rb_szqueue *sq = szqueue_ptr(self);
1169     VALUE retval = queue_do_pop(self, &sq->q, should_block);
1170 
1171     if (queue_length(self, &sq->q) < sq->max) {
1172 	wakeup_one(szqueue_pushq(sq));
1173     }
1174 
1175     return retval;
1176 }
1177 
1178 /*
1179  * Document-method: SizedQueue#pop
1180  * call-seq:
1181  *   pop(non_block=false)
1182  *   deq(non_block=false)
1183  *   shift(non_block=false)
1184  *
1185  * Retrieves data from the queue.
1186  *
1187  * If the queue is empty, the calling thread is suspended until data is pushed
1188  * onto the queue. If +non_block+ is true, the thread isn't suspended, and
1189  * +ThreadError+ is raised.
1190  */
1191 
1192 static VALUE
rb_szqueue_pop(int argc,VALUE * argv,VALUE self)1193 rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
1194 {
1195     int should_block = queue_pop_should_block(argc, argv);
1196     return szqueue_do_pop(self, should_block);
1197 }
1198 
1199 /*
1200  * Document-method: SizedQueue#clear
1201  *
1202  * Removes all objects from the queue.
1203  */
1204 
1205 static VALUE
rb_szqueue_clear(VALUE self)1206 rb_szqueue_clear(VALUE self)
1207 {
1208     struct rb_szqueue *sq = szqueue_ptr(self);
1209 
1210     rb_ary_clear(check_array(self, sq->q.que));
1211     wakeup_all(szqueue_pushq(sq));
1212     return self;
1213 }
1214 
1215 /*
1216  * Document-method: SizedQueue#length
1217  * call-seq:
1218  *   length
1219  *   size
1220  *
1221  * Returns the length of the queue.
1222  */
1223 
1224 static VALUE
rb_szqueue_length(VALUE self)1225 rb_szqueue_length(VALUE self)
1226 {
1227     struct rb_szqueue *sq = szqueue_ptr(self);
1228 
1229     return LONG2NUM(queue_length(self, &sq->q));
1230 }
1231 
1232 /*
1233  * Document-method: SizedQueue#num_waiting
1234  *
1235  * Returns the number of threads waiting on the queue.
1236  */
1237 
1238 static VALUE
rb_szqueue_num_waiting(VALUE self)1239 rb_szqueue_num_waiting(VALUE self)
1240 {
1241     struct rb_szqueue *sq = szqueue_ptr(self);
1242 
1243     return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
1244 }
1245 
1246 /*
1247  * Document-method: SizedQueue#empty?
1248  * call-seq: empty?
1249  *
1250  * Returns +true+ if the queue is empty.
1251  */
1252 
1253 static VALUE
rb_szqueue_empty_p(VALUE self)1254 rb_szqueue_empty_p(VALUE self)
1255 {
1256     struct rb_szqueue *sq = szqueue_ptr(self);
1257 
1258     return queue_length(self, &sq->q) == 0 ? Qtrue : Qfalse;
1259 }
1260 
1261 
1262 /* ConditionalVariable */
1263 struct rb_condvar {
1264     struct list_head waitq;
1265     rb_serial_t fork_gen;
1266 };
1267 
1268 /*
1269  *  Document-class: ConditionVariable
1270  *
1271  *  ConditionVariable objects augment class Mutex. Using condition variables,
1272  *  it is possible to suspend while in the middle of a critical section until a
1273  *  resource becomes available.
1274  *
1275  *  Example:
1276  *
1277  *    mutex = Mutex.new
1278  *    resource = ConditionVariable.new
1279  *
1280  *    a = Thread.new {
1281  *	 mutex.synchronize {
1282  *	   # Thread 'a' now needs the resource
1283  *	   resource.wait(mutex)
1284  *	   # 'a' can now have the resource
1285  *	 }
1286  *    }
1287  *
1288  *    b = Thread.new {
1289  *	 mutex.synchronize {
1290  *	   # Thread 'b' has finished using the resource
1291  *	   resource.signal
1292  *	 }
1293  *    }
1294  */
1295 
1296 static size_t
condvar_memsize(const void * ptr)1297 condvar_memsize(const void *ptr)
1298 {
1299     return sizeof(struct rb_condvar);
1300 }
1301 
1302 static const rb_data_type_t cv_data_type = {
1303     "condvar",
1304     {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
1305     0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
1306 };
1307 
1308 static struct rb_condvar *
condvar_ptr(VALUE self)1309 condvar_ptr(VALUE self)
1310 {
1311     struct rb_condvar *cv;
1312     rb_serial_t fork_gen = GET_VM()->fork_gen;
1313 
1314     TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);
1315 
1316     /* forked children can't reach into parent thread stacks */
1317     if (cv->fork_gen != fork_gen) {
1318         cv->fork_gen = fork_gen;
1319         list_head_init(&cv->waitq);
1320     }
1321 
1322     return cv;
1323 }
1324 
1325 static VALUE
condvar_alloc(VALUE klass)1326 condvar_alloc(VALUE klass)
1327 {
1328     struct rb_condvar *cv;
1329     VALUE obj;
1330 
1331     obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
1332     list_head_init(&cv->waitq);
1333 
1334     return obj;
1335 }
1336 
1337 /*
1338  * Document-method: ConditionVariable::new
1339  *
1340  * Creates a new condition variable instance.
1341  */
1342 
1343 static VALUE
rb_condvar_initialize(VALUE self)1344 rb_condvar_initialize(VALUE self)
1345 {
1346     struct rb_condvar *cv = condvar_ptr(self);
1347     list_head_init(&cv->waitq);
1348     return self;
1349 }
1350 
1351 struct sleep_call {
1352     VALUE mutex;
1353     VALUE timeout;
1354 };
1355 
1356 static ID id_sleep;
1357 
1358 static VALUE
do_sleep(VALUE args)1359 do_sleep(VALUE args)
1360 {
1361     struct sleep_call *p = (struct sleep_call *)args;
1362     return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout);
1363 }
1364 
1365 static VALUE
delete_from_waitq(struct sync_waiter * w)1366 delete_from_waitq(struct sync_waiter *w)
1367 {
1368     list_del(&w->node);
1369 
1370     return Qnil;
1371 }
1372 
1373 /*
1374  * Document-method: ConditionVariable#wait
1375  * call-seq: wait(mutex, timeout=nil)
1376  *
1377  * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
1378  *
1379  * If +timeout+ is given, this method returns after +timeout+ seconds passed,
1380  * even if no other thread doesn't signal.
1381  */
1382 
1383 static VALUE
rb_condvar_wait(int argc,VALUE * argv,VALUE self)1384 rb_condvar_wait(int argc, VALUE *argv, VALUE self)
1385 {
1386     struct rb_condvar *cv = condvar_ptr(self);
1387     struct sleep_call args;
1388     struct sync_waiter w;
1389 
1390     rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);
1391 
1392     w.th = GET_THREAD();
1393     list_add_tail(&cv->waitq, &w.node);
1394     rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w);
1395 
1396     return self;
1397 }
1398 
1399 /*
1400  * Document-method: ConditionVariable#signal
1401  *
1402  * Wakes up the first thread in line waiting for this lock.
1403  */
1404 
1405 static VALUE
rb_condvar_signal(VALUE self)1406 rb_condvar_signal(VALUE self)
1407 {
1408     struct rb_condvar *cv = condvar_ptr(self);
1409     wakeup_one(&cv->waitq);
1410     return self;
1411 }
1412 
1413 /*
1414  * Document-method: ConditionVariable#broadcast
1415  *
1416  * Wakes up all threads waiting for this lock.
1417  */
1418 
1419 static VALUE
rb_condvar_broadcast(VALUE self)1420 rb_condvar_broadcast(VALUE self)
1421 {
1422     struct rb_condvar *cv = condvar_ptr(self);
1423     wakeup_all(&cv->waitq);
1424     return self;
1425 }
1426 
1427 /* :nodoc: */
1428 static VALUE
undumpable(VALUE obj)1429 undumpable(VALUE obj)
1430 {
1431     rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
1432     UNREACHABLE_RETURN(Qnil);
1433 }
1434 
1435 static VALUE
define_thread_class(VALUE outer,const char * name,VALUE super)1436 define_thread_class(VALUE outer, const char *name, VALUE super)
1437 {
1438     VALUE klass = rb_define_class_under(outer, name, super);
1439     rb_define_const(rb_cObject, name, klass);
1440     return klass;
1441 }
1442 
1443 static void
Init_thread_sync(void)1444 Init_thread_sync(void)
1445 {
1446 #undef rb_intern
1447 #if 0
1448     rb_cMutex = rb_define_class("Mutex", rb_cObject); /* teach rdoc Mutex */
1449     rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */
1450     rb_cQueue = rb_define_class("Queue", rb_cObject); /* teach rdoc Queue */
1451     rb_cSizedQueue = rb_define_class("SizedQueue", rb_cObject); /* teach rdoc SizedQueue */
1452 #endif
1453 
1454 #define DEFINE_CLASS(name, super) \
1455     rb_c##name = define_thread_class(rb_cThread, #name, rb_c##super)
1456 
1457     /* Mutex */
1458     DEFINE_CLASS(Mutex, Object);
1459     rb_define_alloc_func(rb_cMutex, mutex_alloc);
1460     rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0);
1461     rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
1462     rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0);
1463     rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
1464     rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
1465     rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
1466     rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0);
1467     rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);
1468 
1469     /* Queue */
1470     DEFINE_CLASS(Queue, Object);
1471     rb_define_alloc_func(rb_cQueue, queue_alloc);
1472 
1473     rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);
1474 
1475     rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
1476     rb_undef_method(rb_cQueue, "initialize_copy");
1477     rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
1478     rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
1479     rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
1480     rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
1481     rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
1482     rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
1483     rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
1484     rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
1485     rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
1486 
1487     rb_define_alias(rb_cQueue, "enq", "push");
1488     rb_define_alias(rb_cQueue, "<<", "push");
1489     rb_define_alias(rb_cQueue, "deq", "pop");
1490     rb_define_alias(rb_cQueue, "shift", "pop");
1491     rb_define_alias(rb_cQueue, "size", "length");
1492 
1493     DEFINE_CLASS(SizedQueue, Queue);
1494     rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);
1495 
1496     rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
1497     rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
1498     rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
1499     rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
1500     rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
1501     rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
1502     rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
1503     rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
1504     rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
1505     rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
1506 
1507     rb_define_alias(rb_cSizedQueue, "enq", "push");
1508     rb_define_alias(rb_cSizedQueue, "<<", "push");
1509     rb_define_alias(rb_cSizedQueue, "deq", "pop");
1510     rb_define_alias(rb_cSizedQueue, "shift", "pop");
1511     rb_define_alias(rb_cSizedQueue, "size", "length");
1512 
1513     /* CVar */
1514     DEFINE_CLASS(ConditionVariable, Object);
1515     rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);
1516 
1517     id_sleep = rb_intern("sleep");
1518 
1519     rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
1520     rb_undef_method(rb_cConditionVariable, "initialize_copy");
1521     rb_define_method(rb_cConditionVariable, "marshal_dump", undumpable, 0);
1522     rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
1523     rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
1524     rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
1525 
1526     rb_provide("thread.rb");
1527 }
1528