1###############################################################################
2## ----------------------------------------------------------------------------
3## Hybrid-queue helper class.
4##
5###############################################################################
6
7package MCE::Shared::Queue;
8
9use strict;
10use warnings;
11
12use 5.010001;
13
14no warnings qw( threads recursion uninitialized numeric );
15
16our $VERSION = '1.874';
17
18## no critic (Subroutines::ProhibitExplicitReturnUndef)
19
20use Scalar::Util qw( looks_like_number );
21use MCE::Shared::Base ();
22use MCE::Util ();
23use MCE::Mutex ();
24
25use overload (
26   q("")    => \&MCE::Shared::Base::_stringify,
27   q(0+)    => \&MCE::Shared::Base::_numify,
28   fallback => 1
29);
30
31###############################################################################
32## ----------------------------------------------------------------------------
33## Attributes used internally.
34## _qr_sock _qw_sock _datp _datq _dsem _heap _init_pid _porder _type
35## _ar_sock _aw_sock _asem _tsem
36##
37###############################################################################
38
39our ($HIGHEST, $LOWEST, $FIFO, $LIFO, $LILO, $FILO) = (1, 0, 1, 0, 1, 0);
40my  ($PORDER, $TYPE, $AWAIT) = ($HIGHEST, $FIFO, 0);
41
42my $LF = "\012"; Internals::SvREADONLY($LF, 1);
43my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
44my $_reset_flg = 1;
45
46my %_valid_fields_new = map { $_ => 1 } qw(
47   await barrier fast porder queue type
48);
49
50sub _croak {
51   goto &MCE::Shared::Base::_croak;
52}
53sub CLONE {
54   $_tid = threads->tid() if $INC{'threads.pm'};
55}
56
57sub DESTROY {
58   my ($_Q) = @_;
59   my $_pid = $_tid ? $$ .'.'. $_tid : $$;
60
61   undef $_Q->{_datp}, undef $_Q->{_datq}, undef $_Q->{_heap};
62
63   if ($_Q->{_init_pid} eq $_pid) {
64      MCE::Util::_destroy_socks($_Q, qw(_aw_sock _ar_sock _qw_sock _qr_sock));
65   }
66
67   return;
68}
69
70###############################################################################
71## ----------------------------------------------------------------------------
72## Instance instantiation.
73##
74###############################################################################
75
76# new ( options )
77
78sub new {
79   my ($_class, %_argv) = @_;
80   my $_Q = {}; bless($_Q, ref($_class) || $_class);
81
82   for my $_p (keys %_argv) {
83      _croak("Queue: ($_p) is not a valid constructor argument")
84         unless (exists $_valid_fields_new{$_p});
85   }
86
87   $_Q->{_asem} =  0;  # Semaphore count variable for the ->await method
88   $_Q->{_datp} = {};  # Priority data { p1 => [ ], p2 => [ ], pN => [ ] }
89   $_Q->{_heap} = [];  # Priority heap [ pN, p2, p1 ] in heap order
90                       # fyi, _datp will always dequeue before _datq
91
92   # --------------------------------------------------------------------------
93
94   $_Q->{_await}  = defined $_argv{await}  ? $_argv{await}  : $AWAIT;
95   $_Q->{_porder} = defined $_argv{porder} ? $_argv{porder} : $PORDER;
96   $_Q->{_type}   = defined $_argv{type}   ? $_argv{type}   : $TYPE;
97
98   if (exists $_argv{queue}) {
99      _croak('Queue: (queue) is not an ARRAY reference')
100         if (ref $_argv{queue} ne 'ARRAY');
101      $_Q->{_datq} = $_argv{queue};
102   }
103   else {
104      $_Q->{_datq} = [];
105   }
106
107   # --------------------------------------------------------------------------
108
109   $_Q->{_init_pid} = $_tid ? $$ .'.'. $_tid : $$;
110   $_Q->{_dsem}     = 0;
111
112   MCE::Util::_sock_pair($_Q, qw(_qr_sock _qw_sock), undef, 1);
113   MCE::Util::_sock_pair($_Q, qw(_ar_sock _aw_sock), undef, 1) if $_Q->{_await};
114
115   MCE::Shared::Object::_reset(), $_reset_flg = ''
116      if ($_reset_flg && $INC{'MCE/Shared/Server.pm'});
117
118   return $_Q;
119}
120
121###############################################################################
122## ----------------------------------------------------------------------------
123## Public methods.
124##
125###############################################################################
126
127# await ( pending_threshold )
128
129sub await {
130   # Handled by MCE::Shared::Object when shared.
131   return;
132}
133
134# clear ( )
135
136sub clear {
137   my ($_Q) = @_;
138
139   %{ $_Q->{_datp} } = ();
140   @{ $_Q->{_datq} } = ();
141   @{ $_Q->{_heap} } = ();
142
143   return;
144}
145
146# end ( )
147
148sub end {
149   my ($_Q) = @_;
150
151   if (!exists $_Q->{_ended}) {
152      for my $_i (1 .. $_Q->{_dsem}) { syswrite($_Q->{_qw_sock}, $LF) }
153      $_Q->{_dsem} = 0, $_Q->{_ended} = undef;
154   }
155
156   return;
157}
158
159# enqueue ( item [, item, ... ] )
160
161sub enqueue {
162   my $_Q = shift;
163
164   return unless (scalar @_);
165
166   if (exists $_Q->{_ended}) {
167      warn "Queue: (enqueue) called on queue that has been 'end'ed\n";
168      return;
169   }
170
171   if ($_Q->{_dsem}) {
172      for my $_i (1 .. scalar @_) {
173         $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
174         last unless $_Q->{_dsem};
175      }
176   }
177
178   push @{ $_Q->{_datq} }, @_;
179
180   return;
181}
182
183# enqueuep ( priority, item [, item, ... ] )
184
185sub enqueuep {
186   my ($_Q, $_p) = (shift, shift);
187
188   _croak('Queue: (enqueuep priority) is not an integer')
189      if (!looks_like_number($_p) || int($_p) != $_p);
190
191   return unless (scalar @_);
192
193   if (exists $_Q->{_ended}) {
194      warn "Queue: (enqueuep) called on queue that has been 'end'ed\n";
195      return;
196   }
197
198   if ($_Q->{_dsem}) {
199      for my $_i (1 .. scalar @_) {
200         $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
201         last unless $_Q->{_dsem};
202      }
203   }
204
205   $_Q->_enqueuep($_p, @_);
206
207   return;
208}
209
210# dequeue ( count )
211# dequeue ( )
212
213sub dequeue {
214   my ($_Q, $_cnt) = @_;
215   my (@_items, $_has_data, $_buf);
216
217   if (defined $_cnt && $_cnt ne '1') {
218      _croak('Queue: (dequeue count argument) is not valid')
219         if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
220
221      my $_pending = @{ $_Q->{_datq} };
222
223      if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
224         for my $_h (@{ $_Q->{_heap} }) {
225            $_pending += @{ $_Q->{_datp}->{$_h} };
226         }
227      }
228      $_cnt = $_pending if $_pending < $_cnt;
229
230      for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
231   }
232   else {
233      $_has_data = ( @{ $_Q->{_datq} } || @{ $_Q->{_heap} } ) ? 1 : 0;
234      $_buf      = $_Q->_dequeue();
235   }
236
237   return @_items if (scalar  @_items);
238   return $_buf   if ($_has_data);
239   return ()      if (exists  $_Q->{_ended});
240
241   $_Q->{_dsem} += 1, MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);
242
243   goto \&dequeue;
244}
245
246# dequeue_nb ( count )
247# dequeue_nb ( )
248
249sub dequeue_nb {
250   my ($_Q, $_cnt) = @_;
251
252   if (defined $_cnt && $_cnt ne '1') {
253      _croak('Queue: (dequeue count argument) is not valid')
254         if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
255
256      my $_pending = @{ $_Q->{_datq} };
257
258      if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
259         for my $_h (@{ $_Q->{_heap} }) {
260            $_pending += @{ $_Q->{_datp}->{$_h} };
261         }
262      }
263
264      $_cnt = $_pending if $_pending < $_cnt;
265
266      return map { $_Q->_dequeue() } 1 .. $_cnt;
267   }
268
269   my $_buf = $_Q->_dequeue();
270
271   return defined($_buf) ? $_buf : ();
272}
273
274# pending ( )
275
276sub pending {
277   my ($_Q) = @_;
278   my $_pending = @{ $_Q->{_datq} };
279
280   if (scalar @{ $_Q->{_heap} }) {
281      for my $_h (@{ $_Q->{_heap} }) {
282         $_pending += @{ $_Q->{_datp}->{$_h} };
283      }
284   }
285
286   return (exists $_Q->{_ended})
287      ? $_pending ? $_pending : undef
288      : $_pending;
289}
290
291# insert ( index, item [, item, ... ] )
292
293sub insert {
294   my ($_Q, $_i) = (shift, shift);
295
296   _croak('Queue: (insert index) is not an integer')
297      if (!looks_like_number($_i) || int($_i) != $_i);
298
299   return unless (scalar @_);
300
301   if (exists $_Q->{_ended}) {
302      warn "Queue: (insert) called on queue that has been 'end'ed\n";
303      return;
304   }
305
306   if ($_Q->{_dsem}) {
307      for my $_i (1 .. scalar @_) {
308         $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
309         last unless $_Q->{_dsem};
310      }
311   }
312
313   if (abs($_i) > scalar @{ $_Q->{_datq} }) {
314      if ($_i >= 0) {
315         if ($_Q->{_type}) {
316            push @{ $_Q->{_datq} }, @_;
317         } else {
318            unshift @{ $_Q->{_datq} }, @_;
319         }
320      }
321      else {
322         if ($_Q->{_type}) {
323            unshift @{ $_Q->{_datq} }, @_;
324         } else {
325            push @{ $_Q->{_datq} }, @_;
326         }
327      }
328   }
329   else {
330      if (!$_Q->{_type}) {
331         $_i = ($_i >= 0)
332            ? scalar(@{ $_Q->{_datq} }) - $_i
333            : abs($_i);
334      }
335      splice @{ $_Q->{_datq} }, $_i, 0, @_;
336   }
337
338   return;
339}
340
341# insertp ( priority, index, item [, item, ... ] )
342
343sub insertp {
344   my ($_Q, $_p, $_i) = (shift, shift, shift);
345
346   _croak('Queue: (insertp priority) is not an integer')
347      if (!looks_like_number($_p) || int($_p) != $_p);
348   _croak('Queue: (insertp index) is not an integer')
349      if (!looks_like_number($_i) || int($_i) != $_i);
350
351   return unless (scalar @_);
352
353   if (exists $_Q->{_ended}) {
354      warn "Queue: (insertp) called on queue that has been 'end'ed\n";
355      return;
356   }
357
358   if ($_Q->{_dsem}) {
359      for my $_i (1 .. scalar @_) {
360         $_Q->{_dsem} -= 1, syswrite($_Q->{_qw_sock}, $LF);
361         last unless $_Q->{_dsem};
362      }
363   }
364
365   if (exists $_Q->{_datp}->{$_p} && scalar @{ $_Q->{_datp}->{$_p} }) {
366
367      if (abs($_i) > scalar @{ $_Q->{_datp}->{$_p} }) {
368         if ($_i >= 0) {
369            if ($_Q->{_type}) {
370               push @{ $_Q->{_datp}->{$_p} }, @_;
371            } else {
372               unshift @{ $_Q->{_datp}->{$_p} }, @_;
373            }
374         }
375         else {
376            if ($_Q->{_type}) {
377               unshift @{ $_Q->{_datp}->{$_p} }, @_;
378            } else {
379               push @{ $_Q->{_datp}->{$_p} }, @_;
380            }
381         }
382      }
383      else {
384         if (!$_Q->{_type}) {
385            $_i = ($_i >=0)
386               ? scalar(@{ $_Q->{_datp}->{$_p} }) - $_i
387               : abs($_i);
388         }
389         splice @{ $_Q->{_datp}->{$_p} }, $_i, 0, @_;
390      }
391   }
392   else {
393      $_Q->_enqueuep($_p, @_);
394   }
395
396   return;
397}
398
399# peek ( index )
400# peek ( )
401
402sub peek {
403   my ($_Q, $_i) = @_;
404
405   if ($_i) {
406      _croak('Queue: (peek index) is not an integer')
407         if (!looks_like_number($_i) || int($_i) != $_i);
408   }
409   else { $_i = 0 }
410
411   return undef if (abs($_i) > scalar @{ $_Q->{_datq} });
412
413   if (!$_Q->{_type}) {
414      $_i = ($_i >= 0)
415         ? scalar(@{ $_Q->{_datq} }) - ($_i + 1)
416         : abs($_i + 1);
417   }
418
419   return $_Q->{_datq}->[$_i];
420}
421
422# peekp ( priority, index )
423# peekp ( priority )
424
425sub peekp {
426   my ($_Q, $_p, $_i) = @_;
427
428   if ($_i) {
429      _croak('Queue: (peekp index) is not an integer')
430         if (!looks_like_number($_i) || int($_i) != $_i);
431   }
432   else { $_i = 0 }
433
434   _croak('Queue: (peekp priority) is not an integer')
435      if (!looks_like_number($_p) || int($_p) != $_p);
436
437   return undef unless (exists $_Q->{_datp}->{$_p});
438   return undef if (abs($_i) > scalar @{ $_Q->{_datp}->{$_p} });
439
440   if (!$_Q->{_type}) {
441      $_i = ($_i >= 0)
442         ? scalar(@{ $_Q->{_datp}->{$_p} }) - ($_i + 1)
443         : abs($_i + 1);
444   }
445
446   return $_Q->{_datp}->{$_p}->[$_i];
447}
448
449# peekh ( index )
450# peekh ( )
451
452sub peekh {
453   my ($_Q, $_i) = @_;
454
455   if ($_i) {
456      _croak('Queue: (peekh index) is not an integer')
457         if (!looks_like_number($_i) || int($_i) != $_i);
458   }
459   else { $_i = 0 }
460
461   return undef if (abs($_i) > scalar @{ $_Q->{_heap} });
462   return $_Q->{_heap}->[$_i];
463}
464
465# heap ( )
466
467sub heap {
468   return @{ shift->{_heap} };
469}
470
471###############################################################################
472## ----------------------------------------------------------------------------
473## Private methods.
474##
475###############################################################################
476
477# Add items to the tail of the queue with priority level.
478
479sub _enqueuep {
480   my ($_Q, $_p) = (shift, shift);
481
482   # Enlist priority into the heap.
483   if (!exists $_Q->{_datp}->{$_p} || @{ $_Q->{_datp}->{$_p} } == 0) {
484
485      unless (scalar @{ $_Q->{_heap} }) {
486         push @{ $_Q->{_heap} }, $_p;
487      }
488      elsif ($_Q->{_porder}) {
489         $_Q->_heap_insert_high($_p);
490      }
491      else {
492         $_Q->_heap_insert_low($_p);
493      }
494   }
495
496   # Append item(s) into the queue.
497   push @{ $_Q->{_datp}->{$_p} }, @_;
498
499   return;
500}
501
502# Return one item from the queue.
503
504sub _dequeue {
505   my ($_Q) = @_;
506
507   # Return item from the non-priority queue.
508   unless (scalar @{ $_Q->{_heap} }) {
509      return ($_Q->{_type})
510         ? shift @{ $_Q->{_datq} } : pop @{ $_Q->{_datq} };
511   }
512
513   my $_p = $_Q->{_heap}->[0];
514
515   # Delist priority from the heap when 1 item remains.
516   shift @{ $_Q->{_heap} } if (@{ $_Q->{_datp}->{$_p} } == 1);
517
518   # Return item from the priority queue.
519   return ($_Q->{_type})
520      ? shift @{ $_Q->{_datp}->{$_p} } : pop @{ $_Q->{_datp}->{$_p} };
521}
522
523# Helper method for getting the reference to the underlying array.
524# Use with test scripts for comparing data only (not a public API).
525
526sub _get_aref {
527   my ($_Q, $_p) = @_;
528
529   if (defined $_p) {
530      _croak('Queue: (get_aref priority) is not an integer')
531         if (!looks_like_number($_p) || int($_p) != $_p);
532
533      return undef unless (exists $_Q->{_datp}->{$_p});
534      return $_Q->{_datp}->{$_p};
535   }
536
537   return $_Q->{_datq};
538}
539
540# Insert priority into the heap. A lower priority level comes first.
541
542sub _heap_insert_low {
543   my ($_Q, $_p) = @_;
544
545   # Insert priority at the head of the heap.
546   if ($_p < $_Q->{_heap}->[0]) {
547      unshift @{ $_Q->{_heap} }, $_p;
548   }
549
550   # Insert priority at the end of the heap.
551   elsif ($_p > $_Q->{_heap}->[-1]) {
552      push @{ $_Q->{_heap} }, $_p;
553   }
554
555   # Insert priority through binary search.
556   else {
557      my $_lower = 0; my $_upper = @{ $_Q->{_heap} };
558
559      while ($_lower < $_upper) {
560         my $_midpoint = $_lower + (($_upper - $_lower) >> 1);
561         if ($_p > $_Q->{_heap}->[$_midpoint]) {
562            $_lower = $_midpoint + 1;
563         } else {
564            $_upper = $_midpoint;
565         }
566      }
567
568      # Insert priority into the heap.
569      splice @{ $_Q->{_heap} }, $_lower, 0, $_p;
570   }
571
572   return;
573}
574
575# Insert priority into the heap. A higher priority level comes first.
576
577sub _heap_insert_high {
578   my ($_Q, $_p) = @_;
579
580   # Insert priority at the head of the heap.
581   if ($_p > $_Q->{_heap}->[0]) {
582      unshift @{ $_Q->{_heap} }, $_p;
583   }
584
585   # Insert priority at the end of the heap.
586   elsif ($_p < $_Q->{_heap}->[-1]) {
587      push @{ $_Q->{_heap} }, $_p;
588   }
589
590   # Insert priority through binary search.
591   else {
592      my $_lower = 0; my $_upper = @{ $_Q->{_heap} };
593
594      while ($_lower < $_upper) {
595         my $_midpoint = $_lower + (($_upper - $_lower) >> 1);
596         if ($_p < $_Q->{_heap}->[$_midpoint]) {
597            $_lower = $_midpoint + 1;
598         } else {
599            $_upper = $_midpoint;
600         }
601      }
602
603      # Insert priority into the heap.
604      splice @{ $_Q->{_heap} }, $_lower, 0, $_p;
605   }
606
607   return;
608}
609
610###############################################################################
611## ----------------------------------------------------------------------------
612## Server functions.
613##
614###############################################################################
615
616{
617   use bytes;
618
619   use constant {
620      SHR_O_QUA => 'O~QUA',  # Queue await
621      SHR_O_QUD => 'O~QUD',  # Queue dequeue
622      SHR_O_QUN => 'O~QUN',  # Queue dequeue non-blocking
623   };
624
625   my (
626      $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_obj, $_freeze, $_thaw,
627      $_cnt, $_id, $_has_data, $_pending, $_t
628   );
629
630   my %_output_function = (
631
632      SHR_O_QUA.$LF => sub {                      # Queue await
633         $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
634
635         chomp($_id = <$_DAU_R_SOCK>),
636         chomp($_t  = <$_DAU_R_SOCK>);
637
638         my $_Q = $_obj->{ $_id } || do {
639            print {$_DAU_R_SOCK} $LF;
640         };
641         $_Q->{_tsem} = $_t;
642
643         if ($_Q->pending() <= $_t) {
644            syswrite($_Q->{_aw_sock}, $LF);
645         } else {
646            $_Q->{_asem} += 1;
647         }
648
649         print {$_DAU_R_SOCK} $LF;
650
651         return;
652      },
653
654      SHR_O_QUD.$LF => sub {                      # Queue dequeue
655         $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
656
657         chomp($_id  = <$_DAU_R_SOCK>),
658         chomp($_cnt = <$_DAU_R_SOCK>);
659
660         $_cnt = 0 if ($_cnt == 1);
661
662         my $_Q = $_obj->{ $_id } || do {
663            print {$_DAU_R_SOCK} '-1'.$LF;
664            return;
665         };
666
667         my (@_items, $_buf);
668
669         if ($_cnt) {
670            $_pending = @{ $_Q->{_datq} };
671
672            if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
673               for my $_h (@{ $_Q->{_heap} }) {
674                  $_pending += @{ $_Q->{_datp}->{$_h} };
675               }
676            }
677            $_cnt = $_pending if $_pending < $_cnt;
678
679            for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
680         }
681         else {
682            $_has_data = ( @{ $_Q->{_datq} } || @{ $_Q->{_heap} } ) ? 1 : 0;
683            $_buf      = $_Q->_dequeue();
684         }
685
686         if ($_cnt) {
687            $_buf = $_freeze->(\@_items);
688            print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
689         }
690         elsif ($_has_data) {
691            $_buf = $_freeze->([ $_buf ]);
692            print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
693         }
694         elsif (exists $_Q->{_ended}) {
695            print {$_DAU_R_SOCK} '-2'.$LF;
696         }
697         else {
698            print {$_DAU_R_SOCK} '-1'.$LF;
699            $_Q->{_dsem} += 1;
700         }
701
702         if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) {
703            for my $_i (1 .. $_Q->{_asem}) {
704               syswrite($_Q->{_aw_sock}, $LF);
705            }
706            $_Q->{_asem} = 0;
707         }
708
709         return;
710      },
711
712      SHR_O_QUN.$LF => sub {                      # Queue dequeue non-blocking
713         $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
714
715         chomp($_id  = <$_DAU_R_SOCK>),
716         chomp($_cnt = <$_DAU_R_SOCK>);
717
718         my $_Q = $_obj->{ $_id } || do {
719            print {$_DAU_R_SOCK} '-1'.$LF;
720            return;
721         };
722
723         if ($_cnt == 1) {
724            my $_buf = $_Q->_dequeue();
725
726            if (defined $_buf) {
727               $_buf = $_freeze->([ $_buf ]);
728               print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
729            }
730            else {
731               print {$_DAU_R_SOCK} '-1'.$LF;
732            }
733         }
734         else {
735            my @_items;
736            my $_pending = @{ $_Q->{_datq} };
737
738            if ($_pending < $_cnt && scalar @{ $_Q->{_heap} }) {
739               for my $_h (@{ $_Q->{_heap} }) {
740                  $_pending += @{ $_Q->{_datp}->{$_h} };
741               }
742            }
743            $_cnt = $_pending if $_pending < $_cnt;
744
745            for my $_i (1 .. $_cnt) { push @_items, $_Q->_dequeue() }
746
747            if ($_cnt) {
748               my $_buf = $_freeze->(\@_items);
749               print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
750            }
751            else {
752               print {$_DAU_R_SOCK} '-1'.$LF;
753            }
754         }
755
756         if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) {
757            for my $_i (1 .. $_Q->{_asem}) {
758               syswrite($_Q->{_aw_sock}, $LF);
759            }
760            $_Q->{_asem} = 0;
761         }
762
763         return;
764      },
765
766   );
767
768   sub _init_mgr {
769      my $_function;
770      ( $_DAU_R_SOCK_REF, $_obj, $_function, $_freeze, $_thaw ) = @_;
771
772      for my $key ( keys %_output_function ) {
773         last if exists($_function->{$key});
774         $_function->{$key} = $_output_function{$key};
775      }
776
777      return;
778   }
779}
780
781###############################################################################
782## ----------------------------------------------------------------------------
783## Object package.
784##
785###############################################################################
786
787## Items below are folded into MCE::Shared::Object.
788
789package # hide from rpm
790   MCE::Shared::Object;
791
792use strict;
793use warnings;
794
795no warnings qw( threads recursion uninitialized numeric once );
796
797use bytes;
798
799no overloading;
800
801my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
802
803my ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, $_obj,
804    $_freeze, $_thaw);
805
806sub _init_queue {
807   ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, $_obj,
808    $_freeze, $_thaw) = @_;
809
810   return;
811}
812
813sub _req_queue {
814   local $\ = undef if (defined $\);
815   local $/ = $LF if ($/ ne $LF);
816   local $MCE::Signal::SIG;
817
818   {
819      local $MCE::Signal::IPC = 1;
820      $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
821
822      print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
823      print({$_DAU_W_SOCK} $_[1]);
824      chomp($_[2] = <$_DAU_W_SOCK>);
825
826      read($_DAU_W_SOCK, $_[3], $_[2]) if ($_[2] > 0);
827
828      $_dat_un->() if !$_is_MSWin32;
829   }
830
831   CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
832}
833
834sub await {
835   my $_id = shift()->[0];
836   return unless ( my $_Q = $_obj->{ $_id } );
837   return unless ( exists $_Q->{_qr_sock} );
838
839   my $_t = shift || 0;
840
841   _croak('Queue: (await) is not enabled for this queue')
842      unless (exists $_Q->{_ar_sock});
843   _croak('Queue: (await threshold) is not an integer')
844      if (!looks_like_number($_t) || int($_t) != $_t);
845
846   $_t = 0 if ($_t < 0);
847   _req1('O~QUA', $_id.$LF . $_t.$LF);
848
849   MCE::Util::_sock_ready($_Q->{_ar_sock}) if $_is_MSWin32;
850   MCE::Util::_sysread($_Q->{_ar_sock}, my($_b), 1);
851
852   return;
853}
854
855sub dequeue {
856   my ($self, $_cnt) = @_;
857   my $_id = $self->[0];
858
859   return unless ( my $_Q = $_obj->{ $_id } );
860   return unless ( exists $_Q->{_qr_sock} );
861
862   if (defined $_cnt && $_cnt ne '1') {
863      _croak('Queue: (dequeue count argument) is not valid')
864         if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
865   }
866   else {
867      $_cnt = 1;
868   }
869
870   _req_queue('O~QUD', $_id.$LF . $_cnt.$LF, my($_len), my($_buf));
871
872   return $_thaw->($_buf)[0]   if ($_len > 0 && $_cnt == 1);
873   return @{ $_thaw->($_buf) } if ($_len > 0);
874   return                      if ($_len == -2);
875
876   MCE::Util::_sock_ready($_Q->{_qr_sock}) if $_is_MSWin32;
877   MCE::Util::_sysread($_Q->{_qr_sock}, my($_next), 1);
878
879   goto \&dequeue;
880}
881
882sub dequeue_nb {
883   my ($self, $_cnt) = @_;
884   my $_id = $self->[0];
885
886   return unless ( my $_Q = $_obj->{ $_id } );
887   return unless ( exists $_Q->{_qr_sock} );
888
889   if (defined $_cnt && $_cnt ne '1') {
890      _croak('Queue: (dequeue_nb count argument) is not valid')
891         if (!looks_like_number($_cnt) || int($_cnt) != $_cnt || $_cnt < 1);
892   }
893   else {
894      $_cnt = 1;
895   }
896
897   _req_queue('O~QUN', $_id.$LF . $_cnt.$LF, my($_len), my($_buf));
898
899   return if ($_len < 0);
900
901   ($_cnt == 1)
902      ? $_thaw->($_buf)[0]
903      : @{ $_thaw->($_buf) };
904}
905
906sub pending {
907   (@_ == 1 && !wantarray) ? _size('pending', @_) : _auto('pending', @_);
908}
909
9101;
911
912__END__
913
914###############################################################################
915## ----------------------------------------------------------------------------
916## Module usage.
917##
918###############################################################################
919
920=head1 NAME
921
922MCE::Shared::Queue - Hybrid-queue helper class
923
924=head1 VERSION
925
926This document describes MCE::Shared::Queue version 1.874
927
928=head1 DESCRIPTION
929
930A queue helper class for use as a standalone or managed by L<MCE::Shared>.
931
932This module is mostly compatible with L<MCE::Queue> except for the C<gather>
933option which is not supported in this context. It provides a queue interface
934supporting normal and priority queues. Data from shared queues reside under
935the shared-manager process, otherwise locally.
936
937=head1 SYNOPSIS
938
939 # non-shared or local construction for use by a single process
940
941 use MCE::Shared::Queue;
942
943 my $qu = MCE::Shared::Queue->new( await => 1, queue => [ "." ] );
944
945 # construction for sharing with other threads and processes
946
947 use MCE::Shared;
948 use MCE::Shared::Queue;
949
950 my $qu = MCE::Shared->queue(
951    porder => $MCE::Shared::Queue::HIGHEST,
952    type   => $MCE::Shared::Queue::FIFO,
953 );
954
955 # possible values for "porder" and "type"
956
957 porder =>
958    $MCE::Shared::Queue::HIGHEST # Highest priority items dequeue first
959    $MCE::Shared::Queue::LOWEST  # Lowest priority items dequeue first
960
961 type =>
962    $MCE::Shared::Queue::FIFO    # First in, first out
963    $MCE::Shared::Queue::LIFO    # Last in, first out
964    $MCE::Shared::Queue::LILO    # Synonym for FIFO
965    $MCE::Shared::Queue::FILO    # Synonym for LIFO
966
967 # below, [ ... ] denotes optional parameters
968
969 $qu->await( [ $pending_threshold ] );
970 $qu->clear();
971 $qu->end();
972
973 $qu->enqueue( $item [, $item, ... ] );
974 $qu->enqueuep( $priority, $item [, $item, ... ] );
975
976 $item  = $qu->dequeue();
977 @items = $qu->dequeue( $count );
978 $item  = $qu->dequeue_nb();
979 @items = $qu->dequeue_nb( $count );
980
981 $qu->insert( $index, $item [, $item, ... ] );
982 $qu->insertp( $priority, $index, $item [, $item, ... ] );
983
984 $count = $qu->pending();
985 $item  = $qu->peek( [ $index ] );
986 $item  = $qu->peekp( $priority [, $index ] );
987 @array = $qu->heap();
988
989=head1 API DOCUMENTATION
990
991=head2 MCE::Shared::Queue->new ( [ options ] )
992
993=head2 MCE::Shared->queue ( [ options ] )
994
995Constructs a new object. Supported options are queue, porder, type, and await.
996Note: The barrier and fast options are silentently ignored (no-op) if specified;
997starting with 1.867.
998
999 # non-shared or local construction for use by a single process
1000
1001 use MCE::Shared::Queue;
1002
1003 $q1 = MCE::Shared::Queue->new();
1004 $q2 = MCE::Shared::Queue->new( queue  => [ 0, 1, 2 ] );
1005
1006 $q3 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::HIGHEST );
1007 $q4 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::LOWEST  );
1008
1009 $q5 = MCE::Shared::Queue->new( type   => $MCE::Shared::Queue::FIFO );
1010 $q6 = MCE::Shared::Queue->new( type   => $MCE::Shared::Queue::LIFO );
1011
1012 $q7 = MCE::Shared::Queue->new( await  => 1, barrier => 0 );
1013 $q8 = MCE::Shared::Queue->new( fast   => 1 );
1014
1015 # construction for sharing with other threads and processes
1016
1017 use MCE::Shared;
1018 use MCE::Shared::Queue;
1019
1020 $q1 = MCE::Shared->queue();
1021 $q2 = MCE::Shared->queue( queue  => [ 0, 1, 2 ] );
1022
1023 $q3 = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
1024 $q4 = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST  );
1025
1026 $q5 = MCE::Shared->queue( type   => $MCE::Shared::Queue::FIFO );
1027 $q6 = MCE::Shared->queue( type   => $MCE::Shared::Queue::LIFO );
1028
1029 $q7 = MCE::Shared->queue( await  => 1, barrier => 0 );
1030 $q8 = MCE::Shared->queue( fast   => 1 );
1031
1032The C<await> option, when enabled, allows workers to block (semaphore-like)
1033until the number of items pending is equal or less than a threshold value.
1034The C<await> method is described below.
1035
1036Obsolete: On Unix platforms, C<barrier> mode (enabled by default) prevents
1037many workers from dequeuing simultaneously to lessen overhead for the OS kernel.
1038Specify 0 to disable barrier mode and not allocate sockets. The barrier option
1039has no effect if constructing the queue inside a thread or enabling C<fast>.
1040
1041Obsolete: The C<fast> option speeds up dequeues and is not enabled by default.
1042It is beneficial for queues not calling (->dequeue_nb) and not altering the
1043count value while running; e.g. ->dequeue($count).
1044
1045=head2 await ( pending_threshold )
1046
1047Waits until the queue drops down to threshold items. The C<await> method is
1048beneficial when wanting to throttle worker(s) appending to the queue. Perhaps,
1049consumers are running a bit behind and wanting prevent memory consumption from
1050increasing too high. Below, the number of items pending will never go above 20.
1051
1052 use Time::HiRes qw( sleep );
1053
1054 use MCE::Flow;
1055 use MCE::Shared;
1056
1057 my $q = MCE::Shared->queue( await => 1, fast => 1 );
1058 my ( $producers, $consumers ) = ( 1, 8 );
1059
1060 mce_flow {
1061    task_name   => [ 'producer', 'consumer' ],
1062    max_workers => [ $producers, $consumers ],
1063 },
1064 sub {
1065    ## producer
1066    for my $item ( 1 .. 100 ) {
1067       $q->enqueue($item);
1068
1069       ## blocks until the # of items pending reaches <= 10
1070       if ($item % 10 == 0) {
1071          MCE->say( 'pending: '.$q->pending() );
1072          $q->await(10);
1073       }
1074    }
1075
1076    ## notify consumers no more work
1077    $q->end();
1078
1079 },
1080 sub {
1081    ## consumers
1082    while (defined (my $next = $q->dequeue())) {
1083       MCE->say( MCE->task_wid().': '.$next );
1084       sleep 0.100;
1085    }
1086 };
1087
1088=head2 clear ( )
1089
1090Clears the queue of any items.
1091
1092 $q->clear;
1093
1094=head2 end ( )
1095
1096Stops the queue from receiving more items. Any worker blocking on C<dequeue>
1097will be unblocked automatically. Subsequent calls to C<dequeue> will behave
1098like C<dequeue_nb>. Current API available since MCE::Shared 1.814.
1099
1100 $q->end();
1101
1102MCE Models (e.g. MCE::Flow) may persist between runs. In that case, one might
1103want to enqueue C<undef>'s versus calling C<end>. The number of C<undef>'s
1104depends on how many items workers dequeue at a time.
1105
1106 $q->enqueue((undef) x ($N_workers * 1));  # $q->dequeue()   1 item
1107 $q->enqueue((undef) x ($N_workers * 2));  # $q->dequeue(2)  2 items
1108 $q->enqueue((undef) x ($N_workers * N));  # $q->dequeue(N)  N items
1109
1110=head2 enqueue ( item [, item, ... ] )
1111
1112Appends a list of items onto the end of the normal queue.
1113
1114 $q->enqueue( 'foo' );
1115 $q->enqueue( 'bar', 'baz' );
1116
1117=head2 enqueuep ( priority, item [, item, ... ] )
1118
1119Appends a list of items onto the end of the priority queue with priority.
1120
1121 $q->enqueue( $priority, 'foo' );
1122 $q->enqueue( $priority, 'bar', 'baz' );
1123
1124=head2 dequeue ( [ count ] )
1125
1126Returns the requested number of items (default 1) from the queue. Priority
1127data will always dequeue first before any data from the normal queue.
1128
1129 $q->dequeue( 2 );
1130 $q->dequeue; # default 1
1131
1132The method will block if the queue contains zero items. If the queue contains
1133fewer than the requested number of items, the method will not block, but
1134return whatever items there are on the queue.
1135
1136The $count, used for requesting the number of items, is beneficial when workers
1137are passing parameters through the queue. For this reason, always remember to
1138dequeue using the same multiple for the count. This is unlike Thread::Queue
1139which will block until the requested number of items are available.
1140
1141 # MCE::Shared::Queue 1.816 and prior releases
1142 while ( my @items = $q->dequeue(2) ) {
1143    last unless ( defined $items[0] );
1144    ...
1145 }
1146
1147 # MCE::Shared::Queue 1.817 and later
1148 while ( my @items = $q->dequeue(2) ) {
1149    ...
1150 }
1151
1152=head2 dequeue_nb ( [ count ] )
1153
1154Returns the requested number of items (default 1) from the queue. Like with
1155dequeue, priority data will always dequeue first. This method is non-blocking
1156and returns C<undef> in the absence of data.
1157
1158 $q->dequeue_nb( 2 );
1159 $q->dequeue_nb; # default 1
1160
1161=head2 insert ( index, item [, item, ... ] )
1162
1163Adds the list of items to the queue at the specified index position (0 is the
1164head of the list). The head of the queue is that item which would be removed
1165by a call to dequeue.
1166
1167 $q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
1168 $q->enqueue(1, 2, 3, 4);
1169 $q->insert(1, 'foo', 'bar');
1170 # Queue now contains: 1, foo, bar, 2, 3, 4
1171
1172 $q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
1173 $q->enqueue(1, 2, 3, 4);
1174 $q->insert(1, 'foo', 'bar');
1175 # Queue now contains: 1, 2, 3, 'foo', 'bar', 4
1176
1177=head2 insertp ( priority, index, item [, item, ... ] )
1178
1179Adds the list of items to the queue at the specified index position with
1180priority. The behavior is similarly to C<< $q->insert >> otherwise.
1181
1182=head2 pending ( )
1183
1184Returns the number of items in the queue. The count includes both normal
1185and priority data. Returns C<undef> if the queue has been ended, and there
1186are no more items in the queue.
1187
1188 $q = MCE::Shared->queue();
1189 $q->enqueuep(5, 'foo', 'bar');
1190 $q->enqueue('sunny', 'day');
1191
1192 print $q->pending(), "\n";
1193 # Output: 4
1194
1195=head2 peek ( [ index ] )
1196
1197Returns an item from the normal queue, at the specified index, without
1198dequeuing anything. It defaults to the head of the queue if index is not
1199specified. The head of the queue is that item which would be removed by a
1200call to dequeue. Negative index values are supported, similarly to arrays.
1201
1202 $q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
1203 $q->enqueue(1, 2, 3, 4, 5);
1204
1205 print $q->peek(1), ' ', $q->peek(-2), "\n";
1206 # Output: 2 4
1207
1208 $q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
1209 $q->enqueue(1, 2, 3, 4, 5);
1210
1211 print $q->peek(1), ' ', $q->peek(-2), "\n";
1212 # Output: 4 2
1213
1214=head2 peekp ( priority [, index ] )
1215
1216Returns an item from the queue with priority, at the specified index, without
1217dequeuing anything. It defaults to the head of the queue if index is not
1218specified. The behavior is similarly to C<< $q->peek >> otherwise.
1219
1220=head2 peekh ( [ index ] )
1221
1222Returns an item from the head of the heap or at the specified index.
1223
1224 $q = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
1225 $q->enqueuep(5, 'foo');
1226 $q->enqueuep(6, 'bar');
1227 $q->enqueuep(4, 'sun');
1228
1229 print $q->peekh(0), "\n";
1230 # Output: 6
1231
1232 $q = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST );
1233 $q->enqueuep(5, 'foo');
1234 $q->enqueuep(6, 'bar');
1235 $q->enqueuep(4, 'sun');
1236
1237 print $q->peekh(0), "\n";
1238 # Output: 4
1239
1240=head2 heap ( )
1241
1242Returns an array containing the heap data. Heap data consists of priority
1243numbers, not the data.
1244
1245 @h = $q->heap;   # $MCE::Shared::Queue::HIGHEST
1246 # Heap contains: 6, 5, 4
1247
1248 @h = $q->heap;   # $MCE::Shared::Queue::LOWEST
1249 # Heap contains: 4, 5, 6
1250
1251=head1 ACKNOWLEDGMENTS
1252
1253=over 3
1254
1255=item * L<List::BinarySearch>
1256
1257The bsearch_num_pos method was helpful for accommodating the highest and lowest
1258order in MCE::Shared::Queue.
1259
1260=item * L<POE::Queue::Array>
1261
1262For extra optimization, two if statements were adopted for checking if the item
1263belongs at the end or head of the queue.
1264
1265=item * L<List::Priority>
1266
1267MCE::Shared::Queue supports both normal and priority queues.
1268
1269=item * L<Thread::Queue>
1270
1271Thread::Queue is used as a template for identifying and documenting the methods.
1272MCE::Shared::Queue is not fully compatible due to supporting normal and priority
1273queues simultaneously; e.g.
1274
1275 $q->enqueue( $item [, $item, ... ] );         # normal queue
1276 $q->enqueuep( $p, $item [, $item, ... ] );    # priority queue
1277
1278 $q->dequeue( [ $count ] );      # priority data dequeues first
1279 $q->dequeue_nb( [ $count ] );
1280
1281 $q->pending();                  # counts both normal/priority queues
1282
1283=back
1284
1285=head1 LIMITATIONS
1286
1287Perl must have L<IO::FDPass> for constructing a shared C<condvar> or C<queue>
1288while the shared-manager process is running. For platforms where L<IO::FDPass>
1289isn't possible, construct C<condvar> and C<queue> before other classes.
1290On systems without C<IO::FDPass>, the manager process is delayed until sharing
1291other classes or started explicitly.
1292
1293 use MCE::Shared;
1294
1295 my $has_IO_FDPass = $INC{'IO/FDPass.pm'} ? 1 : 0;
1296
1297 my $cv  = MCE::Shared->condvar();
1298 my $que = MCE::Shared->queue();
1299
1300 MCE::Shared->start() unless $has_IO_FDPass;
1301
1302Regarding mce_open, C<IO::FDPass> is needed for constructing a shared-handle
1303from a non-shared handle not yet available inside the shared-manager process.
1304The workaround is to have the non-shared handle made before the shared-manager
1305is started. Passing a file by reference is fine for the three STD* handles.
1306
1307 # The shared-manager knows of \*STDIN, \*STDOUT, \*STDERR.
1308
1309 mce_open my $shared_in,  "<",  \*STDIN;   # ok
1310 mce_open my $shared_out, ">>", \*STDOUT;  # ok
1311 mce_open my $shared_err, ">>", \*STDERR;  # ok
1312 mce_open my $shared_fh1, "<",  "/path/to/sequence.fasta";  # ok
1313 mce_open my $shared_fh2, ">>", "/path/to/results.log";     # ok
1314
1315 mce_open my $shared_fh, ">>", \*NON_SHARED_FH;  # requires IO::FDPass
1316
1317The L<IO::FDPass> module is known to work reliably on most platforms.
1318Install 1.1 or later to rid of limitations described above.
1319
1320 perl -MIO::FDPass -le "print 'Cheers! Perl has IO::FDPass.'"
1321
1322=head1 INDEX
1323
1324L<MCE|MCE>, L<MCE::Hobo>, L<MCE::Shared>
1325
1326=head1 AUTHOR
1327
1328Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
1329
1330=cut
1331
1332