1use strict;
2use warnings;
3
4use Config;
5
# Compile-time capability guard.  For each required %Config entry that is
# false, emit a TAP "skip everything" plan line and exit successfully, so the
# harness records the file as skipped rather than failed.
# NOTE(review): no select() call appears in this file; confirm whether the
# 'd_select' requirement is still needed here.
BEGIN {
    # Checked in order; the first missing capability decides the skip reason.
    my @requirements = (
        [ 'useithreads', "1..0 # SKIP Perl not compiled with 'useithreads'\n" ],
        [ 'd_select',    "1..0 # SKIP 'select()' not available for testing\n" ],
    );

    for my $requirement (@requirements) {
        my ($config_key, $skip_line) = @{$requirement};
        next if $Config{$config_key};

        print($skip_line);
        exit(0);
    }
}
16
17use threads;
18use Thread::Queue;
19
20use Test::More;
21
22plan tests => 13;
23
# $q is the queue under test; its size limit is set by the worker below.
# $rpt carries status reports between the worker and the main thread.
my $q   = Thread::Queue->new;
my $rpt = Thread::Queue->new;

# Worker half of the handshake.  The "(N)" markers give the intended
# interleaving with the numbered steps performed by the main thread.
my $th = threads->create(sub {
    # (1) Limit the queue to 3 pending items, then report the value to main
    $q->limit = 3;
    $rpt->enqueue($q->limit);

    # (3) Take the first item that main sent
    my $first = $q->dequeue;
    is($first, 1, 'Dequeued item 1');
    # Tell main how many items are still pending
    $rpt->enqueue($q->pending);

    # q = (2, 3, 4, 5); r = (4)

    # (4) Add two more items.  Four items are pending against a limit of 3,
    #     so this call blocks until main has drained the queue below the
    #     limit.
    $q->enqueue(6, 7);
    # main has taken (4, 5, 'foo') by now, so q = (6, 7); r = (4, 3, 4, 3)

    # (6) Collect main's four pending-count reports plus the final 'go'
    my @reports = $rpt->dequeue(5);
    is_deeply(\@reports, [4, 3, 4, 3, 'go'], 'Queue reports');
});
48
# (2) Wait for the worker to report the limit it set, and check that the
#     main thread reads the same value from the queue
my $item = $rpt->dequeue;
is($item, $q->limit, 'Queue limit set');
# Send five items in a single call
$q->enqueue(1 .. 5);

# (5) Read the pending count reported by the worker after it took one item
$item = $rpt->dequeue();
# q = (2, 3, 4, 5); r = ()
is($item, $q->pending, 'Queue count');
# Send the same count back; the worker collects these reports in step (6)
$rpt->enqueue($q->pending());
# q = (2, 3, 4, 5); r = (4)
62
# Take the next item off the front of the queue
$item = $q->dequeue;
is($item, 2, 'Dequeued item 2');
# q = (3, 4, 5); r = (4)
# Report the new pending count
$rpt->enqueue($q->pending());
# q = (3, 4, 5); r = (4, 3)

# 'insert' is not subject to the queue limit: adding 'foo' at index 3
# takes the queue to four items
$q->insert(3, 'foo');
$rpt->enqueue($q->pending());
# q = (3, 4, 5, 'foo'); r = (4, 3, 4)

# Take another item off the front of the queue
$item = $q->dequeue;
is($item, 3, 'Dequeued item 3');
# q = (4, 5, 'foo'); r = (4, 3, 4)
# Report the new pending count
$rpt->enqueue($q->pending());
# q = (4, 5, 'foo'); r = (4, 3, 4, 3)
83
# Take the three remaining items in one call.  This drops the queue below
# its limit, so the worker's blocked enqueue(6, 7) can now proceed.
my @items = $q->dequeue(3);
is_deeply(\@items, [4, 5, 'foo'], 'Dequeued 3 items');

# Collect the two items the worker added once it was unblocked
@items = $q->dequeue(2);
is_deeply(\@items, [6, 7], 'Dequeued 2 items');

# Handshake with thread: 'go' is the fifth and last report it waits for
$rpt->enqueue('go');

# (7) - Done
$th->join();
98
# It's an error to call dequeue methods with COUNT > LIMIT.
# The queue limit is still 3 at this point, so asking for 5 items is expected
# to die instead of waiting for items.
eval { $q->dequeue(5); };
# Copy $@ straight away: it is a global that any later eval can overwrite.
my $dequeue_error = $@;
# Use a fixed description.  Naming the test after $@ left it unnamed when no
# exception was thrown and made the name depend on file and line otherwise;
# like() already prints the actual value on failure.
like($dequeue_error, qr/exceeds queue size limit/,
     'dequeue() with COUNT > LIMIT is rejected');
102
# Bug #120157
#  Fix deadlock from combination of dequeue_nb, enqueue and queue size limit

# (1) Load the queue with three items, matching the limit of 3 set earlier
$q->enqueue(1, 2, 3);
is($q->pending(), 3, 'Queue loaded');

# (2) Start a thread whose enqueue is expected to block on the full queue
$th = threads->create(sub {
    $q->enqueue(99);
    return 'OK';
});
# Give the new thread a chance to run before dequeuing.
# NOTE(review): yield() is only a hint, so the thread may not have blocked
# yet; the assertions below hold in either order.
threads->yield;

# (3) Remove one item without blocking so that the thread can unblock
is($q->dequeue_nb(), 1, 'Dequeued item');

# (4) join() returns once the thread's enqueue has completed
is($th->join, 'OK', 'Thread exited');
122
# (5) Drain the queue to show the thread's item was enqueued.
# dequeue_nb() returns undef once the queue is empty, so the loop tests
# definedness rather than truth: a false-but-valid item (0 or '') must not
# end the drain early.
@items = ();
while (defined(my $item = $q->dequeue_nb())) {
    push(@items, $item);
}
is_deeply(\@items, [2, 3, 99], 'Dequeued remaining');

exit(0);
131
132# EOF
133