package Thread::Queue;

use strict;
use warnings;

our $VERSION = '3.14';      # remember to update version in POD!
$VERSION = eval $VERSION;

use threads::shared 1.21;
use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);

# Carp errors from threads::shared calls should complain about caller
our @CARP_NOT = ("threads::shared");

# Create a new queue possibly pre-populated with items.
#   Args:    LIST of initial items (each is passed through shared_clone())
#   Returns: the new queue object (a blessed, shared hash holding the
#            shared item array under the 'queue' key)
sub new
{
    my $class = shift;
    my @queue :shared = map { shared_clone($_) } @_;
    my %self :shared = ( 'queue' => \@queue );
    return bless(\%self, $class);
}

# Add items to the tail of a queue.
#   Args:   LIST of items to add (each is passed through shared_clone())
#   Croaks: if the queue has been 'end'ed
# Waits while a non-zero limit is set and the queue already holds at least
# that many items.
sub enqueue
{
    my $self = shift;
    lock(%$self);

    if ($$self{'ENDED'}) {
        require Carp;
        Carp::croak("'enqueue' method called on queue that has been 'end'ed");
    }

    # Block if queue size exceeds any specified limit
    my $queue = $$self{'queue'};
    cond_wait(%$self) while ($$self{'LIMIT'} && (@$queue >= $$self{'LIMIT'}));

    # Add items to queue, and then signal other threads
    # (no signal is sent when the queue is still empty after the push)
    push(@$queue, map { shared_clone($_) } @_)
        and cond_signal(%$self);
}

# Set or return the max. size for a queue.
# This is an lvalue sub:  $q->limit = 5;  my $size = $q->limit;
sub limit : lvalue
{
    my $self = shift;
    lock(%$self);
    $$self{'LIMIT'};
}

# Return a count of the number of items on a queue.
#   Returns: the item count; nothing (undef in scalar context) if the queue
#            has been ended and is empty
sub pending
{
    my $self = shift;
    lock(%$self);
    return if ($$self{'ENDED'} && ! @{$$self{'queue'}});
    return scalar(@{$$self{'queue'}});
}

# Indicate that no more data will enter the queue
sub end
{
    my $self = shift;
    lock(%$self);
    # No more data is coming
    $$self{'ENDED'} = 1;

    # Wake every waiting thread, not just one:  a single signal could be
    # consumed by a thread that simply goes back to waiting (e.g., an
    # 'enqueue' still blocked on the limit), leaving blocked 'dequeue'
    # calls stranded.
    cond_broadcast(%$self);
}

# Return 1 or more items from the head of a queue, blocking if needed.
#   Args:    optional COUNT (default 1), checked by _validate_count()
#   Returns: one item when COUNT is 1, otherwise a list of COUNT items;
#            once the queue is ended, whatever dequeue_nb(COUNT) returns
sub dequeue
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for requisite number of items
    cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});

    # If no longer blocking, try getting whatever is left on the queue
    return $self->dequeue_nb($count) if ($$self{'ENDED'});

    # Return single item
    if ($count == 1) {
        my $item = shift(@$queue);
        cond_signal(%$self);  # Unblock possibly waiting threads
        return $item;
    }

    # Return multiple items
    my @items;
    push(@items, shift(@$queue)) for (1..$count);
    cond_signal(%$self);  # Unblock possibly waiting threads
    return @items;
}

# Return items from the head of a queue with no blocking.
#   Args:    optional COUNT (default 1), checked by _validate_count()
#   Returns: one item (undef if the queue is empty) when COUNT is 1,
#            otherwise a list of up to COUNT items
sub dequeue_nb
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Return single item
    if ($count == 1) {
        my $item = shift(@$queue);
        cond_signal(%$self);  # Unblock possibly waiting threads
        return $item;
    }

    # Return multiple items
    my @items;
    for (1..$count) {
        last if (! @$queue);
        push(@items, shift(@$queue));
    }
    cond_signal(%$self);  # Unblock possibly waiting threads
    return @items;
}

# Return items from the head of a queue, blocking if needed up to a timeout.
#   Args:    optional TIMEOUT (relative or absolute seconds), optional COUNT
#   Returns: whatever dequeue_nb(COUNT) returns once the wait is over
#   Croaks:  if TIMEOUT is defined but is not a number
sub dequeue_timed
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    # A missing or undefined timeout means "do not wait" (as documented,
    # the call then behaves the same as dequeue_nb)
    my $timeout = shift;
    $timeout = defined($timeout) ? $self->_validate_timeout($timeout) : -1;

    # Timeout may be relative or absolute.  Values below 32,000,000
    # (roughly one year in seconds) are taken as relative, and are converted
    # to an absolute time for use with cond_timedwait()
    if ($timeout < 32000000) {
        $timeout += time();
    }

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for requisite number of items, or until timeout
    while ((@$queue < $count) && ! $$self{'ENDED'}) {
        last if (! cond_timedwait(%$self, $timeout));
    }

    # Get whatever we need off the queue if available
    return $self->dequeue_nb($count);
}

# Return an item without removing it from a queue.
#   Args:    optional INDEX (default 0; negative values count from the end)
#   Returns: the item at that position (undef if there is none)
sub peek
{
    my $self = shift;
    lock(%$self);
    my $index = @_ ? $self->_validate_index(shift) : 0;
    return $$self{'queue'}[$index];
}

# Insert items anywhere into a queue.
#   Args:   INDEX, followed by the LIST of items to insert
#   Croaks: if the queue has been 'end'ed, or INDEX is not an integer
sub insert
{
    my $self = shift;
    lock(%$self);

    if ($$self{'ENDED'}) {
        require Carp;
        Carp::croak("'insert' method called on queue that has been 'end'ed");
    }

    my $queue = $$self{'queue'};

    my $index = $self->_validate_index(shift);

    return if (! @_);   # Nothing to insert

    # Support negative indices
    if ($index < 0) {
        $index += @$queue;
        if ($index < 0) {
            $index = 0;
        }
    }

    # Dequeue items from $index onward
    my @tmp;
    while (@$queue > $index) {
        unshift(@tmp, pop(@$queue))
    }

    # Add new items to the queue
    push(@$queue, map { shared_clone($_) } @_);

    # Add previous items back onto the queue
    push(@$queue, @tmp);

    cond_signal(%$self);  # Unblock possibly waiting threads
}

# Remove items from anywhere in a queue.
#   Args:    optional INDEX (default 0), optional COUNT (default 1)
#   Returns: one item when COUNT is 1, otherwise a list of the items found
sub extract
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    my $index = @_ ? $self->_validate_index(shift) : 0;
    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Support negative indices
    if ($index < 0) {
        $index += @$queue;
        if ($index < 0) {
            # Only the part of the requested range that overlaps the
            # queue can be returned
            $count += $index;
            return if ($count <= 0);           # Beyond the head of the queue
            return $self->dequeue_nb($count);  # Extract from the head
        }
    }

    # Dequeue items from $index+$count onward
    my @tmp;
    while (@$queue > ($index+$count)) {
        unshift(@tmp, pop(@$queue))
    }

    # Extract desired items
    my @items;
    unshift(@items, pop(@$queue)) while (@$queue > $index);

    # Add back any removed items
    push(@$queue, @tmp);

    cond_signal(%$self);  # Unblock possibly waiting threads

    # Return single item
    return $items[0] if ($count == 1);

    # Return multiple items
    return @items;
}

### Internal Methods ###

# Check value of the requested index.
#   Returns: the index unchanged
#   Croaks:  if the index is undefined, non-numeric, or not an integer
sub _validate_index
{
    my $self = shift;
    my $index = shift;

    if (! defined($index) ||
        ! looks_like_number($index) ||
        (int($index) != $index))
    {
        require Carp;
        # Report the public method that was called, minus its package
        # (works for subclasses, too)
        my ($method) = (caller(1))[3];
        $method =~ s/^.*:://;
        $index = 'undef' if (! defined($index));
        Carp::croak("Invalid 'index' argument ($index) to '$method' method");
    }

    return $index;
}

# Check value of the requested count.
#   Returns: the count unchanged
#   Croaks:  if the count is undefined, non-numeric, not an integer, less
#            than 1, or greater than a non-zero queue limit
sub _validate_count
{
    my $self = shift;
    my $count = shift;

    # Only compare numerically once the value is known to be a number;
    # otherwise the comparison itself would generate a warning
    my $limit      = $$self{'LIMIT'};
    my $is_number  = defined($count) && looks_like_number($count);
    my $over_limit = $is_number && $limit && ($count > $limit);

    if (! $is_number ||
        (int($count) != $count) ||
        ($count < 1) ||
        $over_limit)
    {
        require Carp;
        # Report the public method that was called, minus its package
        # (works for subclasses, too)
        my ($method) = (caller(1))[3];
        $method =~ s/^.*:://;
        $count = 'undef' if (! defined($count));
        if ($over_limit) {
            Carp::croak("'count' argument ($count) to '$method' method exceeds queue size limit ($limit)");
        } else {
            Carp::croak("Invalid 'count' argument ($count) to '$method' method");
        }
    }

    return $count;
}

# Check value of the requested timeout.
#   Returns: the timeout unchanged
#   Croaks:  if the timeout is undefined or non-numeric
sub _validate_timeout
{
    my $self = shift;
    my $timeout = shift;

    if (! defined($timeout) ||
        ! looks_like_number($timeout))
    {
        require Carp;
        # Report the public method that was called, minus its package
        # (works for subclasses, too)
        my ($method) = (caller(1))[3];
        $method =~ s/^.*:://;
        $timeout = 'undef' if (! defined($timeout));
        Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
    }

    return $timeout;
}

1;

=head1 NAME

Thread::Queue - Thread-safe queues

=head1 VERSION

This document describes Thread::Queue version 3.14

=head1 SYNOPSIS

    use strict;
    use warnings;

    use threads;
    use Thread::Queue;

    my $q = Thread::Queue->new();    # A new empty queue

    # Worker thread
    my $thr = threads->create(
        sub {
            # Thread will loop until no more work
            while (defined(my $item = $q->dequeue())) {
                # Do work on $item
                ...
            }
        }
    );

    # Send work to the thread
    $q->enqueue($item1, ...);
    # Signal that there is no more work to be sent
    $q->end();
    # Join up with the thread when it finishes
    $thr->join();

    ...

    # Count of items in the queue
    my $left = $q->pending();

    # Non-blocking dequeue
    if (defined(my $item = $q->dequeue_nb())) {
        # Work on $item
    }

    # Blocking dequeue with 5-second timeout
    if (defined(my $item = $q->dequeue_timed(5))) {
        # Work on $item
    }

    # Set a size for a queue
    $q->limit = 5;

    # Get the second item in the queue without dequeuing anything
    my $item = $q->peek(1);

    # Insert two items into the queue just behind the head
    $q->insert(1, $item1, $item2);

    # Extract the last two items on the queue
    my ($item1, $item2) = $q->extract(-2, 2);

=head1 DESCRIPTION

This module provides thread-safe FIFO queues that can be accessed safely by
any number of threads.

Any data types supported by L<threads::shared> can be passed via queues:

=over

=item Ordinary scalars

=item Array refs

=item Hash refs

=item Scalar refs

=item Objects based on the above

=back

Ordinary scalars are added to queues as they are.

If not already thread-shared, the other complex data types will be cloned
(recursively, if needed, and including any C<bless>ings and read-only
settings) into thread-shared structures before being placed onto a queue.

For example, the following would cause L<Thread::Queue> to create an empty,
shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
and 'baz' from C<@ary> into it, and then place that shared reference onto
the queue:

    my @ary = qw/foo bar baz/;
    $q->enqueue(\@ary);

However, for the following, the items are already shared, so their references
are added directly to the queue, and no cloning takes place:

    my @ary :shared = qw/foo bar baz/;
    $q->enqueue(\@ary);

    my $obj = &shared({});
    $$obj{'foo'} = 'bar';
    $$obj{'qux'} = 99;
    bless($obj, 'My::Class');
    $q->enqueue($obj);

See L</"LIMITATIONS"> for caveats related to passing objects via queues.

=head1 QUEUE CREATION

=over

=item ->new()

Creates a new empty queue.

=item ->new(LIST)

Creates a new queue pre-populated with the provided list of items.

=back

=head1 BASIC METHODS

The following methods deal with queues on a FIFO basis.

=over

=item ->enqueue(LIST)

Adds a list of items onto the end of the queue.

=item ->dequeue()

=item ->dequeue(COUNT)

Removes the requested number of items (default is 1) from the head of the
queue, and returns them.  If the queue contains fewer than the requested
number of items, then the thread will be blocked until the requisite number
of items are available (i.e., until other threads C<enqueue> more items).

=item ->dequeue_nb()

=item ->dequeue_nb(COUNT)

Removes the requested number of items (default is 1) from the head of the
queue, and returns them.  If the queue contains fewer than the requested
number of items, then it immediately (i.e., non-blocking) returns whatever
items there are on the queue.  If the queue is empty, then C<undef> is
returned.

=item ->dequeue_timed(TIMEOUT)

=item ->dequeue_timed(TIMEOUT, COUNT)

Removes the requested number of items (default is 1) from the head of the
queue, and returns them.  If the queue contains fewer than the requested
number of items, then the thread will be blocked until the requisite number of
items are available, or until the timeout is reached.  If the timeout is
reached, it returns whatever items there are on the queue, or C<undef> if the
queue is empty.

The timeout may be a number of seconds relative to the current time (e.g., 5
seconds from when the call is made), or may be an absolute timeout in I<epoch>
seconds the same as would be used with
L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
the underlying implementation).

If C<TIMEOUT> is missing, C<undef>, or less than or equal to 0, then this call
behaves the same as C<dequeue_nb>.

=item ->pending()

Returns the number of items still in the queue.  Returns C<undef> if the queue
has been ended (see below), and there are no more items in the queue.

=item ->limit

Sets the size of the queue.  If set, calls to C<enqueue()> will block until
the number of pending items in the queue drops below the C<limit>.  The
C<limit> does not prevent enqueuing items beyond that count:

    my $q = Thread::Queue->new(1, 2);
    $q->limit = 4;
    $q->enqueue(3, 4, 5);    # Does not block
    $q->enqueue(6);          # Blocks until at least 2 items are
                             # dequeued
    my $size = $q->limit;    # Returns the current limit (may return
                             # 'undef')
    $q->limit = 0;           # Queue size is now unlimited

Calling any of the dequeue methods with C<COUNT> greater than a queue's
C<limit> will generate an error.

=item ->end()

Declares that no more items will be added to the queue.

All threads blocking on C<dequeue()> calls will be unblocked with any
remaining items in the queue and/or C<undef> being returned.  Any subsequent
calls to C<dequeue()> will behave like C<dequeue_nb()>.

Once ended, no more items may be placed in the queue.

=back

=head1 ADVANCED METHODS

The following methods can be used to manipulate items anywhere in a queue.

To prevent the contents of a queue from being modified by another thread
while it is being examined and/or changed,
L<lock|threads::shared/"lock VARIABLE"> the queue inside a local block:

    {
        lock($q);   # Keep other threads from changing the queue's contents
        my $item = $q->peek();
        if ($item ...) {
            ...
        }
    }
    # Queue is now unlocked

=over

=item ->peek()

=item ->peek(INDEX)

Returns an item from the queue without dequeuing anything.  Defaults to the
head of the queue (at index position 0) if no index is specified.  Negative
index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
is the end of the queue, -2 is next to last, and so on).

If no item exists at the specified index (i.e., the queue is empty, or the
index is beyond the number of items on the queue), then C<undef> is returned.

Remember, the returned item is not removed from the queue, so manipulating a
C<peek>ed at reference affects the item on the queue.

=item ->insert(INDEX, LIST)

Adds the list of items to the queue at the specified index position (0
is the head of the queue).  Any existing items at and beyond that position are
pushed back past the newly added items:

    $q->enqueue(1, 2, 3, 4);
    $q->insert(1, qw/foo bar/);
    # Queue now contains:  1, foo, bar, 2, 3, 4

Specifying an index position greater than the number of items in the queue
just adds the list to the end.

Negative index positions are supported:

    $q->enqueue(1, 2, 3, 4);
    $q->insert(-2, qw/foo bar/);
    # Queue now contains:  1, 2, foo, bar, 3, 4

Specifying a negative index position greater than the number of items in the
queue adds the list to the head of the queue.

=item ->extract()

=item ->extract(INDEX)

=item ->extract(INDEX, COUNT)

Removes and returns the specified number of items (defaults to 1) from the
specified index position in the queue (0 is the head of the queue).  When
called with no arguments, C<extract> operates the same as C<dequeue_nb>.

This method is non-blocking, and will return only as many items as are
available to fulfill the request:

    $q->enqueue(1, 2, 3, 4);
    my $item  = $q->extract(2);       # Returns 3
                                      # Queue now contains:  1, 2, 4
    my @items = $q->extract(1, 3);    # Returns (2, 4)
                                      # Queue now contains:  1

Specifying an index position greater than the number of items in the
queue results in C<undef> or an empty list being returned.

    $q->enqueue('foo');
    my $nada = $q->extract(3);        # Returns undef
    my @nada = $q->extract(1, 3);     # Returns ()

Negative index positions are supported.  Specifying a negative index position
greater than the number of items in the queue may return items from the head
of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
queue from the specified position (i.e. if queue size + index + count is
greater than zero):

    $q->enqueue(qw/foo bar baz/);
    my @nada = $q->extract(-6, 2);    # Returns ()         - (3+(-6)+2) <= 0
    my @some = $q->extract(-6, 4);    # Returns (foo)      - (3+(-6)+4) > 0
                                      # Queue now contains:  bar, baz
    my @rest = $q->extract(-3, 4);    # Returns (bar, baz) -
                                      #                   (2+(-3)+4) > 0

=back

=head1 NOTES

Queues created by L<Thread::Queue> can be used in both threaded and
non-threaded applications.

=head1 LIMITATIONS

Passing objects on queues may not work if the objects' classes do not support
sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.

Passing array/hash refs that contain objects may not work for Perl prior to
5.10.0.

=head1 SEE ALSO

Thread::Queue on MetaCPAN:
L<https://metacpan.org/release/Thread-Queue>

Code repository for CPAN distribution:
L<https://github.com/Dual-Life/Thread-Queue>

L<threads>, L<threads::shared>

Sample code in the I<examples> directory of this distribution on CPAN.

=head1 MAINTAINER

Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>

=head1 LICENSE

This program is free software; you can redistribute it and/or modify it under
the same terms as Perl itself.

=cut