###############################################################################
## ----------------------------------------------------------------------------
## Hybrid-queue helper class.
##
###############################################################################

package MCE::Shared::Queue;

use strict;
use warnings;

use 5.010001;

no warnings qw( threads recursion uninitialized numeric );

our $VERSION = '1.874';

## no critic (Subroutines::ProhibitExplicitReturnUndef)

use Scalar::Util qw( looks_like_number );
use MCE::Shared::Base ();
use MCE::Util ();
use MCE::Mutex ();

use overload (
   q("")    => \&MCE::Shared::Base::_stringify,
   q(0+)    => \&MCE::Shared::Base::_numify,
   fallback => 1
);

###############################################################################
## ----------------------------------------------------------------------------
## Attributes used internally.
##  _qr_sock _qw_sock _datp _datq _dsem _heap _init_pid _porder _type
##  _ar_sock _aw_sock _asem _tsem
##
###############################################################################

our ($HIGHEST, $LOWEST, $FIFO, $LIFO, $LILO, $FILO) = (1, 0, 1, 0, 1, 0);
my  ($PORDER, $TYPE, $AWAIT) = ($HIGHEST, $FIFO, 0);

my $LF = "\012"; Internals::SvREADONLY($LF, 1);
my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
my $_reset_flg = 1;

my %_valid_fields_new = map { $_ => 1 } qw(
   await barrier fast porder queue type
);

# Tail-call into the shared croak helper so errors are raised by it.

sub _croak {
   goto &MCE::Shared::Base::_croak;
}

# Refresh the cached thread id inside a newly cloned interpreter thread.

sub CLONE {
   $_tid = threads->tid() if $INC{'threads.pm'};
}

# Release the data structures. The sockets are destroyed only when running
# in the same process/thread (pid[.tid]) that constructed the queue.

sub DESTROY {
   my ($_Q) = @_;
   my $_pid = $_tid ? $$ .'.'. $_tid : $$;

   undef $_Q->{_datp}, undef $_Q->{_datq}, undef $_Q->{_heap};

   if ($_Q->{_init_pid} eq $_pid) {
      MCE::Util::_destroy_socks($_Q, qw(_aw_sock _ar_sock _qw_sock _qr_sock));
   }

   return;
}

###############################################################################
## ----------------------------------------------------------------------------
## Instance instantiation.
##
###############################################################################

# new ( options )
#
# Accepted keys are listed in %_valid_fields_new; any other key croaks.
# The "queue" value must be an ARRAY reference and is stored as given
# (not copied). Returns the blessed queue object.

sub new {
   my ($_class, %_argv) = @_;
   my $_Q = {}; bless($_Q, ref($_class) || $_class);

   for my $_p (keys %_argv) {
      _croak("Queue: ($_p) is not a valid constructor argument")
         unless (exists $_valid_fields_new{$_p});
   }

   $_Q->{_asem} =  0;  # Semaphore count variable for the ->await method
   $_Q->{_datp} = {};  # Priority data { p1 => [ ], p2 => [ ], pN => [ ] }
   $_Q->{_heap} = [];  # Priority heap [ pN, p2, p1 ] in heap order
                       # fyi, _datp will always dequeue before _datq

   # --------------------------------------------------------------------------

   $_Q->{_await}  = defined $_argv{await}  ? $_argv{await}  : $AWAIT;
   $_Q->{_porder} = defined $_argv{porder} ? $_argv{porder} : $PORDER;
   $_Q->{_type}   = defined $_argv{type}   ? $_argv{type}   : $TYPE;

   if (exists $_argv{queue}) {
      _croak('Queue: (queue) is not an ARRAY reference')
         if (ref $_argv{queue} ne 'ARRAY');
      $_Q->{_datq} = $_argv{queue};
   }
   else {
      $_Q->{_datq} = [];
   }

   # --------------------------------------------------------------------------

   $_Q->{_init_pid} = $_tid ? $$ .'.'. $_tid : $$;
   $_Q->{_dsem} = 0;

   MCE::Util::_sock_pair($_Q, qw(_qr_sock _qw_sock), undef, 1);
   MCE::Util::_sock_pair($_Q, qw(_ar_sock _aw_sock), undef, 1) if $_Q->{_await};

   # One-time reset, performed only when MCE::Shared::Server is loaded.
   MCE::Shared::Object::_reset(), $_reset_flg = ''
      if ($_reset_flg && $INC{'MCE/Shared/Server.pm'});

   return $_Q;
}

###############################################################################
## ----------------------------------------------------------------------------
## Public methods.
##
###############################################################################

# await ( pending_threshold )

sub await {
   # Handled by MCE::Shared::Object when shared.
   return;
}

# clear ( )
#
# Empties the normal queue, the priority data and the heap in place.

sub clear {
   my ($_Q) = @_;

   %{ $_Q->{_datp} } = ();
   @{ $_Q->{_datq} } = ();
   @{ $_Q->{_heap} } = ();

   return;
}

# end ( )
#
# Marks the queue as ended (the _ended key exists afterwards) and writes one
# byte to _qw_sock for each waiter counted in _dsem. No-op if already ended.

sub end {
   my ($_Q) = @_;

   if (!exists $_Q->{_ended}) {
      for my $_i (1 .. $_Q->{_dsem}) { syswrite($_Q->{_qw_sock}, $LF) }
      $_Q->{_dsem} = 0, $_Q->{_ended} = undef;
   }

   return;
}

# enqueue ( item [, item, ... ] )
#
# Appends items to the normal queue. Warns and returns when the queue has
# been ended. Wakes at most one counted waiter per item being added.

sub enqueue {
   my $_Q = shift;

   return unless (scalar @_);

   if (exists $_Q->{_ended}) {
      warn "Queue: (enqueue) called on queue that has been 'end'ed\n";
      return;
   }

   if ($_Q->{_dsem}) {
      for my $_i (1 .. scalar @_) {
         $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
         last unless $_Q->{_dsem};
      }
   }

   push @{ $_Q->{_datq} }, @_;

   return;
}

# enqueuep ( priority, item [, item, ... ] )
#
# Appends items to the priority queue for the given integer priority.
# Croaks when the priority is not an integer.

sub enqueuep {
   my ($_Q, $_p) = (shift, shift);

   _croak('Queue: (enqueuep priority) is not an integer')
      if (!looks_like_number($_p) || int($_p) != $_p);

   return unless (scalar @_);

   if (exists $_Q->{_ended}) {
      warn "Queue: (enqueuep) called on queue that has been 'end'ed\n";
      return;
   }

   if ($_Q->{_dsem}) {
      for my $_i (1 .. scalar @_) {
         $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
         last unless $_Q->{_dsem};
      }
   }

   $_Q->_enqueuep($_p, @_);

   return;
}

# dequeue ( count )
# dequeue ( )
#
# With a count other than 1, returns up to count items (fewer when fewer are
# pending). Otherwise returns a single item. When nothing is available and
# the queue has not been ended, registers as a waiter (_dsem), blocks reading
# one byte from _qr_sock, then retries with the same arguments.

sub dequeue {
   my ($_Q, $_cnt) = @_;
   my (@_items, $_has_data, $_buf);

   if (defined $_cnt && $_cnt ne '1') {
      _croak('Queue: (dequeue count argument) is not valid')
         if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);

      my $_pending = @{ $_Q->{_datq} };

      # Priority data is tallied only when the normal queue alone is short.
      if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
         for my $_h (@{ $_Q->{_heap} }) {
            $_pending += @{ $_Q->{_datp}->{$_h} };
         }
      }
      $_cnt = $_pending if $_pending < $_cnt;

      for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
   }
   else {
      # Captured before dequeuing so that an undef item is still returned.
      $_has_data = ( @{ $_Q->{_datq} } || @{ $_Q->{_heap} } ) ? 1 : 0;
      $_buf      = $_Q->_dequeue();
   }

   return @_items if (scalar @_items);
   return $_buf   if ($_has_data);
   return ()      if (exists $_Q->{_ended});

   $_Q->{_dsem} += 1, MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);

   goto \&dequeue;
}

# dequeue_nb ( count )
# dequeue_nb ( )
#
# Non-blocking variant of dequeue. With a count other than 1, returns up to
# count items. Otherwise returns one item, or an empty list when the item
# obtained is undef.

sub dequeue_nb {
   my ($_Q, $_cnt) = @_;

   if (defined $_cnt && $_cnt ne '1') {
      _croak('Queue: (dequeue_nb count argument) is not valid')
         if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);

      my $_pending = @{ $_Q->{_datq} };

      if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
         for my $_h (@{ $_Q->{_heap} }) {
            $_pending += @{ $_Q->{_datp}->{$_h} };
         }
      }

      $_cnt = $_pending if $_pending < $_cnt;

      return map { $_Q->_dequeue() } 1 .. $_cnt;
   }

   my $_buf = $_Q->_dequeue();

   return defined($_buf) ? $_buf : ();
}

# pending ( )
#
# Returns the number of items in the normal and priority queues combined.
# Returns undef when the queue has been ended and holds no items.

sub pending {
   my ($_Q) = @_;
   my $_pending = @{ $_Q->{_datq} };

   if (scalar @{ $_Q->{_heap} }) {
      for my $_h (@{ $_Q->{_heap} }) {
         $_pending += @{ $_Q->{_datp}->{$_h} };
      }
   }

   return (exists $_Q->{_ended})
      ? $_pending ? $_pending : undef
      : $_pending;
}

# insert ( index, item [, item, ... ] )
#
# Inserts items into the normal queue at index, where index 0 is the head
# (the next item to dequeue). An index beyond either end adds the items at
# that end. For LIFO queues the index is mirrored onto the underlying array.

sub insert {
   my ($_Q, $_i) = (shift, shift);

   _croak('Queue: (insert index) is not an integer')
      if (!looks_like_number($_i) || int($_i) != $_i);

   return unless (scalar @_);

   if (exists $_Q->{_ended}) {
      warn "Queue: (insert) called on queue that has been 'end'ed\n";
      return;
   }

   if ($_Q->{_dsem}) {
      # The loop variable is scoped to the loop; the index $_i is untouched.
      for my $_i (1 .. scalar @_) {
         $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
         last unless $_Q->{_dsem};
      }
   }

   if (abs($_i) > scalar @{ $_Q->{_datq} }) {
      if ($_i >= 0) {
         if ($_Q->{_type}) {
            push @{ $_Q->{_datq} }, @_;
         } else {
            unshift @{ $_Q->{_datq} }, @_;
         }
      }
      else {
         if ($_Q->{_type}) {
            unshift @{ $_Q->{_datq} }, @_;
         } else {
            push @{ $_Q->{_datq} }, @_;
         }
      }
   }
   else {
      if (!$_Q->{_type}) {
         $_i = ($_i >= 0)
            ? scalar(@{ $_Q->{_datq} }) - $_i
            : abs($_i);
      }
      splice @{ $_Q->{_datq} }, $_i, 0, @_;
   }

   return;
}

# insertp ( priority, index, item [, item, ... ] )
#
# Same as insert, but for the list held under the given priority. When that
# priority currently holds no items, the items are enqueued via _enqueuep so
# the priority is enlisted into the heap.

sub insertp {
   my ($_Q, $_p, $_i) = (shift, shift, shift);

   _croak('Queue: (insertp priority) is not an integer')
      if (!looks_like_number($_p) || int($_p) != $_p);
   _croak('Queue: (insertp index) is not an integer')
      if (!looks_like_number($_i) || int($_i) != $_i);

   return unless (scalar @_);

   if (exists $_Q->{_ended}) {
      warn "Queue: (insertp) called on queue that has been 'end'ed\n";
      return;
   }

   if ($_Q->{_dsem}) {
      # The loop variable is scoped to the loop; the index $_i is untouched.
      for my $_i (1 .. scalar @_) {
         $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
         last unless $_Q->{_dsem};
      }
   }

   if (exists $_Q->{_datp}->{$_p} && scalar @{ $_Q->{_datp}->{$_p} }) {

      if (abs($_i) > scalar @{ $_Q->{_datp}->{$_p} }) {
         if ($_i >= 0) {
            if ($_Q->{_type}) {
               push @{ $_Q->{_datp}->{$_p} }, @_;
            } else {
               unshift @{ $_Q->{_datp}->{$_p} }, @_;
            }
         }
         else {
            if ($_Q->{_type}) {
               unshift @{ $_Q->{_datp}->{$_p} }, @_;
            } else {
               push @{ $_Q->{_datp}->{$_p} }, @_;
            }
         }
      }
      else {
         if (!$_Q->{_type}) {
            $_i = ($_i >= 0)
               ? scalar(@{ $_Q->{_datp}->{$_p} }) - $_i
               : abs($_i);
         }
         splice @{ $_Q->{_datp}->{$_p} }, $_i, 0, @_;
      }
   }
   else {
      $_Q->_enqueuep($_p, @_);
   }

   return;
}

# peek ( index )
# peek ( )
#
# Returns the item at index in the normal queue without removing it. The
# index defaults to 0 (the head). Returns undef when the index is out of
# range. Croaks when a true index is not an integer.

sub peek {
   my ($_Q, $_i) = @_;

   if ($_i) {
      _croak('Queue: (peek index) is not an integer')
         if (!looks_like_number($_i) || int($_i) != $_i);
   }
   else { $_i = 0 }

   return undef if (abs($_i) > scalar @{ $_Q->{_datq} });

   if (!$_Q->{_type}) {
      $_i = ($_i >= 0)
         ? scalar(@{ $_Q->{_datq} }) - ($_i + 1)
         : abs($_i + 1);
   }

   return $_Q->{_datq}->[$_i];
}

# peekp ( priority, index )
# peekp ( priority )
#
# Same as peek, but for the list held under the given priority. Returns
# undef when the priority has never been used.

sub peekp {
   my ($_Q, $_p, $_i) = @_;

   if ($_i) {
      _croak('Queue: (peekp index) is not an integer')
         if (!looks_like_number($_i) || int($_i) != $_i);
   }
   else { $_i = 0 }

   _croak('Queue: (peekp priority) is not an integer')
      if (!looks_like_number($_p) || int($_p) != $_p);

   return undef unless (exists $_Q->{_datp}->{$_p});
   return undef if (abs($_i) > scalar @{ $_Q->{_datp}->{$_p} });

   if (!$_Q->{_type}) {
      $_i = ($_i >= 0)
         ? scalar(@{ $_Q->{_datp}->{$_p} }) - ($_i + 1)
         : abs($_i + 1);
   }

   return $_Q->{_datp}->{$_p}->[$_i];
}

# peekh ( index )
# peekh ( )
#
# Returns the priority number stored at index in the heap (default 0),
# or undef when the index is out of range.

sub peekh {
   my ($_Q, $_i) = @_;

   if ($_i) {
      _croak('Queue: (peekh index) is not an integer')
         if (!looks_like_number($_i) || int($_i) != $_i);
   }
   else { $_i = 0 }

   return undef if (abs($_i) > scalar @{ $_Q->{_heap} });
   return $_Q->{_heap}->[$_i];
}

# heap ( )
#
# Returns the list of priority numbers in heap order.

sub heap {
   return @{ shift->{_heap} };
}

###############################################################################
## ----------------------------------------------------------------------------
## Private methods.
##
###############################################################################

# Add items to the tail of the queue with priority level.
#
# _enqueuep ( priority, item [, item, ... ] )
#
# The priority is enlisted into the heap first when it is unknown or its
# list is currently empty. HIGHEST order (_porder true) keeps the heap in
# descending order; LOWEST keeps it ascending.

sub _enqueuep {
   my ($_Q, $_p) = (shift, shift);

   # Enlist priority into the heap.
   if (!exists $_Q->{_datp}->{$_p} || @{ $_Q->{_datp}->{$_p} } == 0) {

      unless (scalar @{ $_Q->{_heap} }) {
         push @{ $_Q->{_heap} }, $_p;
      }
      elsif ($_Q->{_porder}) {
         $_Q->_heap_insert_high($_p);
      }
      else {
         $_Q->_heap_insert_low($_p);
      }
   }

   # Append item(s) into the queue.
   push @{ $_Q->{_datp}->{$_p} }, @_;

   return;
}

# Return one item from the queue.
#
# Priority data (first priority in the heap) is served before the normal
# queue. FIFO (_type true) takes from the front of the list, LIFO from the
# end. Yields undef when there is nothing to dequeue.

sub _dequeue {
   my ($_Q) = @_;

   # Return item from the non-priority queue.
   unless (scalar @{ $_Q->{_heap} }) {
      return ($_Q->{_type})
         ? shift @{ $_Q->{_datq} } : pop @{ $_Q->{_datq} };
   }

   my $_p = $_Q->{_heap}->[0];

   # Delist priority from the heap when 1 item remains.
   shift @{ $_Q->{_heap} } if (@{ $_Q->{_datp}->{$_p} } == 1);

   # Return item from the priority queue.
   return ($_Q->{_type})
      ? shift @{ $_Q->{_datp}->{$_p} } : pop @{ $_Q->{_datp}->{$_p} };
}

# Helper method for getting the reference to the underlying array.
# Use with test scripts for comparing data only (not a public API).
#
# With a priority: returns that priority's array reference, or undef when
# the priority has never been used. Without: returns the normal queue.

sub _get_aref {
   my ($_Q, $_p) = @_;

   if (defined $_p) {
      _croak('Queue: (get_aref priority) is not an integer')
         if (!looks_like_number($_p) || int($_p) != $_p);

      return undef unless (exists $_Q->{_datp}->{$_p});
      return $_Q->{_datp}->{$_p};
   }

   return $_Q->{_datq};
}

# Insert priority into the heap. A lower priority level comes first.
# The caller (_enqueuep) only calls this when the heap is not empty.

sub _heap_insert_low {
   my ($_Q, $_p) = @_;

   # Insert priority at the head of the heap.
   if ($_p < $_Q->{_heap}->[0]) {
      unshift @{ $_Q->{_heap} }, $_p;
   }

   # Insert priority at the end of the heap.
   elsif ($_p > $_Q->{_heap}->[-1]) {
      push @{ $_Q->{_heap} }, $_p;
   }

   # Insert priority through binary search.
   else {
      my $_lower = 0; my $_upper = @{ $_Q->{_heap} };

      while ($_lower < $_upper) {
         my $_midpoint = $_lower + (($_upper - $_lower) >> 1);
         if ($_p > $_Q->{_heap}->[$_midpoint]) {
            $_lower = $_midpoint + 1;
         } else {
            $_upper = $_midpoint;
         }
      }

      # Insert priority into the heap.
      splice @{ $_Q->{_heap} }, $_lower, 0, $_p;
   }

   return;
}

# Insert priority into the heap. A higher priority level comes first.
# The caller (_enqueuep) only calls this when the heap is not empty.

sub _heap_insert_high {
   my ($_Q, $_p) = @_;

   # Insert priority at the head of the heap.
   if ($_p > $_Q->{_heap}->[0]) {
      unshift @{ $_Q->{_heap} }, $_p;
   }

   # Insert priority at the end of the heap.
   elsif ($_p < $_Q->{_heap}->[-1]) {
      push @{ $_Q->{_heap} }, $_p;
   }

   # Insert priority through binary search.
   else {
      my $_lower = 0; my $_upper = @{ $_Q->{_heap} };

      while ($_lower < $_upper) {
         my $_midpoint = $_lower + (($_upper - $_lower) >> 1);
         if ($_p < $_Q->{_heap}->[$_midpoint]) {
            $_lower = $_midpoint + 1;
         } else {
            $_upper = $_midpoint;
         }
      }

      # Insert priority into the heap.
      splice @{ $_Q->{_heap} }, $_lower, 0, $_p;
   }

   return;
}

###############################################################################
## ----------------------------------------------------------------------------
## Server functions.
##
###############################################################################

{
   use bytes;

   use constant {
      SHR_O_QUA => 'O~QUA',  # Queue await
      SHR_O_QUD => 'O~QUD',  # Queue dequeue
      SHR_O_QUN => 'O~QUN',  # Queue dequeue non-blocking
   };

   my (
      $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_obj, $_freeze, $_thaw,
      $_cnt, $_id, $_has_data, $_pending, $_t
   );

   # Request handlers keyed by the action string followed by a linefeed.
   # Each reads its arguments (one per line) from the data socket and writes
   # the reply back to the same socket.

   my %_output_function = (

      SHR_O_QUA.$LF => sub {                      # Queue await
         $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };

         chomp($_id = <$_DAU_R_SOCK>),
         chomp($_t  = <$_DAU_R_SOCK>);

         # Unknown id: reply once and stop. Without the return, $_Q would be
         # the result of print and a second reply would follow.
         my $_Q = $_obj->{ $_id } || do {
            print {$_DAU_R_SOCK} $LF;
            return;
         };
         $_Q->{_tsem} = $_t;

         # Release the caller right away when already at or below the
         # threshold; otherwise count it as waiting.
         if ($_Q->pending() <= $_t) {
            syswrite($_Q->{_aw_sock}, $LF);
         } else {
            $_Q->{_asem} += 1;
         }

         print {$_DAU_R_SOCK} $LF;

         return;
      },

      SHR_O_QUD.$LF => sub {                      # Queue dequeue
         $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };

         chomp($_id  = <$_DAU_R_SOCK>),
         chomp($_cnt = <$_DAU_R_SOCK>);

         # A count of 0 selects the single-item path below.
         $_cnt = 0 if ($_cnt == 1);

         my $_Q = $_obj->{ $_id } || do {
            print {$_DAU_R_SOCK} '-1'.$LF;
            return;
         };

         my (@_items, $_buf);

         if ($_cnt) {
            $_pending = @{ $_Q->{_datq} };

            if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
               for my $_h (@{ $_Q->{_heap} }) {
                  $_pending += @{ $_Q->{_datp}->{$_h} };
               }
            }
            $_cnt = $_pending if $_pending < $_cnt;

            for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
         }
         else {
            $_has_data = ( @{ $_Q->{_datq} } || @{ $_Q->{_heap} } ) ? 1 : 0;
            $_buf      = $_Q->_dequeue();
         }

         # Reply: "<length>\n<frozen data>", -2 when ended, or -1 when empty
         # (the caller is then counted in _dsem as a waiter).
         if ($_cnt) {
            $_buf = $_freeze->(\@_items);
            print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
         }
         elsif ($_has_data) {
            $_buf = $_freeze->([ $_buf ]);
            print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
         }
         elsif (exists $_Q->{_ended}) {
            print {$_DAU_R_SOCK} '-2'.$LF;
         }
         else {
            print {$_DAU_R_SOCK} '-1'.$LF;
            $_Q->{_dsem} += 1;
         }

         # Release all await callers once pending drops to the threshold.
         if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) {
            for my $_i (1 .. $_Q->{_asem}) {
               syswrite($_Q->{_aw_sock}, $LF);
            }
            $_Q->{_asem} = 0;
         }

         return;
      },

      SHR_O_QUN.$LF => sub {                      # Queue dequeue non-blocking
         $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };

         chomp($_id  = <$_DAU_R_SOCK>),
         chomp($_cnt = <$_DAU_R_SOCK>);

         my $_Q = $_obj->{ $_id } || do {
            print {$_DAU_R_SOCK} '-1'.$LF;
            return;
         };

         if ($_cnt == 1) {
            my $_buf = $_Q->_dequeue();

            if (defined $_buf) {
               $_buf = $_freeze->([ $_buf ]);
               print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
            }
            else {
               print {$_DAU_R_SOCK} '-1'.$LF;
            }
         }
         else {
            my @_items;
            my $_pending = @{ $_Q->{_datq} };

            if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
               for my $_h (@{ $_Q->{_heap} }) {
                  $_pending += @{ $_Q->{_datp}->{$_h} };
               }
            }
            $_cnt = $_pending if $_pending < $_cnt;

            for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }

            if ($_cnt) {
               my $_buf = $_freeze->(\@_items);
               print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
            }
            else {
               print {$_DAU_R_SOCK} '-1'.$LF;
            }
         }

         # Release all await callers once pending drops to the threshold.
         if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) {
            for my $_i (1 .. $_Q->{_asem}) {
               syswrite($_Q->{_aw_sock}, $LF);
            }
            $_Q->{_asem} = 0;
         }

         return;
      },

   );

   # Store the manager's socket reference, object table and serializers, then
   # register the handlers above into the given function table. Registration
   # stops at the first key that is already present.

   sub _init_mgr {
      my $_function;
      ( $_DAU_R_SOCK_REF, $_obj, $_function, $_freeze, $_thaw ) = @_;

      for my $key ( keys %_output_function ) {
         last if exists($_function->{$key});
         $_function->{$key} = $_output_function{$key};
      }

      return;
   }
}

###############################################################################
## ----------------------------------------------------------------------------
## Object package.
##
###############################################################################

## Items below are folded into MCE::Shared::Object.

package # hide from rpm
   MCE::Shared::Object;

use strict;
use warnings;

no warnings qw( threads recursion uninitialized numeric once );

use bytes;

no overloading;

my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;

my ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, $_obj,
    $_freeze, $_thaw);

# Store the client-side lock, sockets, channel, object table and serializers.

sub _init_queue {
   ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, $_obj,
    $_freeze, $_thaw) = @_;

   return;
}

# _req_queue ( action, request, $len, $buf )
#
# Sends the action and request while holding the data lock. The reply length
# is stored into the caller's third argument and, when positive, that many
# bytes are read into the fourth; both are written through the @_ aliases.
# A signal captured in $MCE::Signal::SIG meanwhile is re-raised afterwards.

sub _req_queue {
   local $\ = undef if (defined $\);
   local $/ = $LF if ($/ ne $LF);
   local $MCE::Signal::SIG;

   {
      local $MCE::Signal::IPC = 1;
      $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();

      print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
      print({$_DAU_W_SOCK} $_[1]);
      chomp($_[2] = <$_DAU_W_SOCK>);

      read($_DAU_W_SOCK, $_[3], $_[2]) if ($_[2] > 0);

      $_dat_un->() if !$_is_MSWin32;
   }

   CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
}

# await ( pending_threshold )
#
# Sends the threshold (negative values become 0) to the manager, then blocks
# reading one byte from the queue's _ar_sock. Croaks when the queue was
# constructed without await or the threshold is not an integer.

sub await {
   my $_id = shift()->[0];
   return unless ( my $_Q = $_obj->{ $_id } );
   return unless ( exists $_Q->{_qr_sock} );

   my $_t = shift || 0;

   _croak('Queue: (await) is not enabled for this queue')
      unless (exists $_Q->{_ar_sock});
   _croak('Queue: (await threshold) is not an integer')
      if (!looks_like_number($_t) || int($_t) != $_t);

   $_t = 0 if ($_t < 0);
   _req1('O~QUA', $_id.$LF . $_t.$LF);

   MCE::Util::_sock_ready($_Q->{_ar_sock}) if $_is_MSWin32;
   MCE::Util::_sysread($_Q->{_ar_sock}, my($_b), 1);

   return;
}

# dequeue ( count )
# dequeue ( )
#
# Requests item(s) from the manager. A positive reply length carries frozen
# data; -2 means the queue has ended; any other reply blocks on _qr_sock for
# one byte and then retries with the same arguments.

sub dequeue {
   my ($self, $_cnt) = @_;
   my $_id = $self->[0];

   return unless ( my $_Q = $_obj->{ $_id } );
   return unless ( exists $_Q->{_qr_sock} );

   if (defined $_cnt && $_cnt ne '1') {
      _croak('Queue: (dequeue count argument) is not valid')
         if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
   }
   else {
      $_cnt = 1;
   }

   _req_queue('O~QUD', $_id.$LF . $_cnt.$LF, my($_len), my($_buf));

   return $_thaw->($_buf)[0] if ($_len > 0 && $_cnt == 1);
   return @{ $_thaw->($_buf) } if ($_len > 0);
   return if ($_len == -2);

   MCE::Util::_sock_ready($_Q->{_qr_sock}) if $_is_MSWin32;
   MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);

   goto \&dequeue;
}

# dequeue_nb ( count )
# dequeue_nb ( )
#
# Non-blocking request; returns nothing when the reply length is negative.

sub dequeue_nb {
   my ($self, $_cnt) = @_;
   my $_id = $self->[0];

   return unless ( my $_Q = $_obj->{ $_id } );
   return unless ( exists $_Q->{_qr_sock} );

   if (defined $_cnt && $_cnt ne '1') {
      _croak('Queue: (dequeue_nb count argument) is not valid')
         if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
   }
   else {
      $_cnt = 1;
   }

   _req_queue('O~QUN', $_id.$LF . $_cnt.$LF, my($_len), my($_buf));

   return if ($_len < 0);

   ($_cnt == 1)
      ? $_thaw->($_buf)[0]
      : @{ $_thaw->($_buf) };
}

# pending ( )
#
# Dispatches to _size for a plain scalar-context call, otherwise to _auto.

sub pending {
   (@_ == 1 && !wantarray) ? _size('pending', @_) : _auto('pending', @_);
}

1;

__END__

###############################################################################
## ----------------------------------------------------------------------------
## Module usage.
##
###############################################################################

=head1 NAME

MCE::Shared::Queue - Hybrid-queue helper class

=head1 VERSION

This document describes MCE::Shared::Queue version 1.874

=head1 DESCRIPTION

A queue helper class for use as a standalone or managed by L<MCE::Shared>.

This module is mostly compatible with L<MCE::Queue> except for the C<gather>
option which is not supported in this context. It provides a queue interface
supporting normal and priority queues. Data from shared queues reside under
the shared-manager process, otherwise locally.

=head1 SYNOPSIS

 # non-shared or local construction for use by a single process

 use MCE::Shared::Queue;

 my $qu = MCE::Shared::Queue->new( await => 1, queue => [ "." ] );

 # construction for sharing with other threads and processes

 use MCE::Shared;
 use MCE::Shared::Queue;

 my $qu = MCE::Shared->queue(
    porder => $MCE::Shared::Queue::HIGHEST,
    type   => $MCE::Shared::Queue::FIFO,
 );

 # possible values for "porder" and "type"

 porder =>
    $MCE::Shared::Queue::HIGHEST # Highest priority items dequeue first
    $MCE::Shared::Queue::LOWEST  # Lowest priority items dequeue first

 type =>
    $MCE::Shared::Queue::FIFO    # First in, first out
    $MCE::Shared::Queue::LIFO    # Last in, first out
    $MCE::Shared::Queue::LILO    # Synonym for FIFO
    $MCE::Shared::Queue::FILO    # Synonym for LIFO

 # below, [ ... ] denotes optional parameters

 $qu->await( [ $pending_threshold ] );
 $qu->clear();
 $qu->end();

 $qu->enqueue( $item [, $item, ... ] );
 $qu->enqueuep( $priority, $item [, $item, ... ] );

 $item  = $qu->dequeue();
 @items = $qu->dequeue( $count );
 $item  = $qu->dequeue_nb();
 @items = $qu->dequeue_nb( $count );

 $qu->insert( $index, $item [, $item, ... ] );
 $qu->insertp( $priority, $index, $item [, $item, ... ] );

 $count = $qu->pending();
 $item  = $qu->peek( [ $index ] );
 $item  = $qu->peekp( $priority [, $index ] );
 @array = $qu->heap();

=head1 API DOCUMENTATION

=head2 MCE::Shared::Queue->new ( [ options ] )

=head2 MCE::Shared->queue ( [ options ] )

Constructs a new object. Supported options are queue, porder, type, and await.
Note: The barrier and fast options are silently ignored (no-op) if specified;
starting with 1.867.

 # non-shared or local construction for use by a single process

 use MCE::Shared::Queue;

 $q1 = MCE::Shared::Queue->new();
 $q2 = MCE::Shared::Queue->new( queue => [ 0, 1, 2 ] );

 $q3 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::HIGHEST );
 $q4 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::LOWEST );

 $q5 = MCE::Shared::Queue->new( type => $MCE::Shared::Queue::FIFO );
 $q6 = MCE::Shared::Queue->new( type => $MCE::Shared::Queue::LIFO );

 $q7 = MCE::Shared::Queue->new( await => 1, barrier => 0 );
 $q8 = MCE::Shared::Queue->new( fast => 1 );

 # construction for sharing with other threads and processes

 use MCE::Shared;
 use MCE::Shared::Queue;

 $q1 = MCE::Shared->queue();
 $q2 = MCE::Shared->queue( queue => [ 0, 1, 2 ] );

 $q3 = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
 $q4 = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST );

 $q5 = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
 $q6 = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );

 $q7 = MCE::Shared->queue( await => 1, barrier => 0 );
 $q8 = MCE::Shared->queue( fast => 1 );

The C<await> option, when enabled, allows workers to block (semaphore-like)
until the number of items pending is equal or less than a threshold value.
The C<await> method is described below.

Obsolete: On Unix platforms, C<barrier> mode (enabled by default) prevents
many workers from dequeuing simultaneously to lessen overhead for the OS kernel.
Specify 0 to disable barrier mode and not allocate sockets. The barrier option
has no effect if constructing the queue inside a thread or enabling C<fast>.

Obsolete: The C<fast> option speeds up dequeues and is not enabled by default.
It is beneficial for queues not calling (->dequeue_nb) and not altering the
count value while running; e.g.
->dequeue($count).

=head2 await ( pending_threshold )

Waits until the queue drops down to threshold items. The C<await> method is
beneficial when wanting to throttle worker(s) appending to the queue. Perhaps,
consumers are running a bit behind and wanting to prevent memory consumption
from increasing too high. Below, the number of items pending will never go
above 20.

 use Time::HiRes qw( sleep );

 use MCE::Flow;
 use MCE::Shared;

 my $q = MCE::Shared->queue( await => 1, fast => 1 );
 my ( $producers, $consumers ) = ( 1, 8 );

 mce_flow {
    task_name   => [ 'producer', 'consumer' ],
    max_workers => [ $producers, $consumers ],
 },
 sub {
    ## producer
    for my $item ( 1 .. 100 ) {
       $q->enqueue($item);

       ## blocks until the # of items pending reaches <= 10
       if ($item % 10 == 0) {
          MCE->say( 'pending: '.$q->pending() );
          $q->await(10);
       }
    }

    ## notify consumers no more work
    $q->end();

 },
 sub {
    ## consumers
    while (defined (my $next = $q->dequeue())) {
       MCE->say( MCE->task_wid().': '.$next );
       sleep 0.100;
    }
 };

=head2 clear ( )

Clears the queue of any items.

 $q->clear;

=head2 end ( )

Stops the queue from receiving more items. Any worker blocking on C<dequeue>
will be unblocked automatically. Subsequent calls to C<dequeue> will behave
like C<dequeue_nb>. Current API available since MCE::Shared 1.814.

 $q->end();

MCE Models (e.g. MCE::Flow) may persist between runs. In that case, one might
want to enqueue C<undef>'s versus calling C<end>. The number of C<undef>'s
depends on how many items workers dequeue at a time.

 $q->enqueue((undef) x ($N_workers * 1));  # $q->dequeue()   1 item
 $q->enqueue((undef) x ($N_workers * 2));  # $q->dequeue(2)  2 items
 $q->enqueue((undef) x ($N_workers * N));  # $q->dequeue(N)  N items

=head2 enqueue ( item [, item, ... ] )

Appends a list of items onto the end of the normal queue.

 $q->enqueue( 'foo' );
 $q->enqueue( 'bar', 'baz' );

=head2 enqueuep ( priority, item [, item, ... ] )

Appends a list of items onto the end of the priority queue with priority.

 $q->enqueuep( $priority, 'foo' );
 $q->enqueuep( $priority, 'bar', 'baz' );

=head2 dequeue ( [ count ] )

Returns the requested number of items (default 1) from the queue. Priority
data will always dequeue first before any data from the normal queue.

 $q->dequeue( 2 );
 $q->dequeue; # default 1

The method will block if the queue contains zero items. If the queue contains
fewer than the requested number of items, the method will not block, but
return whatever items there are on the queue.

The $count, used for requesting the number of items, is beneficial when workers
are passing parameters through the queue. For this reason, always remember to
dequeue using the same multiple for the count. This is unlike Thread::Queue
which will block until the requested number of items are available.

 # MCE::Shared::Queue 1.816 and prior releases
 while ( my @items = $q->dequeue(2) ) {
    last unless ( defined $items[0] );
    ...
 }

 # MCE::Shared::Queue 1.817 and later
 while ( my @items = $q->dequeue(2) ) {
    ...
 }

=head2 dequeue_nb ( [ count ] )

Returns the requested number of items (default 1) from the queue. Like with
dequeue, priority data will always dequeue first. This method is non-blocking
and returns C<undef> in the absence of data.

 $q->dequeue_nb( 2 );
 $q->dequeue_nb; # default 1

=head2 insert ( index, item [, item, ... ] )

Adds the list of items to the queue at the specified index position (0 is the
head of the list). The head of the queue is that item which would be removed
by a call to dequeue.

 $q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
 $q->enqueue(1, 2, 3, 4);
 $q->insert(1, 'foo', 'bar');
 # Queue now contains: 1, foo, bar, 2, 3, 4

 $q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
 $q->enqueue(1, 2, 3, 4);
 $q->insert(1, 'foo', 'bar');
 # Queue now contains: 1, 2, 3, 'foo', 'bar', 4

=head2 insertp ( priority, index, item [, item, ... ] )

Adds the list of items to the queue at the specified index position with
priority. The behavior is similar to C<< $q->insert >> otherwise.

=head2 pending ( )

Returns the number of items in the queue. The count includes both normal
and priority data. Returns C<undef> if the queue has been ended, and there
are no more items in the queue.

 $q = MCE::Shared->queue();
 $q->enqueuep(5, 'foo', 'bar');
 $q->enqueue('sunny', 'day');

 print $q->pending(), "\n";
 # Output: 4

=head2 peek ( [ index ] )

Returns an item from the normal queue, at the specified index, without
dequeuing anything. It defaults to the head of the queue if index is not
specified. The head of the queue is that item which would be removed by a
call to dequeue. Negative index values are supported, similarly to arrays.
1201 1202 $q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO ); 1203 $q->enqueue(1, 2, 3, 4, 5); 1204 1205 print $q->peek(1), ' ', $q->peek(-2), "\n"; 1206 # Output: 2 4 1207 1208 $q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO ); 1209 $q->enqueue(1, 2, 3, 4, 5); 1210 1211 print $q->peek(1), ' ', $q->peek(-2), "\n"; 1212 # Output: 4 2 1213 1214=head2 peekp ( priority [, index ] ) 1215 1216Returns an item from the queue with priority, at the specified index, without 1217dequeuing anything. It defaults to the head of the queue if index is not 1218specified. The behavior is similar to C<< $q->peek >> otherwise. 1219 1220=head2 peekh ( [ index ] ) 1221 1222Returns an item from the head of the heap or at the specified index. 1223 1224 $q = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST ); 1225 $q->enqueuep(5, 'foo'); 1226 $q->enqueuep(6, 'bar'); 1227 $q->enqueuep(4, 'sun'); 1228 1229 print $q->peekh(0), "\n"; 1230 # Output: 6 1231 1232 $q = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST ); 1233 $q->enqueuep(5, 'foo'); 1234 $q->enqueuep(6, 'bar'); 1235 $q->enqueuep(4, 'sun'); 1236 1237 print $q->peekh(0), "\n"; 1238 # Output: 4 1239 1240=head2 heap ( ) 1241 1242Returns an array containing the heap data. Heap data consists of priority 1243numbers, not the data. 1244 1245 @h = $q->heap; # $MCE::Shared::Queue::HIGHEST 1246 # Heap contains: 6, 5, 4 1247 1248 @h = $q->heap; # $MCE::Shared::Queue::LOWEST 1249 # Heap contains: 4, 5, 6 1250 1251=head1 ACKNOWLEDGMENTS 1252 1253=over 3 1254 1255=item * L<List::BinarySearch> 1256 1257The bsearch_num_pos method was helpful for accommodating the highest and lowest 1258order in MCE::Shared::Queue. 1259 1260=item * L<POE::Queue::Array> 1261 1262For extra optimization, two if statements were adopted for checking if the item 1263belongs at the end or head of the queue. 1264 1265=item * L<List::Priority> 1266 1267MCE::Shared::Queue supports both normal and priority queues.
1268 1269=item * L<Thread::Queue> 1270 1271Thread::Queue is used as a template for identifying and documenting the methods. 1272MCE::Shared::Queue is not fully compatible due to supporting normal and priority 1273queues simultaneously; e.g. 1274 1275 $q->enqueue( $item [, $item, ... ] ); # normal queue 1276 $q->enqueuep( $p, $item [, $item, ... ] ); # priority queue 1277 1278 $q->dequeue( [ $count ] ); # priority data dequeues first 1279 $q->dequeue_nb( [ $count ] ); 1280 1281 $q->pending(); # counts both normal/priority queues 1282 1283=back 1284 1285=head1 LIMITATIONS 1286 1287Perl must have L<IO::FDPass> for constructing a shared C<condvar> or C<queue> 1288while the shared-manager process is running. For platforms where L<IO::FDPass> 1289isn't possible, construct C<condvar> and C<queue> before other classes. 1290On systems without C<IO::FDPass>, the start of the manager process is delayed 1291until other classes are shared or until it is started explicitly. 1292 1293 use MCE::Shared; 1294 1295 my $has_IO_FDPass = $INC{'IO/FDPass.pm'} ? 1 : 0; 1296 1297 my $cv = MCE::Shared->condvar(); 1298 my $que = MCE::Shared->queue(); 1299 1300 MCE::Shared->start() unless $has_IO_FDPass; 1301 1302Regarding mce_open, C<IO::FDPass> is needed for constructing a shared-handle 1303from a non-shared handle not yet available inside the shared-manager process. 1304The workaround is to have the non-shared handle made before the shared-manager 1305is started. Passing a file by reference is fine for the three STD* handles. 1306 1307 # The shared-manager knows of \*STDIN, \*STDOUT, \*STDERR.
1308 1309 mce_open my $shared_in, "<", \*STDIN; # ok 1310 mce_open my $shared_out, ">>", \*STDOUT; # ok 1311 mce_open my $shared_err, ">>", \*STDERR; # ok 1312 mce_open my $shared_fh1, "<", "/path/to/sequence.fasta"; # ok 1313 mce_open my $shared_fh2, ">>", "/path/to/results.log"; # ok 1314 1315 mce_open my $shared_fh, ">>", \*NON_SHARED_FH; # requires IO::FDPass 1316 1317The L<IO::FDPass> module is known to work reliably on most platforms. 1318Install version 1.1 or later to be rid of the limitations described above. 1319 1320 perl -MIO::FDPass -le "print 'Cheers! Perl has IO::FDPass.'" 1321 1322=head1 INDEX 1323 1324L<MCE|MCE>, L<MCE::Hobo>, L<MCE::Shared> 1325 1326=head1 AUTHOR 1327 1328Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>> 1329 1330=cut 1331 1332