# Test script for Thread::Queue's queue size limit.
#
# A worker thread and the main thread exchange items through a limited
# queue ($q) and coordinate through an unlimited report queue ($rpt).
# The numbered comments "(1)".."(7)" give the intended order of the
# handshake between the two threads.

use strict;
use warnings;

use Config;

BEGIN {
    # Skip the whole file when the prerequisites are not compiled in.
    if (! $Config{'useithreads'}) {
        print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
        exit(0);
    }
    if (! $Config{'d_select'}) {
        print("1..0 # SKIP 'select()' not available for testing\n");
        exit(0);
    }
}

use threads;
use Thread::Queue;

use Test::More;

# 11 tests run in the main thread and 2 in the worker thread.
plan tests => 13;

my $q = Thread::Queue->new();
my $rpt = Thread::Queue->new();

my $th = threads->create( sub {
    # (1) Set queue limit, and report it
    $q->limit = 3;
    $rpt->enqueue($q->limit);

    # (3) Fetch an item from queue
    my $item = $q->dequeue();
    is($item, 1, 'Dequeued item 1');
    # Report queue count
    $rpt->enqueue($q->pending());

    # q = (2, 3, 4, 5); r = (4)

    # (4) Enqueue more items - will block
    $q->enqueue(6, 7);
    # q = (5, 'foo', 6, 7); r = (4, 3, 4, 3)

    # (6) Get reports from main
    my @items = $rpt->dequeue(5);
    is_deeply(\@items, [4, 3, 4, 3, 'go'], 'Queue reports');
});

# (2) Read queue limit from thread
my $item = $rpt->dequeue();
is($item, $q->limit, 'Queue limit set');
# Send items
$q->enqueue(1, 2, 3, 4, 5);

# (5) Read queue count
$item = $rpt->dequeue;
# q = (2, 3, 4, 5); r = ()
is($item, $q->pending(), 'Queue count');
# Report back the queue count
$rpt->enqueue($q->pending);
# q = (2, 3, 4, 5); r = (4)

# Read an item from queue
$item = $q->dequeue();
is($item, 2, 'Dequeued item 2');
# q = (3, 4, 5); r = (4)
# Report back the queue count
$rpt->enqueue($q->pending);
# q = (3, 4, 5); r = (4, 3)

# 'insert' doesn't care about queue limit
$q->insert(3, 'foo');
$rpt->enqueue($q->pending);
# q = (3, 4, 5, 'foo'); r = (4, 3, 4)

# Read an item from queue
$item = $q->dequeue();
is($item, 3, 'Dequeued item 3');
# q = (4, 5, 'foo'); r = (4, 3, 4)
# Report back the queue count
$rpt->enqueue($q->pending);
# q = (4, 5, 'foo'); r = (4, 3, 4, 3)

# Read all items from queue
my @items = $q->dequeue(3);
is_deeply(\@items, [4, 5, 'foo'], 'Dequeued 3 items');
# Thread is now unblocked

@items = $q->dequeue(2);
is_deeply(\@items, [6, 7], 'Dequeued 2 items');

# The thread is now waiting for a fifth report in step (6).
# Handshake with thread
$rpt->enqueue('go');

# (7) - Done
$th->join;

# It's an error to call dequeue methods with COUNT > LIMIT
eval { $q->dequeue(5); };
# Capture $@ right away, and use a fixed description so the test name
# does not depend on whether (or where) an exception was thrown.
my $err = $@;
like($err, qr/exceeds queue size limit/, 'Dequeue count exceeding limit is an error');

# Bug #120157
#   Fix deadlock from combination of dequeue_nb, enqueue and queue size limit

# (1) Fill queue
$q->enqueue(1..3);
is($q->pending, 3, 'Queue loaded');

# (2) Thread will block trying to add to full queue
$th = threads->create( sub {
    $q->enqueue(99);
    return('OK');
});
threads->yield();

# (3) Dequeue an item so that thread can unblock
is($q->dequeue_nb(), 1, 'Dequeued item');

# (4) Thread unblocks
is($th->join(), 'OK', 'Thread exited');

# (5) Fetch queue to show thread's item was enqueued
@items = ();
# Test definedness, not truth, so a queued false value (0 or '') would
# not end the drain early.
while (defined(my $left = $q->dequeue_nb())) {
    push(@items, $left);
}
is_deeply(\@items, [2,3,99], 'Dequeued remaining');

exit(0);

# EOF