1 /* included by thread.c */
2 #include "ccan/list/list.h"
3
/* Class objects; assigned during VM bootstrap (Init_thread_sync in thread.c). */
static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
static VALUE rb_eClosedQueueError;

/* sync_waiter is always on-stack: a blocking thread embeds one in its C
 * stack frame and links it into a waitq, so the wait queues need neither
 * heap allocation nor GC marking. */
struct sync_waiter {
    rb_thread_t *th;       /* the blocked thread to wake */
    struct list_node node; /* linkage into a struct list_head waitq */
};

/* FL_USER1 on a Mutex instance: permit lock/unlock from a trap (signal) handler. */
#define MUTEX_ALLOW_TRAP FL_USER1
14
/*
 * Wake up to +max+ live threads blocked on +head+.
 * Caller holds the GVL; all waitq lists here are GVL-protected.
 */
static void
sync_wakeup(struct list_head *head, long max)
{
    struct sync_waiter *cur = 0, *next;

    list_for_each_safe(head, cur, next, node) {
        /* detach before waking so a re-wakeup cannot touch the node again */
        list_del_init(&cur->node);
        if (cur->th->status != THREAD_KILLED) {
            rb_threadptr_interrupt(cur->th);
            cur->th->status = THREAD_RUNNABLE;
            /* dead threads are skipped and do not count toward max */
            if (--max == 0) return;
        }
    }
}
29
/* Wake a single waiter from +head+ (no-op when the list is empty). */
static void
wakeup_one(struct list_head *head)
{
    sync_wakeup(head, 1);
}

/* Wake every waiter on +head+. */
static void
wakeup_all(struct list_head *head)
{
    sync_wakeup(head, LONG_MAX);
}
41
/* Mutex */

typedef struct rb_mutex_struct {
    rb_thread_t *th;                    /* owning thread; 0 when unlocked */
    struct rb_mutex_struct *next_mutex; /* next link in owner's keeping_mutexes chain */
    struct list_head waitq;             /* blocked lockers; protected by GVL */
} rb_mutex_t;

#if defined(HAVE_WORKING_FORK)
static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
#endif
static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th);
56
57 /*
58 * Document-class: Mutex
59 *
60 * Mutex implements a simple semaphore that can be used to coordinate access to
61 * shared data from multiple concurrent threads.
62 *
63 * Example:
64 *
65 * semaphore = Mutex.new
66 *
67 * a = Thread.new {
68 * semaphore.synchronize {
69 * # access shared resource
70 * }
71 * }
72 *
73 * b = Thread.new {
74 * semaphore.synchronize {
75 * # access shared resource
76 * }
77 * }
78 *
79 */
80
/* Mutex structs reference no Ruby objects, so no GC mark function is needed. */
#define mutex_mark NULL

/* Count threads currently blocked in Mutex#lock on +mutex+ (GVL held). */
static size_t
rb_mutex_num_waiting(rb_mutex_t *mutex)
{
    struct sync_waiter *w = 0;
    size_t n = 0;

    list_for_each(&mutex->waitq, w, node) {
        n++;
    }

    return n;
}
95
/* dfree callback: force-unlock a mutex that is still held when GC frees it. */
static void
mutex_free(void *ptr)
{
    rb_mutex_t *mutex = ptr;
    if (mutex->th) {
        /* rb_warn("free locked mutex"); */
        const char *err = rb_mutex_unlock_th(mutex, mutex->th);
        /* unlocking on behalf of the owner cannot fail; treat it as a VM bug */
        if (err) rb_bug("%s", err);
    }
    ruby_xfree(ptr);
}

/* dsize callback for ObjectSpace.memsize_of. */
static size_t
mutex_memsize(const void *ptr)
{
    return sizeof(rb_mutex_t);
}

static const rb_data_type_t mutex_data_type = {
    "mutex",
    {mutex_mark, mutex_free, mutex_memsize,},
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY
};
119
/* Unwrap a Mutex VALUE to its rb_mutex_t; raises TypeError on wrong type. */
static rb_mutex_t *
mutex_ptr(VALUE obj)
{
    rb_mutex_t *mutex;

    TypedData_Get_Struct(obj, rb_mutex_t, &mutex_data_type, mutex);

    return mutex;
}
129
130 VALUE
rb_obj_is_mutex(VALUE obj)131 rb_obj_is_mutex(VALUE obj)
132 {
133 if (rb_typeddata_is_kind_of(obj, &mutex_data_type)) {
134 return Qtrue;
135 }
136 else {
137 return Qfalse;
138 }
139 }
140
/* Allocator: build a Mutex object with an empty, initialized wait queue. */
static VALUE
mutex_alloc(VALUE klass)
{
    VALUE obj;
    rb_mutex_t *mutex;

    obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
    list_head_init(&mutex->waitq);
    return obj;
}

/*
 * call-seq:
 *    Mutex.new   -> mutex
 *
 * Creates a new Mutex
 */
static VALUE
mutex_initialize(VALUE self)
{
    /* all state is set up by mutex_alloc; nothing to do here */
    return self;
}

/* C API entry point: create a Mutex from C code. */
VALUE
rb_mutex_new(void)
{
    return mutex_alloc(rb_cMutex);
}
169
/*
 * call-seq:
 *    mutex.locked?  -> true or false
 *
 * Returns +true+ if this lock is currently held by some thread.
 */
VALUE
rb_mutex_locked_p(VALUE self)
{
    rb_mutex_t *mutex = mutex_ptr(self);

    /* th is the owner pointer; non-zero means locked */
    return mutex->th ? Qtrue : Qfalse;
}

/* Record that +th+ now holds +self+ by pushing the mutex onto the front of
 * th->keeping_mutexes (a singly-linked list threaded through next_mutex). */
static void
mutex_locked(rb_thread_t *th, VALUE self)
{
    rb_mutex_t *mutex = mutex_ptr(self);

    if (th->keeping_mutexes) {
        mutex->next_mutex = th->keeping_mutexes;
    }
    th->keeping_mutexes = mutex;
}
194
195 /*
196 * call-seq:
197 * mutex.try_lock -> true or false
198 *
199 * Attempts to obtain the lock and returns immediately. Returns +true+ if the
200 * lock was granted.
201 */
202 VALUE
rb_mutex_trylock(VALUE self)203 rb_mutex_trylock(VALUE self)
204 {
205 rb_mutex_t *mutex = mutex_ptr(self);
206 VALUE locked = Qfalse;
207
208 if (mutex->th == 0) {
209 rb_thread_t *th = GET_THREAD();
210 mutex->th = th;
211 locked = Qtrue;
212
213 mutex_locked(th, self);
214 }
215
216 return locked;
217 }
218
/*
 * At maximum, only one thread can use cond_timedwait and watch deadlock
 * periodically. Multiple polling thread (i.e. concurrent deadlock check)
 * introduces new race conditions. [Bug #6278] [ruby-core:44275]
 */
static const rb_thread_t *patrol_thread = NULL;
225
/*
 * Acquire +self+ for the current thread, blocking until available.
 * When interruptible_p is nonzero, pending interrupts (signals,
 * Thread#raise/#kill) are serviced while waiting and may raise from here;
 * otherwise they stay pending until the caller checks them.
 */
static VALUE
do_mutex_lock(VALUE self, int interruptible_p)
{
    rb_thread_t *th = GET_THREAD();
    rb_mutex_t *mutex = mutex_ptr(self);

    /* When running trap handler */
    if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
        th->ec->interrupt_mask & TRAP_INTERRUPT_MASK) {
        rb_raise(rb_eThreadError, "can't be called from trap context");
    }

    if (rb_mutex_trylock(self) == Qfalse) {
        struct sync_waiter w; /* on-stack; lives only across native_sleep */

        if (mutex->th == th) {
            rb_raise(rb_eThreadError, "deadlock; recursive locking");
        }

        w.th = th;

        while (mutex->th != th) {
            enum rb_thread_status prev_status = th->status;
            rb_hrtime_t *timeout = 0;
            rb_hrtime_t rel = rb_msec2hrtime(100);

            th->status = THREAD_STOPPED_FOREVER;
            th->locking_mutex = self;
            th->vm->sleeper++;
            /*
             * Carefully! while some contended threads are in native_sleep(),
             * vm->sleeper is unstable value. we have to avoid both deadlock
             * and busy loop.
             */
            if ((vm_living_thread_num(th->vm) == th->vm->sleeper) &&
                !patrol_thread) {
                /* everyone else is asleep: become the patrol thread and poll
                 * every 100ms so deadlock detection can still run */
                timeout = &rel;
                patrol_thread = th;
            }

            list_add_tail(&mutex->waitq, &w.node);
            native_sleep(th, timeout); /* release GVL */
            list_del(&w.node);

            /* the owner unlocked while we slept: claim ownership now */
            if (!mutex->th) {
                mutex->th = th;
            }

            if (patrol_thread == th)
                patrol_thread = NULL;

            th->locking_mutex = Qfalse;
            if (mutex->th && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
                /* patrol timeout expired without acquiring: look for deadlock */
                rb_check_deadlock(th->vm);
            }
            if (th->status == THREAD_STOPPED_FOREVER) {
                th->status = prev_status;
            }
            th->vm->sleeper--;

            if (interruptible_p) {
                /* release mutex before checking for interrupts...as interrupt checking
                 * code might call rb_raise() */
                if (mutex->th == th) mutex->th = 0;
                RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
                if (!mutex->th) {
                    mutex->th = th;
                    mutex_locked(th, self);
                }
            } else {
                /* record ownership; interrupts remain pending for the caller */
                if (mutex->th == th) mutex_locked(th, self);
            }
        }
    }
    return self;
}
302
/* Lock without servicing interrupts; used as the re-lock step of
 * Mutex#sleep so the ensure-phase cannot raise. */
static VALUE
mutex_lock_uninterruptible(VALUE self)
{
    return do_mutex_lock(self, 0);
}

/*
 * call-seq:
 *    mutex.lock  -> self
 *
 * Attempts to grab the lock and waits if it isn't available.
 * Raises +ThreadError+ if +mutex+ was locked by the current thread.
 */
VALUE
rb_mutex_lock(VALUE self)
{
    return do_mutex_lock(self, 1);
}
321
322 /*
323 * call-seq:
324 * mutex.owned? -> true or false
325 *
326 * Returns +true+ if this lock is currently held by current thread.
327 */
328 VALUE
rb_mutex_owned_p(VALUE self)329 rb_mutex_owned_p(VALUE self)
330 {
331 VALUE owned = Qfalse;
332 rb_thread_t *th = GET_THREAD();
333 rb_mutex_t *mutex = mutex_ptr(self);
334
335 if (mutex->th == th)
336 owned = Qtrue;
337
338 return owned;
339 }
340
/*
 * Release +mutex+ on behalf of +th+ and wake the next eligible waiter.
 * Returns NULL on success, or a static error message when +th+ does not
 * own the mutex. Runs with the GVL held.
 */
static const char *
rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th)
{
    const char *err = NULL;

    if (mutex->th == 0) {
        err = "Attempt to unlock a mutex which is not locked";
    }
    else if (mutex->th != th) {
        err = "Attempt to unlock a mutex which is locked by another thread";
    }
    else {
        struct sync_waiter *cur = 0, *next;
        rb_mutex_t **th_mutex = &th->keeping_mutexes;

        mutex->th = 0;
        /* wake the first waiter that is in a wakeable state; it will claim
         * ownership when it resumes in do_mutex_lock */
        list_for_each_safe(&mutex->waitq, cur, next, node) {
            list_del_init(&cur->node);
            switch (cur->th->status) {
              case THREAD_RUNNABLE: /* from someone else calling Thread#run */
              case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
                rb_threadptr_interrupt(cur->th);
                goto found;
              case THREAD_STOPPED: /* probably impossible */
                rb_bug("unexpected THREAD_STOPPED");
              case THREAD_KILLED:
                /* not sure about this, possible in exit GC? */
                rb_bug("unexpected THREAD_KILLED");
                continue;
            }
        }
      found:
        /* unlink mutex from th's keeping_mutexes singly-linked chain */
        while (*th_mutex != mutex) {
            th_mutex = &(*th_mutex)->next_mutex;
        }
        *th_mutex = mutex->next_mutex;
        mutex->next_mutex = NULL;
    }

    return err;
}
382
/*
 * call-seq:
 *    mutex.unlock    -> self
 *
 * Releases the lock.
 * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
 */
VALUE
rb_mutex_unlock(VALUE self)
{
    const char *err;
    rb_mutex_t *mutex = mutex_ptr(self);

    err = rb_mutex_unlock_th(mutex, GET_THREAD());
    if (err) rb_raise(rb_eThreadError, "%s", err);

    return self;
}
401
#if defined(HAVE_WORKING_FORK)
/* After fork: release every mutex the surviving thread inherited from the
 * parent process; the waiters lived on parent-thread stacks and are gone. */
static void
rb_mutex_abandon_keeping_mutexes(rb_thread_t *th)
{
    rb_mutex_abandon_all(th->keeping_mutexes);
    th->keeping_mutexes = NULL;
}

/* After fork: forget the mutex this thread was blocked on in the parent. */
static void
rb_mutex_abandon_locking_mutex(rb_thread_t *th)
{
    if (th->locking_mutex) {
        rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);

        /* forked children can't reach into parent thread stacks, so the
         * waitq entries are dangling — just reset the list */
        list_head_init(&mutex->waitq);
        th->locking_mutex = Qfalse;
    }
}

/* Walk a keeping_mutexes chain, marking each mutex unlocked and clearing
 * its (now dangling) wait queue. */
static void
rb_mutex_abandon_all(rb_mutex_t *mutexes)
{
    rb_mutex_t *mutex;

    while (mutexes) {
        mutex = mutexes;
        mutexes = mutex->next_mutex;
        mutex->th = 0;
        mutex->next_mutex = 0;
        list_head_init(&mutex->waitq);
    }
}
#endif
435
/* rb_ensure body for Mutex#sleep with no timeout: sleep until woken.
 * The +time+ argument is unused (required by the rb_ensure signature). */
static VALUE
rb_mutex_sleep_forever(VALUE time)
{
    rb_thread_sleep_deadly_allow_spurious_wakeup();
    return Qnil;
}

/* rb_ensure body for Mutex#sleep with a timeout: +time+ smuggles a
 * pointer to the relative rb_hrtime_t deadline. */
static VALUE
rb_mutex_wait_for(VALUE time)
{
    rb_hrtime_t *rel = (rb_hrtime_t *)time;
    /* permit spurious check */
    sleep_hrtime(GET_THREAD(), *rel, 0);
    return Qnil;
}
451
/*
 * Implementation of Mutex#sleep: unlock, sleep (optionally bounded by
 * +timeout+ seconds), then reacquire the mutex before returning.
 * Returns elapsed whole seconds. Raises ThreadError (via rb_mutex_unlock)
 * if the calling thread does not own the mutex.
 */
VALUE
rb_mutex_sleep(VALUE self, VALUE timeout)
{
    time_t beg, end;
    struct timeval t;

    /* validate timeout before unlocking so a bad argument leaves the
     * mutex held */
    if (!NIL_P(timeout)) {
        t = rb_time_interval(timeout);
    }

    rb_mutex_unlock(self);
    beg = time(0);
    if (NIL_P(timeout)) {
        /* the ensure clause re-locks even if the sleep is interrupted */
        rb_ensure(rb_mutex_sleep_forever, Qnil, mutex_lock_uninterruptible, self);
    }
    else {
        rb_hrtime_t rel = rb_timeval2hrtime(&t);

        rb_ensure(rb_mutex_wait_for, (VALUE)&rel,
                  mutex_lock_uninterruptible, self);
    }
    /* service interrupts deferred while re-locking uninterruptibly */
    RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
    end = time(0) - beg;
    return INT2FIX(end);
}
477
478 /*
479 * call-seq:
480 * mutex.sleep(timeout = nil) -> number
481 *
482 * Releases the lock and sleeps +timeout+ seconds if it is given and
483 * non-nil or forever. Raises +ThreadError+ if +mutex+ wasn't locked by
484 * the current thread.
485 *
486 * When the thread is next woken up, it will attempt to reacquire
487 * the lock.
488 *
489 * Note that this method can wakeup without explicit Thread#wakeup call.
490 * For example, receiving signal and so on.
491 */
/* Ruby-visible Mutex#sleep: optional timeout argument defaults to nil
 * (sleep forever). */
static VALUE
mutex_sleep(int argc, VALUE *argv, VALUE self)
{
    VALUE timeout;

    timeout = rb_check_arity(argc, 0, 1) ? argv[0] : Qnil;
    return rb_mutex_sleep(self, timeout);
}
500
501 /*
502 * call-seq:
503 * mutex.synchronize { ... } -> result of the block
504 *
505 * Obtains a lock, runs the block, and releases the lock when the block
506 * completes. See the example under +Mutex+.
507 */
508
/* C API: lock +mutex+, call func(arg), and guarantee unlock via rb_ensure
 * even if func raises or throws. Returns func's result. */
VALUE
rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg)
{
    rb_mutex_lock(mutex);
    return rb_ensure(func, arg, rb_mutex_unlock, mutex);
}
515
/*
 * call-seq:
 *    mutex.synchronize { ... }    -> result of the block
 *
 * Obtains a lock, runs the block, and releases the lock when the block
 * completes.  See the example under +Mutex+.
 */
static VALUE
rb_mutex_synchronize_m(VALUE self, VALUE args)
{
    if (!rb_block_given_p()) {
        rb_raise(rb_eThreadError, "must be called with a block");
    }

    /* rb_yield ignores its Qundef argument and yields the block */
    return rb_mutex_synchronize(self, rb_yield, Qundef);
}
532
/* C API: allow (val != 0) or forbid locking +self+ from a trap (signal)
 * handler context by toggling the MUTEX_ALLOW_TRAP flag. */
void rb_mutex_allow_trap(VALUE self, int val)
{
    Check_TypedStruct(self, &mutex_data_type);

    if (val)
        FL_SET_RAW(self, MUTEX_ALLOW_TRAP);
    else
        FL_UNSET_RAW(self, MUTEX_ALLOW_TRAP);
}
542
/* Queue */

/* accessors for possibly-unaligned members of the packed structs below */
#define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
PACKED_STRUCT_UNALIGNED(struct rb_queue {
    struct list_head waitq; /* threads blocked in pop; protected by GVL */
    rb_serial_t fork_gen;   /* fork generation, to detect stale waitqs */
    const VALUE que;        /* hidden Array holding the queued objects */
    int num_waiting;        /* number of threads blocked in pop */
});

#define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
#define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
PACKED_STRUCT_UNALIGNED(struct rb_szqueue {
    struct rb_queue q;      /* embedded plain queue (pop side) */
    int num_waiting_push;   /* number of threads blocked in push */
    struct list_head pushq; /* threads blocked in push; protected by GVL */
    long max;               /* capacity limit */
});
561
/* GC mark callback: only the backing array is a Ruby object. */
static void
queue_mark(void *ptr)
{
    struct rb_queue *q = ptr;

    /* no need to mark threads in waitq, they are on stack */
    rb_gc_mark(q->que);
}

/* dsize callback for ObjectSpace.memsize_of. */
static size_t
queue_memsize(const void *ptr)
{
    return sizeof(struct rb_queue);
}

static const rb_data_type_t queue_data_type = {
    "queue",
    {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,},
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};
582
/* Allocator: build a Queue object with an empty, initialized wait queue. */
static VALUE
queue_alloc(VALUE klass)
{
    VALUE obj;
    struct rb_queue *q;

    obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
    list_head_init(queue_waitq(q));
    return obj;
}
593
/* Detect that the process forked since this queue was last touched and,
 * if so, discard waiters inherited from the parent. Returns 1 when a
 * reset happened, 0 otherwise. */
static int
queue_fork_check(struct rb_queue *q)
{
    rb_serial_t fork_gen = GET_VM()->fork_gen;

    if (q->fork_gen == fork_gen) {
        return 0;
    }
    /* forked children can't reach into parent thread stacks */
    q->fork_gen = fork_gen;
    list_head_init(queue_waitq(q));
    q->num_waiting = 0;
    return 1;
}
608
/* Unwrap a Queue VALUE, running the post-fork check on every access. */
static struct rb_queue *
queue_ptr(VALUE obj)
{
    struct rb_queue *q;

    TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q);
    queue_fork_check(q);

    return q;
}

/* FL_USER5 on a Queue/SizedQueue instance: the queue has been closed. */
#define QUEUE_CLOSED          FL_USER5
621
/* GC mark callback: delegate to the embedded plain queue. */
static void
szqueue_mark(void *ptr)
{
    struct rb_szqueue *sq = ptr;

    queue_mark(&sq->q);
}

/* dsize callback for ObjectSpace.memsize_of. */
static size_t
szqueue_memsize(const void *ptr)
{
    return sizeof(struct rb_szqueue);
}

static const rb_data_type_t szqueue_data_type = {
    "sized_queue",
    {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,},
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};
641
/* Allocator: build a SizedQueue with both wait queues initialized empty. */
static VALUE
szqueue_alloc(VALUE klass)
{
    struct rb_szqueue *sq;
    VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
                                      &szqueue_data_type, sq);
    list_head_init(szqueue_waitq(sq));
    list_head_init(szqueue_pushq(sq));
    return obj;
}
652
/* Unwrap a SizedQueue VALUE; on post-fork reset also clear the push-side
 * wait queue (the pop side is cleared inside queue_fork_check). */
static struct rb_szqueue *
szqueue_ptr(VALUE obj)
{
    struct rb_szqueue *sq;

    TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
    if (queue_fork_check(&sq->q)) {
        list_head_init(szqueue_pushq(sq));
        sq->num_waiting_push = 0;
    }

    return sq;
}
666
/* Fresh internal (GC-hidden) array used as a queue's backing store. */
static VALUE
ary_buf_new(void)
{
    return rb_ary_tmp_new(1);
}

/* Ensure +ary+ (a queue's backing store) really is an Array; raises
 * TypeError when the queue was never initialized. Returns +ary+. */
static VALUE
check_array(VALUE obj, VALUE ary)
{
    if (!RB_TYPE_P(ary, T_ARRAY)) {
        rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
    }
    return ary;
}
681
/* Number of elements currently stored in +q+. */
static long
queue_length(VALUE self, struct rb_queue *q)
{
    return RARRAY_LEN(check_array(self, q->que));
}

/* Nonzero when the queue object carries the QUEUE_CLOSED flag. */
static int
queue_closed_p(VALUE self)
{
    return FL_TEST_RAW(self, QUEUE_CLOSED) != 0;
}
693
694 /*
695 * Document-class: ClosedQueueError
696 *
697 * The exception class which will be raised when pushing into a closed
698 * Queue. See Queue#close and SizedQueue#close.
699 */
700
NORETURN(static void raise_closed_queue_error(VALUE self));

/* Raise ClosedQueueError for an operation on a closed queue. */
static void
raise_closed_queue_error(VALUE self)
{
    rb_raise(rb_eClosedQueueError, "queue closed");
}

/* Result of popping from a closed-and-drained queue: nil. */
static VALUE
queue_closed_result(VALUE self, struct rb_queue *q)
{
    assert(queue_length(self, q) == 0);
    return Qnil;
}
715
716 /*
717 * Document-class: Queue
718 *
719 * The Queue class implements multi-producer, multi-consumer queues.
720 * It is especially useful in threaded programming when information
721 * must be exchanged safely between multiple threads. The Queue class
722 * implements all the required locking semantics.
723 *
724 * The class implements FIFO type of queue. In a FIFO queue, the first
725 * tasks added are the first retrieved.
726 *
727 * Example:
728 *
729 * queue = Queue.new
730 *
731 * producer = Thread.new do
732 * 5.times do |i|
733 * sleep rand(i) # simulate expense
734 * queue << i
735 * puts "#{i} produced"
736 * end
737 * end
738 *
739 * consumer = Thread.new do
740 * 5.times do |i|
741 * value = queue.pop
742 * sleep rand(i/2) # simulate expense
743 * puts "consumed #{value}"
744 * end
745 * end
746 *
747 * consumer.join
748 *
749 */
750
751 /*
752 * Document-method: Queue::new
753 *
754 * Creates a new queue instance.
755 */
756
/* Queue#initialize: create the backing array and reset the wait queue. */
static VALUE
rb_queue_initialize(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);
    RB_OBJ_WRITE(self, &q->que, ary_buf_new());
    list_head_init(queue_waitq(q));
    return self;
}

/* Append +obj+ to +q+ and wake one blocked popper.
 * Raises ClosedQueueError when the queue is closed. */
static VALUE
queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
{
    if (queue_closed_p(self)) {
        raise_closed_queue_error(self);
    }
    rb_ary_push(check_array(self, q->que), obj);
    wakeup_one(queue_waitq(q));
    return self;
}
776
777 /*
778 * Document-method: Queue#close
779 * call-seq:
780 * close
781 *
782 * Closes the queue. A closed queue cannot be re-opened.
783 *
784 * After the call to close completes, the following are true:
785 *
786 * - +closed?+ will return true
787 *
788 * - +close+ will be ignored.
789 *
790 * - calling enq/push/<< will raise a +ClosedQueueError+.
791 *
792 * - when +empty?+ is false, calling deq/pop/shift will return an object
793 * from the queue as usual.
794 * - when +empty?+ is true, deq(false) will not suspend the thread and will return nil.
795 * deq(true) will raise a +ThreadError+.
796 *
797 * ClosedQueueError is inherited from StopIteration, so that you can break loop block.
798 *
799 * Example:
800 *
801 * q = Queue.new
802 * Thread.new{
803 * while e = q.deq # wait for nil to break loop
804 * # ...
805 * end
806 * }
807 * q.close
808 */
809
/* Queue#close: mark the queue closed and wake every blocked popper so
 * they can return nil (or raise, for deq(true)). Idempotent. */
static VALUE
rb_queue_close(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    if (!queue_closed_p(self)) {
        FL_SET(self, QUEUE_CLOSED);

        wakeup_all(queue_waitq(q));
    }

    return self;
}

/*
 * Document-method: Queue#closed?
 * call-seq: closed?
 *
 * Returns +true+ if the queue is closed.
 */
static VALUE
rb_queue_closed_p(VALUE self)
{
    return queue_closed_p(self) ? Qtrue : Qfalse;
}
836
837 /*
838 * Document-method: Queue#push
839 * call-seq:
840 * push(object)
841 * enq(object)
842 * <<(object)
843 *
844 * Pushes the given +object+ to the queue.
845 */
846
/* Queue#push / #enq / #<<. */
static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
    return queue_do_push(self, queue_ptr(self), obj);
}

/* rb_ensure body: sleep until interrupted (push/close wakes us).
 * +arg+ is unused; required by the rb_ensure signature. */
static VALUE
queue_sleep(VALUE arg)
{
    rb_thread_sleep_deadly_allow_spurious_wakeup();
    return Qnil;
}

/* On-stack waiter for queue operations: the generic sync_waiter plus a
 * pointer back to the queue whose counter must be decremented on wakeup. */
struct queue_waiter {
    struct sync_waiter w;
    union {
        struct rb_queue *q;    /* pop side */
        struct rb_szqueue *sq; /* push side of a SizedQueue */
    } as;
};
867
/* rb_ensure cleanup after a pop-side sleep: unlink the on-stack waiter
 * and decrement the pop-waiter count, even if the sleep raised. */
static VALUE
queue_sleep_done(VALUE p)
{
    struct queue_waiter *qw = (struct queue_waiter *)p;

    list_del(&qw->w.node);
    qw->as.q->num_waiting--;

    return Qfalse;
}

/* rb_ensure cleanup after a push-side (SizedQueue) sleep. */
static VALUE
szqueue_sleep_done(VALUE p)
{
    struct queue_waiter *qw = (struct queue_waiter *)p;

    list_del(&qw->w.node);
    qw->as.sq->num_waiting_push--;

    return Qfalse;
}
889
/*
 * Pop one element from +q+, blocking while empty unless should_block is 0.
 * Non-blocking and empty: raises ThreadError. Closed and drained: returns
 * nil. Caller holds the GVL.
 */
static VALUE
queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
{
    check_array(self, q->que);

    while (RARRAY_LEN(q->que) == 0) {
        if (!should_block) {
            rb_raise(rb_eThreadError, "queue empty");
        }
        else if (queue_closed_p(self)) {
            return queue_closed_result(self, q);
        }
        else {
            /* on-stack waiter; always unlinked again by queue_sleep_done */
            struct queue_waiter qw;

            assert(RARRAY_LEN(q->que) == 0);
            assert(queue_closed_p(self) == 0);

            qw.w.th = GET_THREAD();
            qw.as.q = q;
            list_add_tail(&qw.as.q->waitq, &qw.w.node);
            qw.as.q->num_waiting++;

            /* wakeups may be spurious; the while loop rechecks emptiness */
            rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw);
        }
    }

    return rb_ary_shift(q->que);
}
919
920 static int
queue_pop_should_block(int argc,const VALUE * argv)921 queue_pop_should_block(int argc, const VALUE *argv)
922 {
923 int should_block = 1;
924 rb_check_arity(argc, 0, 1);
925 if (argc > 0) {
926 should_block = !RTEST(argv[0]);
927 }
928 return should_block;
929 }
930
931 /*
932 * Document-method: Queue#pop
933 * call-seq:
934 * pop(non_block=false)
935 * deq(non_block=false)
936 * shift(non_block=false)
937 *
938 * Retrieves data from the queue.
939 *
940 * If the queue is empty, the calling thread is suspended until data is pushed
941 * onto the queue. If +non_block+ is true, the thread isn't suspended, and
942 * +ThreadError+ is raised.
943 */
944
/* Queue#pop / #deq / #shift. */
static VALUE
rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
    int should_block = queue_pop_should_block(argc, argv);
    return queue_do_pop(self, queue_ptr(self), should_block);
}
951
952 /*
953 * Document-method: Queue#empty?
954 * call-seq: empty?
955 *
956 * Returns +true+ if the queue is empty.
957 */
958
/* Queue#empty?. */
static VALUE
rb_queue_empty_p(VALUE self)
{
    return queue_length(self, queue_ptr(self)) == 0 ? Qtrue : Qfalse;
}

/*
 * Document-method: Queue#clear
 *
 * Removes all objects from the queue.
 */
static VALUE
rb_queue_clear(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    rb_ary_clear(check_array(self, q->que));
    return self;
}
979
980 /*
981 * Document-method: Queue#length
982 * call-seq:
983 * length
984 * size
985 *
986 * Returns the length of the queue.
987 */
988
/* Queue#length / #size. */
static VALUE
rb_queue_length(VALUE self)
{
    return LONG2NUM(queue_length(self, queue_ptr(self)));
}

/*
 * Document-method: Queue#num_waiting
 *
 * Returns the number of threads waiting on the queue.
 */
static VALUE
rb_queue_num_waiting(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    return INT2NUM(q->num_waiting);
}
1008
1009 /*
1010 * Document-class: SizedQueue
1011 *
1012 * This class represents queues of specified size capacity. The push operation
1013 * may be blocked if the capacity is full.
1014 *
1015 * See Queue for an example of how a SizedQueue works.
1016 */
1017
1018 /*
1019 * Document-method: SizedQueue::new
1020 * call-seq: new(max)
1021 *
1022 * Creates a fixed-length queue with a maximum size of +max+.
1023 */
1024
/* SizedQueue#initialize(max): validate capacity, create the backing
 * array, and reset both wait queues. */
static VALUE
rb_szqueue_initialize(VALUE self, VALUE vmax)
{
    long max;
    struct rb_szqueue *sq = szqueue_ptr(self);

    max = NUM2LONG(vmax);
    if (max <= 0) {
        rb_raise(rb_eArgError, "queue size must be positive");
    }

    RB_OBJ_WRITE(self, &sq->q.que, ary_buf_new());
    list_head_init(szqueue_waitq(sq));
    list_head_init(szqueue_pushq(sq));
    sq->max = max;

    return self;
}
1043
1044 /*
1045 * Document-method: SizedQueue#close
1046 * call-seq:
1047 * close
1048 *
1049 * Similar to Queue#close.
1050 *
1051 * The difference is behavior with waiting enqueuing threads.
1052 *
1053 * If there are waiting enqueuing threads, they are interrupted by
1054 * raising ClosedQueueError('queue closed').
1055 */
/* SizedQueue#close: mark closed and wake both poppers (who will drain or
 * get nil) and pushers (who will raise ClosedQueueError). Idempotent. */
static VALUE
rb_szqueue_close(VALUE self)
{
    if (!queue_closed_p(self)) {
        struct rb_szqueue *sq = szqueue_ptr(self);

        FL_SET(self, QUEUE_CLOSED);
        wakeup_all(szqueue_waitq(sq));
        wakeup_all(szqueue_pushq(sq));
    }
    return self;
}
1068
1069 /*
1070 * Document-method: SizedQueue#max
1071 *
1072 * Returns the maximum size of the queue.
1073 */
1074
/* SizedQueue#max: current capacity limit. */
static VALUE
rb_szqueue_max_get(VALUE self)
{
    return LONG2NUM(szqueue_ptr(self)->max);
}
1080
1081 /*
1082 * Document-method: SizedQueue#max=
1083 * call-seq: max=(number)
1084 *
1085 * Sets the maximum size of the queue to the given +number+.
1086 */
1087
1088 static VALUE
rb_szqueue_max_set(VALUE self,VALUE vmax)1089 rb_szqueue_max_set(VALUE self, VALUE vmax)
1090 {
1091 long max = NUM2LONG(vmax);
1092 long diff = 0;
1093 struct rb_szqueue *sq = szqueue_ptr(self);
1094
1095 if (max <= 0) {
1096 rb_raise(rb_eArgError, "queue size must be positive");
1097 }
1098 if (max > sq->max) {
1099 diff = max - sq->max;
1100 }
1101 sq->max = max;
1102 sync_wakeup(szqueue_pushq(sq), diff);
1103 return vmax;
1104 }
1105
1106 static int
szqueue_push_should_block(int argc,const VALUE * argv)1107 szqueue_push_should_block(int argc, const VALUE *argv)
1108 {
1109 int should_block = 1;
1110 rb_check_arity(argc, 1, 2);
1111 if (argc > 1) {
1112 should_block = !RTEST(argv[1]);
1113 }
1114 return should_block;
1115 }
1116
1117 /*
1118 * Document-method: SizedQueue#push
1119 * call-seq:
1120 * push(object, non_block=false)
1121 * enq(object, non_block=false)
1122 * <<(object)
1123 *
1124 * Pushes +object+ to the queue.
1125 *
1126 * If there is no space left in the queue, waits until space becomes
1127 * available, unless +non_block+ is true. If +non_block+ is true, the
1128 * thread isn't suspended, and +ThreadError+ is raised.
1129 */
1130
/* SizedQueue#push / #enq / #<<: block while the queue is at capacity
 * (unless non_block), then delegate to queue_do_push. Raises
 * ClosedQueueError if the queue is (or becomes) closed. */
static VALUE
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);
    int should_block = szqueue_push_should_block(argc, argv);

    while (queue_length(self, &sq->q) >= sq->max) {
        if (!should_block) {
            rb_raise(rb_eThreadError, "queue full");
        }
        else if (queue_closed_p(self)) {
            goto closed;
        }
        else {
            /* on-stack waiter; always unlinked by szqueue_sleep_done */
            struct queue_waiter qw;
            struct list_head *pushq = szqueue_pushq(sq);

            qw.w.th = GET_THREAD();
            qw.as.sq = sq;
            list_add_tail(pushq, &qw.w.node);
            sq->num_waiting_push++;

            /* wakeups may be spurious; the while loop rechecks capacity */
            rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
        }
    }

    /* the queue may have been closed while we slept */
    if (queue_closed_p(self)) {
      closed:
        raise_closed_queue_error(self);
    }

    return queue_do_push(self, &sq->q, argv[0]);
}
1164
/* Pop from a SizedQueue and, if that opened a free slot, wake one
 * blocked pusher. */
static VALUE
szqueue_do_pop(VALUE self, int should_block)
{
    struct rb_szqueue *sq = szqueue_ptr(self);
    VALUE retval = queue_do_pop(self, &sq->q, should_block);

    if (queue_length(self, &sq->q) < sq->max) {
        wakeup_one(szqueue_pushq(sq));
    }

    return retval;
}
1177
1178 /*
1179 * Document-method: SizedQueue#pop
1180 * call-seq:
1181 * pop(non_block=false)
1182 * deq(non_block=false)
1183 * shift(non_block=false)
1184 *
1185 * Retrieves data from the queue.
1186 *
1187 * If the queue is empty, the calling thread is suspended until data is pushed
1188 * onto the queue. If +non_block+ is true, the thread isn't suspended, and
1189 * +ThreadError+ is raised.
1190 */
1191
/* SizedQueue#pop / #deq / #shift. */
static VALUE
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
{
    int should_block = queue_pop_should_block(argc, argv);
    return szqueue_do_pop(self, should_block);
}
1198
1199 /*
1200 * Document-method: SizedQueue#clear
1201 *
1202 * Removes all objects from the queue.
1203 */
1204
/* SizedQueue#clear: empty the queue and wake all blocked pushers, since
 * every slot is now free. */
static VALUE
rb_szqueue_clear(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    rb_ary_clear(check_array(self, sq->q.que));
    wakeup_all(szqueue_pushq(sq));
    return self;
}
1214
1215 /*
1216 * Document-method: SizedQueue#length
1217 * call-seq:
1218 * length
1219 * size
1220 *
1221 * Returns the length of the queue.
1222 */
1223
/* SizedQueue#length / #size. */
static VALUE
rb_szqueue_length(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return LONG2NUM(queue_length(self, &sq->q));
}
1231
1232 /*
1233 * Document-method: SizedQueue#num_waiting
1234 *
1235 * Returns the number of threads waiting on the queue.
1236 */
1237
/* SizedQueue#num_waiting: blocked poppers plus blocked pushers. */
static VALUE
rb_szqueue_num_waiting(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
}
1245
1246 /*
1247 * Document-method: SizedQueue#empty?
1248 * call-seq: empty?
1249 *
1250 * Returns +true+ if the queue is empty.
1251 */
1252
/* SizedQueue#empty?. */
static VALUE
rb_szqueue_empty_p(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return queue_length(self, &sq->q) == 0 ? Qtrue : Qfalse;
}
1260
1261
/* ConditionalVariable */
struct rb_condvar {
    struct list_head waitq; /* threads blocked in #wait; protected by GVL */
    rb_serial_t fork_gen;   /* fork generation, to detect stale waitqs */
};
1267
1268 /*
1269 * Document-class: ConditionVariable
1270 *
1271 * ConditionVariable objects augment class Mutex. Using condition variables,
1272 * it is possible to suspend while in the middle of a critical section until a
1273 * resource becomes available.
1274 *
1275 * Example:
1276 *
1277 * mutex = Mutex.new
1278 * resource = ConditionVariable.new
1279 *
1280 * a = Thread.new {
1281 * mutex.synchronize {
1282 * # Thread 'a' now needs the resource
1283 * resource.wait(mutex)
1284 * # 'a' can now have the resource
1285 * }
1286 * }
1287 *
1288 * b = Thread.new {
1289 * mutex.synchronize {
1290 * # Thread 'b' has finished using the resource
1291 * resource.signal
1292 * }
1293 * }
1294 */
1295
1296 static size_t
condvar_memsize(const void * ptr)1297 condvar_memsize(const void *ptr)
1298 {
1299 return sizeof(struct rb_condvar);
1300 }
1301
/* TypedData bindings for ConditionVariable: no mark function (the struct
 * holds no VALUEs), default free, fixed-size memsize callback. */
static const rb_data_type_t cv_data_type = {
    "condvar",
    {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};
1307
/* Fetch the rb_condvar struct behind a ConditionVariable VALUE, resetting
 * the wait queue if this process has forked since the queue was populated. */
static struct rb_condvar *
condvar_ptr(VALUE self)
{
    struct rb_condvar *cv;
    rb_serial_t fork_gen = GET_VM()->fork_gen;

    TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);

    /* forked children can't reach into parent thread stacks */
    if (cv->fork_gen != fork_gen) {
        /* waiters were enqueued in the parent; drop them all */
        cv->fork_gen = fork_gen;
        list_head_init(&cv->waitq);
    }

    return cv;
}
1324
1325 static VALUE
condvar_alloc(VALUE klass)1326 condvar_alloc(VALUE klass)
1327 {
1328 struct rb_condvar *cv;
1329 VALUE obj;
1330
1331 obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
1332 list_head_init(&cv->waitq);
1333
1334 return obj;
1335 }
1336
1337 /*
1338 * Document-method: ConditionVariable::new
1339 *
1340 * Creates a new condition variable instance.
1341 */
1342
1343 static VALUE
rb_condvar_initialize(VALUE self)1344 rb_condvar_initialize(VALUE self)
1345 {
1346 struct rb_condvar *cv = condvar_ptr(self);
1347 list_head_init(&cv->waitq);
1348 return self;
1349 }
1350
/* argument pack threaded through rb_ensure() into do_sleep() */
struct sleep_call {
    VALUE mutex;   /* Mutex whose #sleep we invoke */
    VALUE timeout; /* seconds, or Qnil to sleep indefinitely */
};
1355
1356 static ID id_sleep;
1357
1358 static VALUE
do_sleep(VALUE args)1359 do_sleep(VALUE args)
1360 {
1361 struct sleep_call *p = (struct sleep_call *)args;
1362 return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout);
1363 }
1364
1365 static VALUE
delete_from_waitq(struct sync_waiter * w)1366 delete_from_waitq(struct sync_waiter *w)
1367 {
1368 list_del(&w->node);
1369
1370 return Qnil;
1371 }
1372
1373 /*
1374 * Document-method: ConditionVariable#wait
1375 * call-seq: wait(mutex, timeout=nil)
1376 *
1377 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
1378 *
 * If +timeout+ is given, this method returns after +timeout+ seconds have
 * passed, even if no other thread has signaled.
1381 */
1382
static VALUE
rb_condvar_wait(int argc, VALUE *argv, VALUE self)
{
    struct rb_condvar *cv = condvar_ptr(self);
    struct sleep_call args;
    struct sync_waiter w;

    /* mutex is required, timeout is optional (defaults to Qnil) */
    rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);

    /* Enqueue an on-stack waiter, then sleep via mutex.sleep.  The ensure
     * callback unlinks w on every exit path (timeout, signal, exception),
     * which is essential because w lives on this stack frame. */
    w.th = GET_THREAD();
    list_add_tail(&cv->waitq, &w.node);
    rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w);

    return self;
}
1398
1399 /*
1400 * Document-method: ConditionVariable#signal
1401 *
 * Wakes up the first thread in line waiting on this condition variable.
1403 */
1404
1405 static VALUE
rb_condvar_signal(VALUE self)1406 rb_condvar_signal(VALUE self)
1407 {
1408 struct rb_condvar *cv = condvar_ptr(self);
1409 wakeup_one(&cv->waitq);
1410 return self;
1411 }
1412
1413 /*
1414 * Document-method: ConditionVariable#broadcast
1415 *
 * Wakes up all threads waiting on this condition variable.
1417 */
1418
1419 static VALUE
rb_condvar_broadcast(VALUE self)1420 rb_condvar_broadcast(VALUE self)
1421 {
1422 struct rb_condvar *cv = condvar_ptr(self);
1423 wakeup_all(&cv->waitq);
1424 return self;
1425 }
1426
1427 /* :nodoc: */
/* marshal_dump hook shared by Queue, SizedQueue and ConditionVariable:
 * these objects reference live thread state and cannot be serialized. */
static VALUE
undumpable(VALUE obj)
{
    rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
    UNREACHABLE_RETURN(Qnil);
}
1434
1435 static VALUE
define_thread_class(VALUE outer,const char * name,VALUE super)1436 define_thread_class(VALUE outer, const char *name, VALUE super)
1437 {
1438 VALUE klass = rb_define_class_under(outer, name, super);
1439 rb_define_const(rb_cObject, name, klass);
1440 return klass;
1441 }
1442
/*
 * Registers Mutex, Queue, SizedQueue and ConditionVariable — defined under
 * Thread but also exposed as top-level constants — together with all their
 * methods and aliases.
 */
static void
Init_thread_sync(void)
{
#undef rb_intern
#if 0
    /* dead code kept so rdoc picks up the class documentation */
    rb_cMutex = rb_define_class("Mutex", rb_cObject); /* teach rdoc Mutex */
    rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */
    rb_cQueue = rb_define_class("Queue", rb_cObject); /* teach rdoc Queue */
    rb_cSizedQueue = rb_define_class("SizedQueue", rb_cObject); /* teach rdoc SizedQueue */
#endif

/* define rb_c<name> under Thread with superclass rb_c<super> */
#define DEFINE_CLASS(name, super) \
    rb_c##name = define_thread_class(rb_cThread, #name, rb_c##super)

    /* Mutex */
    DEFINE_CLASS(Mutex, Object);
    rb_define_alloc_func(rb_cMutex, mutex_alloc);
    rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0);
    rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
    rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0);
    rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
    rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
    rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
    rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0);
    rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);

    /* Queue */
    DEFINE_CLASS(Queue, Object);
    rb_define_alloc_func(rb_cQueue, queue_alloc);

    /* raised by push/pop on a closed queue */
    rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);

    rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
    rb_undef_method(rb_cQueue, "initialize_copy");
    rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
    rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
    rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
    rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
    rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
    rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);

    rb_define_alias(rb_cQueue, "enq", "push");
    rb_define_alias(rb_cQueue, "<<", "push");
    rb_define_alias(rb_cQueue, "deq", "pop");
    rb_define_alias(rb_cQueue, "shift", "pop");
    rb_define_alias(rb_cQueue, "size", "length");

    DEFINE_CLASS(SizedQueue, Queue);
    rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);

    rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
    rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
    rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
    rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
    rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
    rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
    rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
    rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
    rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
    rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);

    rb_define_alias(rb_cSizedQueue, "enq", "push");
    rb_define_alias(rb_cSizedQueue, "<<", "push");
    rb_define_alias(rb_cSizedQueue, "deq", "pop");
    rb_define_alias(rb_cSizedQueue, "shift", "pop");
    rb_define_alias(rb_cSizedQueue, "size", "length");

    /* CVar */
    DEFINE_CLASS(ConditionVariable, Object);
    rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);

    id_sleep = rb_intern("sleep");

    rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
    rb_undef_method(rb_cConditionVariable, "initialize_copy");
    rb_define_method(rb_cConditionVariable, "marshal_dump", undumpable, 0);
    rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
    rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
    rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);

    /* these classes used to live in thread.rb; satisfy `require "thread"` */
    rb_provide("thread.rb");
}
1528