1package Thread::Queue;
2
3use strict;
4use warnings;
5
6our $VERSION = '3.14';          # remember to update version in POD!
7$VERSION = eval $VERSION;
8
9use threads::shared 1.21;
10use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
11
12# Carp errors from threads::shared calls should complain about caller
13our @CARP_NOT = ("threads::shared");
14
# Constructor: build a queue, optionally seeded with an initial list of items.
#
# Arguments: the class name, followed by zero or more items.
# Returns:   a shared hash, blessed into the class, whose 'queue' entry
#            refers to a shared array holding the items.
sub new
{
    my ($class, @items) = @_;

    # Every item goes through shared_clone() before being stored
    my @queue :shared;
    push(@queue, shared_clone($_)) for (@items);

    my %self :shared;
    $self{'queue'} = \@queue;

    return bless(\%self, $class);
}
23
# Add items to the tail of a queue.
#
# Arguments: the list of items to append; each one is passed through
#            shared_clone() before being stored.
# Blocks (via cond_wait) while a non-zero 'LIMIT' is set and the queue
# already holds at least that many items.
# Croaks if the queue has been 'end'ed, either before the call was made or
# while the call was blocked waiting for room.
sub enqueue
{
    my $self = shift;
    lock(%$self);

    my $queue = $$self{'queue'};

    while (1) {
        # This is tested on every pass, not just on entry:  end() may be
        # called by another thread while we are blocked below, and items
        # must never be added to a queue once it has been ended.
        if ($$self{'ENDED'}) {
            require Carp;
            Carp::croak("'enqueue' method called on queue that has been 'end'ed");
        }

        # Proceed once there is no limit, or the queue is below it
        last if (! $$self{'LIMIT'} || (@$queue < $$self{'LIMIT'}));

        cond_wait(%$self);
    }

    # Add items to queue, and then signal other threads
    # (nothing is signalled when the queue is still empty afterwards)
    push(@$queue, map { shared_clone($_) } @_)
        and cond_signal(%$self);
}
43
# Accessor for the queue's maximum size, stored under the 'LIMIT' key.
# Declared ':lvalue' so the same call can be read or assigned to:
#   my $size = $q->limit;
#   $q->limit = 5;
sub limit : lvalue
{
    my ($self) = @_;
    lock(%{$self});

    # Final expression is the hash element itself, making it assignable
    $self->{'LIMIT'};
}
51
# Report how many items are currently on the queue.
#
# Returns: the item count, or nothing (undef in scalar context, an empty
#          list in list context) once the queue has been ended and is empty.
sub pending
{
    my ($self) = @_;
    lock(%{$self});

    my $count = scalar(@{ $self->{'queue'} });

    # An ended, fully drained queue reports "nothing" rather than zero
    if ($self->{'ENDED'} && ($count == 0)) {
        return;
    }

    return $count;
}
60
# Indicate that no more data will enter the queue.
#
# Sets the 'ENDED' flag and then wakes every thread waiting on the queue.
sub end
{
    my $self = shift;
    lock(%$self);

    # No more data is coming
    $$self{'ENDED'} = 1;

    # Wake ALL waiters rather than just one.  Threads blocked in enqueue()
    # and in the dequeue methods wait on the same variable, so a single
    # cond_signal() could be consumed by a waiter that simply goes back to
    # waiting (e.g. an enqueue() still at the limit), leaving blocked
    # dequeuers asleep even though the queue has ended.
    cond_broadcast(%$self);
}
71
# Remove and return items from the head of a queue, blocking until enough
# are available.
#
# Arguments: COUNT (optional, default 1), checked by _validate_count().
# Returns:   a single item when COUNT is 1, otherwise a list of COUNT items.
#            Once the queue has been ended the request is handed to
#            dequeue_nb(), which returns whatever is left.
sub dequeue
{
    my ($self, @args) = @_;
    lock(%{$self});
    my $items = $self->{'queue'};

    my $wanted = 1;
    $wanted = $self->_validate_count(shift(@args)) if (@args);

    # Sleep until enough items have arrived, or the queue has been ended
    until ((@{$items} >= $wanted) || $self->{'ENDED'}) {
        cond_wait(%{$self});
    }

    # An ended queue never blocks:  take whatever remains
    if ($self->{'ENDED'}) {
        return $self->dequeue_nb($wanted);
    }

    # Single item is returned as a scalar
    if ($wanted == 1) {
        my $head = shift(@{$items});
        cond_signal(%{$self});  # Pass the wake-up along to another waiter
        return $head;
    }

    # Multiple items are returned as a list
    my @taken = map { shift(@{$items}) } (1 .. $wanted);
    cond_signal(%{$self});  # Pass the wake-up along to another waiter
    return @taken;
}
100
# Remove and return items from the head of a queue without ever blocking.
#
# Arguments: COUNT (optional, default 1), checked by _validate_count().
# Returns:   for COUNT == 1, the head item (undef if the queue is empty);
#            otherwise a list of up to COUNT items, as many as are available.
sub dequeue_nb
{
    my ($self, @args) = @_;
    lock(%{$self});
    my $items = $self->{'queue'};

    my $wanted = 1;
    $wanted = $self->_validate_count(shift(@args)) if (@args);

    # Single item is returned as a scalar
    if ($wanted == 1) {
        my $head = shift(@{$items});
        cond_signal(%{$self});  # Unblock possibly waiting threads
        return $head;
    }

    # Multiple items:  stop early if the queue runs dry
    my @taken;
    while (@{$items} && (@taken < $wanted)) {
        push(@taken, shift(@{$items}));
    }
    cond_signal(%{$self});  # Unblock possibly waiting threads
    return @taken;
}
126
# Remove and return items from the head of a queue, blocking if needed up to
# a timeout.
#
# Arguments: TIMEOUT (optional) and COUNT (optional, default 1).
#   - A missing or undefined TIMEOUT is treated as -1, i.e., a deadline that
#     has already passed, so the call does not wait.  (This module's POD
#     documents an undefined TIMEOUT as behaving like dequeue_nb.)
#   - A defined TIMEOUT is checked by _validate_timeout(), which croaks on a
#     non-numeric value.
#   - COUNT is checked by _validate_count().
# Returns:   whatever dequeue_nb(COUNT) returns once enough items are
#            present, the queue has been ended, or the wait has timed out.
sub dequeue_timed
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    # Timeout may be relative or absolute
    my $timeout = -1;
    if (@_) {
        my $arg = shift;   # Always consume the slot so COUNT stays second
        $timeout = $self->_validate_timeout($arg) if (defined($arg));
    }

    # Convert to an absolute time for use with cond_timedwait().  Values
    # below 32000000 (about one year in seconds) are taken to be relative
    # to now; anything larger is used as-is.
    if ($timeout < 32000000) {
        $timeout += time();
    }

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for requisite number of items, or until timeout
    while ((@$queue < $count) && ! $$self{'ENDED'}) {
        last if (! cond_timedwait(%$self, $timeout));
    }

    # Get whatever we need off the queue if available
    return $self->dequeue_nb($count);
}
151
# Look at an item on the queue without removing it.
#
# Arguments: INDEX (optional, default 0 = head), checked by
#            _validate_index().
# Returns:   the element of the queue array at that index.
sub peek
{
    my ($self, @args) = @_;
    lock(%{$self});

    my $index = 0;
    $index = $self->_validate_index(shift(@args)) if (@args);

    return $self->{'queue'}->[$index];
}
160
# Insert items at an arbitrary position in a queue.
#
# Arguments: INDEX (required, checked by _validate_index()), followed by the
#            items to insert.  A negative INDEX counts back from the tail and
#            is clamped to the head of the queue.
# Croaks if the queue has been 'end'ed.  Does nothing when no items are
# given.
sub insert
{
    my ($self, @args) = @_;
    lock(%{$self});

    if ($self->{'ENDED'}) {
        require Carp;
        Carp::croak("'insert' method called on queue that has been 'end'ed");
    }

    my $items = $self->{'queue'};
    my $index = $self->_validate_index(shift(@args));

    # Nothing to insert
    return unless (@args);

    # Translate a negative index into a position from the head
    if ($index < 0) {
        $index += @{$items};
        $index = 0 if ($index < 0);
    }

    # Set aside everything from $index onward, keeping its order
    my @tail;
    unshift(@tail, pop(@{$items})) while (@{$items} > $index);

    # New items go on first ...
    push(@{$items}, map { shared_clone($_) } @args);

    # ... then the items that were set aside
    push(@{$items}, @tail);

    cond_signal(%{$self});  # Unblock possibly waiting threads
}
200
# Remove and return items from an arbitrary position in a queue, without
# blocking.
#
# Arguments: INDEX (optional, default 0) checked by _validate_index(), and
#            COUNT (optional, default 1) checked by _validate_count().
# Returns:   a single item when COUNT is 1, otherwise a list of the items
#            removed.  When a negative INDEX reaches back past the head, COUNT
#            is reduced by the overshoot; nothing is returned if that leaves
#            no items, otherwise the remainder is taken via dequeue_nb().
sub extract
{
    my ($self, @args) = @_;
    lock(%{$self});
    my $items = $self->{'queue'};

    my $index = 0;
    $index = $self->_validate_index(shift(@args)) if (@args);

    my $count = 1;
    $count = $self->_validate_count(shift(@args)) if (@args);

    # Translate a negative index into a position from the head
    if ($index < 0) {
        $index += @{$items};
        if ($index < 0) {
            # Still negative:  only the part overlapping the queue counts
            $count += $index;
            return if ($count <= 0);           # Beyond the head of the queue
            return $self->dequeue_nb($count);  # Extract from the head
        }
    }

    # Set aside everything after the requested range, keeping its order
    my $stop = $index + $count;
    my @tail;
    unshift(@tail, pop(@{$items})) while (@{$items} > $stop);

    # What now remains from $index onward is the requested range
    my @taken;
    while (@{$items} > $index) {
        unshift(@taken, pop(@{$items}));
    }

    # Put the items that were set aside back onto the queue
    push(@{$items}, @tail);

    cond_signal(%{$self});  # Unblock possibly waiting threads

    # Single item is returned as a scalar
    if ($count == 1) {
        return $taken[0];
    }

    # Multiple items are returned as a list
    return @taken;
}
242
243### Internal Methods ###
244
# Check value of the requested index.
#
# Arguments: the candidate index.
# Returns:   the index unchanged when it is a defined, whole number
#            (negative values are accepted).
# Croaks, naming the public method that was called, on anything else.
sub _validate_index
{
    my $self = shift;
    my $index = shift;

    if (! defined($index) ||
        ! looks_like_number($index) ||
        (int($index) != $index))
    {
        require Carp;

        # Name of the sub that called us, reduced to its bare name.  Any
        # package qualifier is stripped, whatever it is, so the message reads
        # the same for subclass instances; the class name is not interpolated
        # into the pattern.
        my $method = (caller(1))[3];
        $method = '' if (! defined($method));
        $method =~ s/^.*:://;

        $index = 'undef' if (! defined($index));
        Carp::croak("Invalid 'index' argument ($index) to '$method' method");
    }

    return $index;
}
265
# Check value of the requested count.
#
# Arguments: the candidate count.
# Returns:   the count unchanged when it is a defined, whole number that is
#            at least 1 and, if a non-zero 'LIMIT' is set, no greater than it.
# Croaks, naming the public method that was called, on anything else.  A
# numeric count above the limit gets the "exceeds queue size limit" message;
# every other failure gets the "Invalid 'count' argument" message.
sub _validate_count
{
    my $self = shift;
    my $count = shift;

    my $limit = $$self{'LIMIT'};

    # Only compare against the limit when the count really is a number, so
    # that undef or a non-numeric string is never used in a numeric
    # comparison (which would emit "isn't numeric" warnings).
    my $is_number  = defined($count) && looks_like_number($count);
    my $over_limit = $is_number && $limit && ($count > $limit);

    if (! $is_number ||
        (int($count) != $count) ||
        ($count < 1) ||
        $over_limit)
    {
        require Carp;

        # Name of the sub that called us, reduced to its bare name.  Any
        # package qualifier is stripped, whatever it is, so the message reads
        # the same for subclass instances.
        my $method = (caller(1))[3];
        $method = '' if (! defined($method));
        $method =~ s/^.*:://;

        $count = 'undef' if (! defined($count));
        if ($over_limit) {
            Carp::croak("'count' argument ($count) to '$method' method exceeds queue size limit ($limit)");
        } else {
            Carp::croak("Invalid 'count' argument ($count) to '$method' method");
        }
    }

    return $count;
}
292
# Check value of the requested timeout.
#
# Arguments: the candidate timeout.
# Returns:   the timeout unchanged when it is defined and looks like a
#            number (fractional and negative values are accepted).
# Croaks, naming the public method that was called, on anything else.
sub _validate_timeout
{
    my $self = shift;
    my $timeout = shift;

    if (! defined($timeout) ||
        ! looks_like_number($timeout))
    {
        require Carp;

        # Name of the sub that called us, reduced to its bare name.  Any
        # package qualifier is stripped, whatever it is, so the message reads
        # the same for subclass instances; the class name is not interpolated
        # into the pattern.
        my $method = (caller(1))[3];
        $method = '' if (! defined($method));
        $method =~ s/^.*:://;

        $timeout = 'undef' if (! defined($timeout));
        Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
    }

    return $timeout;
}
312
3131;
314
315=head1 NAME
316
317Thread::Queue - Thread-safe queues
318
319=head1 VERSION
320
321This document describes Thread::Queue version 3.14
322
323=head1 SYNOPSIS
324
325    use strict;
326    use warnings;
327
328    use threads;
329    use Thread::Queue;
330
331    my $q = Thread::Queue->new();    # A new empty queue
332
333    # Worker thread
334    my $thr = threads->create(
335        sub {
336            # Thread will loop until no more work
337            while (defined(my $item = $q->dequeue())) {
338                # Do work on $item
339                ...
340            }
341        }
342    );
343
344    # Send work to the thread
345    $q->enqueue($item1, ...);
346    # Signal that there is no more work to be sent
347    $q->end();
348    # Join up with the thread when it finishes
349    $thr->join();
350
351    ...
352
353    # Count of items in the queue
354    my $left = $q->pending();
355
356    # Non-blocking dequeue
357    if (defined(my $item = $q->dequeue_nb())) {
358        # Work on $item
359    }
360
361    # Blocking dequeue with 5-second timeout
362    if (defined(my $item = $q->dequeue_timed(5))) {
363        # Work on $item
364    }
365
366    # Set a size for a queue
367    $q->limit = 5;
368
369    # Get the second item in the queue without dequeuing anything
370    my $item = $q->peek(1);
371
372    # Insert two items into the queue just behind the head
373    $q->insert(1, $item1, $item2);
374
375    # Extract the last two items on the queue
376    my ($item1, $item2) = $q->extract(-2, 2);
377
378=head1 DESCRIPTION
379
380This module provides thread-safe FIFO queues that can be accessed safely by
381any number of threads.
382
383Any data types supported by L<threads::shared> can be passed via queues:
384
385=over
386
387=item Ordinary scalars
388
389=item Array refs
390
391=item Hash refs
392
393=item Scalar refs
394
395=item Objects based on the above
396
397=back
398
399Ordinary scalars are added to queues as they are.
400
401If not already thread-shared, the other complex data types will be cloned
402(recursively, if needed, and including any C<bless>ings and read-only
403settings) into thread-shared structures before being placed onto a queue.
404
For example, the following would cause L<Thread::Queue> to create an empty,
406shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
407and 'baz' from C<@ary> into it, and then place that shared reference onto
408the queue:
409
410 my @ary = qw/foo bar baz/;
411 $q->enqueue(\@ary);
412
413However, for the following, the items are already shared, so their references
414are added directly to the queue, and no cloning takes place:
415
416 my @ary :shared = qw/foo bar baz/;
417 $q->enqueue(\@ary);
418
419 my $obj = &shared({});
420 $$obj{'foo'} = 'bar';
421 $$obj{'qux'} = 99;
422 bless($obj, 'My::Class');
423 $q->enqueue($obj);
424
425See L</"LIMITATIONS"> for caveats related to passing objects via queues.
426
427=head1 QUEUE CREATION
428
429=over
430
431=item ->new()
432
433Creates a new empty queue.
434
435=item ->new(LIST)
436
437Creates a new queue pre-populated with the provided list of items.
438
439=back
440
441=head1 BASIC METHODS
442
443The following methods deal with queues on a FIFO basis.
444
445=over
446
447=item ->enqueue(LIST)
448
449Adds a list of items onto the end of the queue.
450
451=item ->dequeue()
452
453=item ->dequeue(COUNT)
454
455Removes the requested number of items (default is 1) from the head of the
456queue, and returns them.  If the queue contains fewer than the requested
457number of items, then the thread will be blocked until the requisite number
458of items are available (i.e., until other threads C<enqueue> more items).
459
460=item ->dequeue_nb()
461
462=item ->dequeue_nb(COUNT)
463
464Removes the requested number of items (default is 1) from the head of the
465queue, and returns them.  If the queue contains fewer than the requested
466number of items, then it immediately (i.e., non-blocking) returns whatever
467items there are on the queue.  If the queue is empty, then C<undef> is
468returned.
469
470=item ->dequeue_timed(TIMEOUT)
471
472=item ->dequeue_timed(TIMEOUT, COUNT)
473
474Removes the requested number of items (default is 1) from the head of the
475queue, and returns them.  If the queue contains fewer than the requested
476number of items, then the thread will be blocked until the requisite number of
477items are available, or until the timeout is reached.  If the timeout is
478reached, it returns whatever items there are on the queue, or C<undef> if the
479queue is empty.
480
481The timeout may be a number of seconds relative to the current time (e.g., 5
482seconds from when the call is made), or may be an absolute timeout in I<epoch>
483seconds the same as would be used with
484L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
485Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
486the underlying implementation).
487
488If C<TIMEOUT> is missing, C<undef>, or less than or equal to 0, then this call
489behaves the same as C<dequeue_nb>.
490
491=item ->pending()
492
493Returns the number of items still in the queue.  Returns C<undef> if the queue
494has been ended (see below), and there are no more items in the queue.
495
496=item ->limit
497
498Sets the size of the queue.  If set, calls to C<enqueue()> will block until
499the number of pending items in the queue drops below the C<limit>.  The
500C<limit> does not prevent enqueuing items beyond that count:
501
502 my $q = Thread::Queue->new(1, 2);
503 $q->limit = 4;
504 $q->enqueue(3, 4, 5);   # Does not block
505 $q->enqueue(6);         # Blocks until at least 2 items are
506                         # dequeued
507 my $size = $q->limit;   # Returns the current limit (may return
508                         # 'undef')
509 $q->limit = 0;          # Queue size is now unlimited
510
511Calling any of the dequeue methods with C<COUNT> greater than a queue's
512C<limit> will generate an error.
513
514=item ->end()
515
516Declares that no more items will be added to the queue.
517
518All threads blocking on C<dequeue()> calls will be unblocked with any
519remaining items in the queue and/or C<undef> being returned.  Any subsequent
520calls to C<dequeue()> will behave like C<dequeue_nb()>.
521
522Once ended, no more items may be placed in the queue.
523
524=back
525
526=head1 ADVANCED METHODS
527
528The following methods can be used to manipulate items anywhere in a queue.
529
530To prevent the contents of a queue from being modified by another thread
531while it is being examined and/or changed, L<lock|threads::shared/"lock
532VARIABLE"> the queue inside a local block:
533
534 {
535     lock($q);   # Keep other threads from changing the queue's contents
536     my $item = $q->peek();
537     if ($item ...) {
538         ...
539     }
540 }
541 # Queue is now unlocked
542
543=over
544
545=item ->peek()
546
547=item ->peek(INDEX)
548
549Returns an item from the queue without dequeuing anything.  Defaults to the
550head of queue (at index position 0) if no index is specified.  Negative
551index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
552is the end of the queue, -2 is next to last, and so on).
553
If no item exists at the specified index (i.e., the queue is empty, or the
555index is beyond the number of items on the queue), then C<undef> is returned.
556
557Remember, the returned item is not removed from the queue, so manipulating a
558C<peek>ed at reference affects the item on the queue.
559
560=item ->insert(INDEX, LIST)
561
562Adds the list of items to the queue at the specified index position (0
563is the head of the list).  Any existing items at and beyond that position are
564pushed back past the newly added items:
565
566 $q->enqueue(1, 2, 3, 4);
567 $q->insert(1, qw/foo bar/);
568 # Queue now contains:  1, foo, bar, 2, 3, 4
569
570Specifying an index position greater than the number of items in the queue
571just adds the list to the end.
572
573Negative index positions are supported:
574
575 $q->enqueue(1, 2, 3, 4);
576 $q->insert(-2, qw/foo bar/);
577 # Queue now contains:  1, 2, foo, bar, 3, 4
578
579Specifying a negative index position greater than the number of items in the
580queue adds the list to the head of the queue.
581
582=item ->extract()
583
584=item ->extract(INDEX)
585
586=item ->extract(INDEX, COUNT)
587
588Removes and returns the specified number of items (defaults to 1) from the
589specified index position in the queue (0 is the head of the queue).  When
590called with no arguments, C<extract> operates the same as C<dequeue_nb>.
591
592This method is non-blocking, and will return only as many items as are
593available to fulfill the request:
594
595 $q->enqueue(1, 2, 3, 4);
 my $item  = $q->extract(2);    # Returns 3
                                # Queue now contains:  1, 2, 4
 my @items = $q->extract(1, 3); # Returns (2, 4)
599                                # Queue now contains:  1
600
601Specifying an index position greater than the number of items in the
602queue results in C<undef> or an empty list being returned.
603
604 $q->enqueue('foo');
 my $nada = $q->extract(3);     # Returns undef
 my @nada = $q->extract(1, 3);  # Returns ()
607
608Negative index positions are supported.  Specifying a negative index position
609greater than the number of items in the queue may return items from the head
610of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
611queue from the specified position (i.e. if queue size + index + count is
612greater than zero):
613
614 $q->enqueue(qw/foo bar baz/);
615 my @nada = $q->extract(-6, 2);  # Returns ()      - (3+(-6)+2) <= 0
616 my @some = $q->extract(-6, 4);  # Returns (foo)   - (3+(-6)+4) > 0
617                                 # Queue now contains:  bar, baz
618 my @rest = $q->extract(-3, 4);  # Returns (bar, baz) -
619                                 #                   (2+(-3)+4) > 0
620
621=back
622
623=head1 NOTES
624
625Queues created by L<Thread::Queue> can be used in both threaded and
626non-threaded applications.
627
628=head1 LIMITATIONS
629
630Passing objects on queues may not work if the objects' classes do not support
631sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
632
633Passing array/hash refs that contain objects may not work for Perl prior to
6345.10.0.
635
636=head1 SEE ALSO
637
638Thread::Queue on MetaCPAN:
639L<https://metacpan.org/release/Thread-Queue>
640
641Code repository for CPAN distribution:
642L<https://github.com/Dual-Life/Thread-Queue>
643
644L<threads>, L<threads::shared>
645
646Sample code in the I<examples> directory of this distribution on CPAN.
647
648=head1 MAINTAINER
649
650Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
651
652=head1 LICENSE
653
654This program is free software; you can redistribute it and/or modify it under
655the same terms as Perl itself.
656
657=cut
658