1use strict;
2use warnings;
3
4use Config;
5
BEGIN {
    # Build-time features this test depends on, each paired with the reason
    # reported when the feature is absent from %Config.  The checks run in
    # the order listed, and the first missing feature ends the script.
    my @requirements = (
        [ 'useithreads', "Perl not compiled with 'useithreads'" ],
        [ 'd_select',    "'select()' not available for testing" ],
    );

    for my $requirement (@requirements) {
        my ($config_key, $reason) = @{$requirement};
        next if $Config{$config_key};

        # Emit a TAP "skip all" plan and stop before any test module loads.
        print("1..0 # SKIP $reason\n");
        exit(0);
    }
}
16
17use threads;
18use Thread::Queue;
19
20use Test::More;
21
# Sizing parameters shared by every scenario in this file.
my $num_threads = 3;    # worker threads started per scenario
my $cycles      = 2;    # batches queued per worker thread (on average)
my $count       = 2;    # items requested by each dequeue() call

plan(
    tests =>
          # one pass() per item of the full batches, in each of 3 scenarios
          3 * $num_threads * $cycles * $count
          # an "exiting" and a "joined" pass() per thread, in each scenario
        + 6 * $num_threads
          # four is() checks on pending(), plus the one extra item queued
          # in each of the last two scenarios
        + 6
);
26
# Scenario: end() is called while the workers are blocked in dequeue() and
# the queue holds no more items.
{
    # Work queue, pre-loaded with exactly enough items for whole batches.
    my $q = Thread::Queue->new(1 .. $num_threads * $cycles * $count);
    # Report queue: a worker posts its id here after each batch it handles.
    my $r = Thread::Queue->new();

    my @workers = map {
        my $id = $_;
        threads->create(sub {
            # Pull batches of $count items until dequeue() returns nothing
            # or hands over an undefined item.
            BATCH:
            while (my @batch = $q->dequeue($count)) {
                for my $value (@batch) {
                    last BATCH unless defined $value;
                    pass("'$value' read from queue in thread $id");
                }
                # Pause for a random fraction of a second before reporting.
                select(undef, undef, undef, rand(1));
                $r->enqueue($id);
            }
            pass("Thread $id exiting");
        });
    } 1 .. $num_threads;

    # One report arrives per completed batch; once all of them are in,
    # every item has been taken off the work queue.
    $r->dequeue() for 1 .. $num_threads * $cycles;

    # Give the workers a moment to go back to waiting in dequeue().
    sleep(1);
    threads->yield();

    is($q->pending(), 0, 'Queue is empty');

    # Tell the waiting workers that no further items will be added.
    $q->end();

    is($q->pending(), undef, 'Queue is ended');

    for my $worker (@workers) {
        $worker->join();
        pass($worker->tid() . " joined");
    }
}
69
# Scenario: end() is called while the workers are blocked in dequeue() and
# an item still remains in the queue.
{
    # Work queue holding the whole batches plus one extra item.
    my $q = Thread::Queue->new(1 .. $num_threads * $cycles * $count + 1);
    # Report queue: a worker posts its id here after each batch it handles.
    my $r = Thread::Queue->new();

    my @workers = map {
        my $id = $_;
        threads->create(sub {
            # Pull batches of $count items until dequeue() returns nothing
            # or hands over an undefined item.
            BATCH:
            while (my @batch = $q->dequeue($count)) {
                for my $value (@batch) {
                    last BATCH unless defined $value;
                    pass("'$value' read from queue in thread $id");
                }
                # Pause for a random fraction of a second before reporting.
                select(undef, undef, undef, rand(1));
                $r->enqueue($id);
            }
            pass("Thread $id exiting");
        });
    } 1 .. $num_threads;

    # One report arrives per completed batch; once all of them are in,
    # only the single extra item should still be queued.
    $r->dequeue() for 1 .. $num_threads * $cycles;

    # Give the workers a moment to go back to waiting in dequeue().
    sleep(1);
    threads->yield();

    is($q->pending(), 1, 'Queue has one left');

    # Tell the waiting workers that no further items will be added.
    $q->end();

    for my $worker (@workers) {
        $worker->join();
        pass($worker->tid() . " joined");
    }

    # Checked only after the workers have finished with the queue.
    is($q->pending(), undef, 'Queue is ended');
}
112
# Scenario: end() is sent while items are still in the queue.
{
    # Work queue holding the whole batches plus one extra item.
    my $q = Thread::Queue->new(1 .. $num_threads * $cycles * $count + 1);

    my @workers = map {
        my $id = $_;
        threads->create(sub {
            # Pull batches of $count items until dequeue() returns nothing
            # or hands over an undefined item.
            BATCH:
            while (my @batch = $q->dequeue($count)) {
                for my $value (@batch) {
                    last BATCH unless defined $value;
                    pass("'$value' read from queue in thread $id");
                }
                # Pause for a random fraction of a second between batches.
                select(undef, undef, undef, rand(1));
            }
            pass("Thread $id exiting");
        });
    } 1 .. $num_threads;

    # Signal that no more work is coming, straight after starting the
    # workers and without waiting for the queue to drain.  Any worker
    # waiting in dequeue() should unblock.
    $q->end();

    for my $worker (@workers) {
        $worker->join();
        pass($worker->tid() . " joined");
    }
}
143
# Every scenario has run; leave explicitly with exit status 0.
exit 0;
145
146# EOF
147