use strict;
use warnings;

use Config;

BEGIN {
    # This test needs interpreter threads and a working 4-arg select()
    # (used below as a sub-second sleep); skip the whole file otherwise.
    if (! $Config{'useithreads'}) {
        print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
        exit(0);
    }
    if (! $Config{'d_select'}) {
        print("1..0 # SKIP 'select()' not available for testing\n");
        exit(0);
    }
}

use threads;
use Thread::Queue;

use Test::More;

my $num_threads = 3;
my $cycles = 2;
my $count = 2;

# Plan breakdown, with N = $num_threads*$cycles*$count items per section:
#   - each of the 3 sections: N "read" passes, plus one "exiting" and one
#     "joined" pass per thread                      => 3*N + 6*$num_threads
#   - sections 1 and 2 each make 2 is() checks      => 4
#   - sections 2 and 3 each queue one extra item    => 2
plan tests => 3*$num_threads*$cycles*$count + 6*$num_threads + 6;

# Start $num_workers threads, each of which repeatedly dequeues up to $batch
# items from $q and reports one pass() per item read. A worker leaves its
# loop when dequeue() returns an empty list or when it reads an undefined
# item, and then reports that it is exiting.
#
# $r is optional: when supplied, the worker enqueues its index on $r after
# each processed batch so the main thread can count completed batches.
#
# Returns the list of created thread objects.
sub start_workers {
    my ($num_workers, $batch, $q, $r) = @_;

    my @workers;
    for my $ii (1..$num_workers) {
        push @workers, threads->create( sub {
            # Thread will loop until no more work is coming
            LOOP:
            while (my @set = $q->dequeue($batch)) {
                foreach my $item (@set) {
                    last LOOP if (! defined($item));
                    pass("'$item' read from queue in thread $ii");
                }
                # Sleep for a random fraction of a second
                select(undef, undef, undef, rand(1));
                $r->enqueue($ii) if (defined($r));
            }
            pass("Thread $ii exiting");
        });
    }
    return @workers;
}

# Consume $batches batch-completion reports from $r, then pause briefly so
# the workers have a chance to get back into their next dequeue() call.
# NOTE(review): the sleep/yield is a best-effort delay, not a guarantee that
# every worker is blocked by the time this returns.
sub await_batches {
    my ($batches, $r) = @_;

    $r->dequeue() for (1..$batches);
    sleep(1);
    threads->yield();
    return;
}

# Join each of the given threads in order, reporting one pass() per thread.
sub join_workers {
    my @workers = @_;

    for my $thread (@workers) {
        $thread->join;
        pass($thread->tid." joined");
    }
    return;
}

# Test for end() while threads are blocked and no more items in queue
{
    my @items = 1..($num_threads*$cycles*$count);
    my $q = Thread::Queue->new(@items);
    my $r = Thread::Queue->new();

    my @threads = start_workers($num_threads, $count, $q, $r);

    # Make sure there's nothing in the queue and threads are blocking
    await_batches($num_threads*$cycles, $r);

    is($q->pending(), 0, 'Queue is empty');

    # Signal no more work is coming
    $q->end();

    is($q->pending(), undef, 'Queue is ended');

    join_workers(@threads);
}

# Test for end() while threads are blocked and items still remain in queue
{
    my @items = 1..($num_threads*$cycles*$count + 1);
    my $q = Thread::Queue->new(@items);
    my $r = Thread::Queue->new();

    my @threads = start_workers($num_threads, $count, $q, $r);

    # Make sure only the one odd item is left in the queue (fewer than the
    # $count each worker asks for) and threads are blocking
    await_batches($num_threads*$cycles, $r);

    is($q->pending(), 1, 'Queue has one left');

    # Signal no more work is coming
    $q->end();

    join_workers(@threads);

    # Checked only after the join so that the leftover item has been read
    is($q->pending(), undef, 'Queue is ended');
}

# Test of end() sent while items in queue
{
    my @items = 1..($num_threads*$cycles*$count + 1);
    my $q = Thread::Queue->new(@items);

    # No report queue here: the main thread does not wait for any batches
    my @threads = start_workers($num_threads, $count, $q);

    # Signal no more work is coming while items are still queued; the plan
    # expects the workers to read every remaining item and then exit.
    $q->end();

    join_workers(@threads);
}

exit(0);

# EOF