1###############################################################################
2## ----------------------------------------------------------------------------
3## Parallel flow model for building creative applications.
4##
5###############################################################################
6
7package MCE::Flow;
8
9use strict;
10use warnings;
11
12no warnings qw( threads recursion uninitialized );
13
14our $VERSION = '1.876';
15
16## no critic (BuiltinFunctions::ProhibitStringyEval)
17## no critic (Subroutines::ProhibitSubroutinePrototypes)
18## no critic (TestingAndDebugging::ProhibitNoStrict)
19
20use Scalar::Util qw( looks_like_number );
21use MCE;
22
## Carp: treat MCE as a trusted package when choosing which caller to blame.
our @CARP_NOT = qw( MCE );

## Thread id embedded in session keys ("$$.$_tid.<package>"); stays 0
## unless the threads module has been loaded.
my $_tid = 0;
$_tid = threads->tid() if ( $INC{'threads.pm'} );

## Perl calls CLONE for every newly created ithread; refresh the cached
## thread id so sessions started in the new thread get their own keys.
sub CLONE {
   return unless ( $INC{'threads.pm'} );
   $_tid = threads->tid();
   return;
}
30
31###############################################################################
32## ----------------------------------------------------------------------------
33## Import routine.
34##
35###############################################################################
36
## Module-wide state. Session keys are "$$.$_tid.<package>" strings built by
## init/run; $_def is keyed by the package that imported this module.
my $_MCE        = {};            # session key => MCE instance
my $_def        = {};            # importing package => import-time options
my $_params     = {};            # session key => options from init/run
my $_tag        = 'MCE::Flow';   # label used in error messages
my $_prev_c     = {};            # session key => code refs of previous run
my $_prev_n     = {};            # session key => task names of previous run
my $_prev_t     = {};            # session key => use_threads of previous run
my $_prev_w     = {};            # session key => max_workers of previous run
my $_user_tasks = {};            # session key => task list given to MCE->new
40
## Called by `use MCE::Flow LIST`. Installs mce_flow, mce_flow_f and
## mce_flow_s into the importing package and records that package's
## module-level defaults (max_workers, chunk_size, tmp_dir, freeze, thaw).
## Option names are case-insensitive; an unknown name croaks.
sub import {
   my ($_class, $_pkg) = (shift, caller);

   ## Defaults for this importing package, overridden by the options below.
   my $_p = $_def->{$_pkg} = {
      MAX_WORKERS => 'auto',
      CHUNK_SIZE  => 'auto',
   };

   ## Import functions.
   no strict 'refs'; no warnings 'redefine';

   *{ $_pkg.'::mce_flow_f' } = \&run_file;
   *{ $_pkg.'::mce_flow_s' } = \&run_seq;
   *{ $_pkg.'::mce_flow'   } = \&run;

   ## Process module arguments. Loop on the number of remaining arguments,
   ## not on the truth of the next one. A false option name ('' or '0') is
   ## then reported as invalid, instead of silently ending option parsing
   ## and dropping every option after it.
   while ( @_ ) {
      my $_argument = shift;
      my $_arg = lc $_argument;

      $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
      $_p->{CHUNK_SIZE}  = shift, next if ( $_arg eq 'chunk_size' );
      $_p->{TMP_DIR}     = shift, next if ( $_arg eq 'tmp_dir' );
      $_p->{FREEZE}      = shift, next if ( $_arg eq 'freeze' );
      $_p->{THAW}        = shift, next if ( $_arg eq 'thaw' );

      ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
      ## `Sereal => 0` selects Storable's freeze/thaw instead.
      if ( $_arg eq 'sereal' ) {
         if ( shift eq '0' ) {
            require Storable;
            $_p->{FREEZE} = \&Storable::freeze;
            $_p->{THAW}   = \&Storable::thaw;
         }
         next;
      }

      _croak("Error: ($_argument) invalid module option");
   }

   $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});

   MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
   MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
      unless ($_p->{CHUNK_SIZE} eq 'auto');

   return;
}
87
88###############################################################################
89## ----------------------------------------------------------------------------
90## Init and finish routines.
91##
92###############################################################################
93
## Save MCE options for later mce_flow* calls made from the calling package
## in this process/thread. Accepts either a hash ref or a flat key/value
## list. The options replace (not merge with) any saved by an earlier init.
sub init (@) {
   shift if ( defined $_[0] && $_[0] eq 'MCE::Flow' );

   my $_session = "$$.$_tid." . caller();
   my $_options = ( ref $_[0] eq 'HASH' ) ? shift : { @_ };

   $_params->{$_session} = $_options;
   @_ = ();

   return;
}
105
## Shut down and forget an MCE::Flow session.
##   MCE::Flow->finish;               # session of the calling package
##   MCE::Flow::finish($key, @args);  # explicit key; @args go to shutdown
## The key 'MCE' finishes every known session. A session is torn down only
## when its instance's _init_pid equals this process/thread's "$$.$_tid".
sub finish (@) {
   shift if ( defined $_[0] && $_[0] eq 'MCE::Flow' );

   my $_key = defined $_[0] ? shift : "$$.$_tid." . caller();

   if ( $_key eq 'MCE' ) {
      for my $_each ( keys %{ $_MCE } ) {
         MCE::Flow->finish( $_each, 1 );
      }
   }
   elsif ( $_MCE->{$_key} && $_MCE->{$_key}{_init_pid} eq "$$.$_tid" ) {
      $_MCE->{$_key}->shutdown(@_) if ( $_MCE->{$_key}{_spawned} );

      ## Drop every piece of cached state for this session; the MCE
      ## instance itself is released last.
      for my $_store (
         $_user_tasks, $_prev_c, $_prev_n, $_prev_t, $_prev_w, $_MCE
      ) {
         delete $_store->{$_key};
      }
   }

   @_ = ();

   return;
}
129
130###############################################################################
131## ----------------------------------------------------------------------------
132## Parallel flow with MCE -- file.
133##
134###############################################################################
135
## mce_flow_f: run the code block(s) over a file.
## The first argument after the code ref(s) that is one of the following
## becomes the session's _file input:
##   - a plain string (a path),
##   - a SCALAR ref, or
##   - a GLOB / FileHandle / IO:: handle.
## That argument and everything after it are removed before delegating to
## run(). A path must exist and be a readable plain file, otherwise this
## croaks. It also croaks when no usable file argument is found.
sub run_file (@) {
   shift if ( defined $_[0] && $_[0] eq 'MCE::Flow' );

   my $_pid       = "$$.$_tid." . caller();
   my $_start_pos = ( ref $_[0] eq 'HASH' ) ? 2 : 1;
   my ( $_file, $_pos );

   ## Discard input/sequence options left over from earlier calls.
   my $_p = $_params->{$_pid};
   if ( defined $_p ) {
      delete @{ $_p }{qw( input_data sequence )};
   }
   else {
      $_params->{$_pid} = {};
   }

   for my $_i ( $_start_pos .. $#_ ) {
      my $_kind = ref $_[$_i];
      next unless ( $_kind eq '' || $_kind eq 'SCALAR'
         || $_kind =~ /^(?:GLOB|FileHandle|IO::)/ );

      ( $_file, $_pos ) = ( $_[$_i], $_i );
      last;
   }

   my $_type = ref $_file;

   if ( $_type eq '' && defined $_file && $_file ne '' ) {
      _croak("$_tag: ($_file) does not exist")      unless ( -e $_file );
      _croak("$_tag: ($_file) is not readable")     unless ( -r $_file );
      _croak("$_tag: ($_file) is not a plain file") unless ( -f $_file );
      $_params->{$_pid}{_file} = $_file;
   }
   elsif ( $_type eq 'SCALAR' || $_type =~ /^(?:GLOB|FileHandle|IO::)/ ) {
      $_params->{$_pid}{_file} = $_file;
   }
   else {
      _croak("$_tag: (file) is not specified or valid");
   }

   ## Drop the file argument and anything after it.
   splice @_, $_pos if ( defined $_pos );

   return run(@_);
}
178
179###############################################################################
180## ----------------------------------------------------------------------------
181## Parallel flow with MCE -- sequence.
182##
183###############################################################################
184
## mce_flow_s: run the code block(s) over a numeric sequence.
## Accepted forms after the code ref(s):
##   $begin, $end [, $step, $format ]    (plain scalars or Math::* objects)
##   [ $begin, $end, $step, $format ]
##   { begin => ..., end => ..., step => ..., format => ... }
## Croaks unless a sequence with defined begin and end is found. The sequence
## is staged in the session options and flagged with sequence_run, so run()
## removes it after this run.
sub run_seq (@) {
   shift if ( defined $_[0] && $_[0] eq 'MCE::Flow' );

   my $_pid       = "$$.$_tid." . caller();
   my $_start_pos = ( ref $_[0] eq 'HASH' ) ? 2 : 1;
   my ( $_begin, $_end, $_pos );

   ## Clear input staged by earlier calls; the sequence replaces it.
   my $_p = $_params->{$_pid};
   if ( defined $_p ) {
      delete @{ $_p }{qw( sequence input_data _file )};
   }
   else {
      $_params->{$_pid} = {};
   }

   ARG:
   for my $_i ( $_start_pos .. $#_ ) {
      my $_arg  = $_[$_i];
      my $_kind = ref $_arg;

      if ( $_kind eq '' || $_kind =~ /^Math::/ ) {
         ( $_begin, $_end ) = ( $_arg, $_[ $_i + 1 ] );
         $_params->{$_pid}{sequence} = [
            $_[$_i], $_[ $_i + 1 ], $_[ $_i + 2 ], $_[ $_i + 3 ]
         ];
      }
      elsif ( $_kind eq 'HASH' ) {
         ( $_begin, $_end ) = ( $_arg->{begin}, $_arg->{end} );
         $_params->{$_pid}{sequence} = $_arg;
      }
      elsif ( $_kind eq 'ARRAY' ) {
         ( $_begin, $_end ) = ( $_arg->[0], $_arg->[1] );
         $_params->{$_pid}{sequence} = $_arg;
      }
      else {
         next ARG;
      }

      $_pos = $_i;
      last ARG;
   }

   _croak("$_tag: (sequence) is not specified or valid")
      unless ( exists $_params->{$_pid}{sequence} );
   _croak("$_tag: (begin) is not specified for sequence")
      unless ( defined $_begin );
   _croak("$_tag: (end) is not specified for sequence")
      unless ( defined $_end );

   $_params->{$_pid}{sequence_run} = undef;

   ## Drop the sequence argument(s) and anything after them.
   splice @_, $_pos if ( defined $_pos );

   return run(@_);
}
241
242###############################################################################
243## ----------------------------------------------------------------------------
244## Parallel flow with MCE.
245##
246###############################################################################
247
## Run one or more code blocks as MCE user tasks. Tasks are created in the
## order the code blocks are given.
##
## Arguments, after an optional leading 'MCE::Flow' (method-call form):
##   [ \%options, ] \&code [, \&code ... ] [, input ]
## The input is either one ARRAY/HASH/SCALAR/GLOB/FileHandle/IO:: reference
## or a plain list of items. Without input, any of the following is used
## when present: a file staged by run_file, an input_data option, or a
## saved sequence.
##
## Return value depends on calling context:
##   - list context: the gathered items;
##   - scalar context: their count;
##   - void context: nothing (no gather array is installed).
sub run (@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Flow');

   ## Session key. run_file/run_seq call this sub from package MCE::Flow;
   ## in that case, key on the package one frame further up.
   my $_pkg = caller() eq 'MCE::Flow' ? caller(1) : caller();
   my $_pid = "$$.$_tid.$_pkg";

   ## A leading options hash ref is merged into the session's saved options.
   if (ref $_[0] eq 'HASH') {
      $_params->{$_pid} = {} unless defined $_params->{$_pid};
      for my $_p (keys %{ $_[0] }) {
         $_params->{$_pid}{$_p} = $_[0]->{$_p};
      }

      shift;
   }

   ## -------------------------------------------------------------------------

   ## Collect the leading code refs. Array-valued task_name, use_threads and
   ## max_workers options supply one value per task, indexed by $_pos.
   ## $_init_mce is set, and a new MCE instance is built below, when any code
   ## ref or per-task value differs from the previous call in this session.
   my (@_code, @_name, @_thrs, @_wrks); my $_init_mce = 0; my $_pos = 0;

   while (ref $_[0] eq 'CODE') {
      push @_code, $_[0];

      if (defined (my $_p = $_params->{$_pid})) {
         push @_name, (ref $_p->{task_name} eq 'ARRAY')
            ? $_p->{task_name}->[$_pos] : undef;
         push @_thrs, (ref $_p->{use_threads} eq 'ARRAY')
            ? $_p->{use_threads}->[$_pos] : undef;
         push @_wrks, (ref $_p->{max_workers} eq 'ARRAY')
            ? $_p->{max_workers}->[$_pos] : undef;
      }

      ## Code refs are compared by address (numeric !=).
      $_init_mce = 1 if (
         !defined $_prev_c->{$_pid}[$_pos] ||
         $_prev_c->{$_pid}[$_pos] != $_code[$_pos]
      );

      $_init_mce = 1 if ($_prev_n->{$_pid}[$_pos] ne $_name[$_pos]);
      $_init_mce = 1 if ($_prev_t->{$_pid}[$_pos] ne $_thrs[$_pos]);
      $_init_mce = 1 if ($_prev_w->{$_pid}[$_pos] ne $_wrks[$_pos]);

      ## Remember this call's values for comparison on the next call.
      $_prev_c->{$_pid}[$_pos] = $_code[$_pos];
      $_prev_n->{$_pid}[$_pos] = $_name[$_pos];
      $_prev_t->{$_pid}[$_pos] = $_thrs[$_pos];
      $_prev_w->{$_pid}[$_pos] = $_wrks[$_pos];

      shift; $_pos++;
   }

   ## Fewer code blocks than last time: drop the stale trailing entries and
   ## force a rebuild.
   if (defined $_prev_c->{$_pid}[$_pos]) {
      pop @{ $_prev_c->{$_pid} } for ($_pos .. $#{ $_prev_c->{$_pid } });
      pop @{ $_prev_n->{$_pid} } for ($_pos .. $#{ $_prev_n->{$_pid } });
      pop @{ $_prev_t->{$_pid} } for ($_pos .. $#{ $_prev_t->{$_pid } });
      pop @{ $_prev_w->{$_pid} } for ($_pos .. $#{ $_prev_w->{$_pid } });

      $_init_mce = 1;
   }

   ## Nothing to run without at least one code block.
   return unless (scalar @_code);

   ## -------------------------------------------------------------------------

   ## A single reference argument is the input itself. Otherwise, whatever
   ## is left in @_ is treated as a list of input items (see below).
   my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
   my $_r = ref $_[0];

   if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|GLOB|FileHandle|IO::)/) {
      $_input_data = shift;
   }

   ## A scalar max_workers option overrides the import-time default.
   ## Explicit input (a reference or a list) cancels a saved sequence.
   ## user_func/user_tasks options are dropped because the tasks are built
   ## from the code blocks instead.
   if (defined (my $_p = $_params->{$_pid})) {
      $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
         if (exists $_p->{max_workers} && ref $_p->{max_workers} ne 'ARRAY');

      delete $_p->{sequence}   if (defined $_input_data || scalar @_);
      delete $_p->{user_func}  if (exists $_p->{user_func});
      delete $_p->{user_tasks} if (exists $_p->{user_tasks});
   }

   ## With several tasks, shrink max_workers to the rounded per-task share
   ## plus one. NOTE(review): presumably this is the default for tasks
   ## without their own max_workers value -- confirm in MCE.
   if (@_code > 1 && $_max_workers > 1) {
      $_max_workers = int($_max_workers / @_code + 0.5) + 1;
   }

   ## chunk_size is resolved by MCE's helper from these inputs: the
   ## import-time default, the worker count, the session options, the
   ## input, and the number of list items.
   my $_chunk_size = MCE::_parse_chunk_size(
      $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
      $_input_data, scalar @_
   );

   ## A file staged by run_file is consumed here and takes precedence over
   ## an input_data option.
   if (defined (my $_p = $_params->{$_pid})) {
      if (exists $_p->{_file}) {
         $_input_data = delete $_p->{_file};
      } else {
         $_input_data = $_p->{input_data} if exists $_p->{input_data};
      }
   }

   ## -------------------------------------------------------------------------

   ## NOTE(review): _save_state/_restore_state (defined in MCE) bracket this
   ## run. Presumably this lets a flow started inside another MCE session
   ## restore the outer session's state afterwards -- confirm in MCE.pm.
   MCE::_save_state($_MCE->{$_pid});

   if ($_init_mce) {
      ## Code or per-task settings changed: stop the old instance, then build
      ## a new one from the regenerated task list and the session options.
      $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});

      ## must clear arrays for nested session to work with Perl < v5.14
      _gen_user_tasks($_pid, [@_code], [@_name], [@_thrs], [@_wrks]);

      @_code = @_name = @_thrs = @_wrks = ();

      my %_opts = (
         max_workers => $_max_workers, task_name => $_tag,
         user_tasks  => $_user_tasks->{$_pid},
      );

      ## Forward the remaining session options to MCE->new.
      ##   - Per-task arrays were already applied to the tasks above.
      ##   - chunk_size, input_data and sequence_run are handled by this
      ##     module.
      ##   - Any other key must be a valid MCE constructor field, or this
      ##     croaks.
      if (defined (my $_p = $_params->{$_pid})) {
         local $_;

         for (keys %{ $_p }) {
            next if ($_ eq 'max_workers' && ref $_p->{max_workers} eq 'ARRAY');
            next if ($_ eq 'task_name'   && ref $_p->{task_name}   eq 'ARRAY');
            next if ($_ eq 'use_threads' && ref $_p->{use_threads} eq 'ARRAY');

            next if ($_ eq 'chunk_size');
            next if ($_ eq 'input_data');
            next if ($_ eq 'sequence_run');

            _croak("$_tag: ($_) is not a valid constructor argument")
               unless (exists $MCE::_valid_fields_new{$_});

            $_opts{$_} = $_p->{$_};
         }
      }

      ## Fall back to the import-time tmp_dir/freeze/thaw unless they are
      ## given per session.
      for my $_k (qw/ tmp_dir freeze thaw /) {
         $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
            if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
      }

      $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
   }
   else {
      ## Workers may persist after running, so the existing MCE instance is
      ## updated in place. These options do not require respawning.
      if (defined (my $_p = $_params->{$_pid})) {
         for my $_k (qw(
            RS interval stderr_file stdout_file user_error user_output
            job_delay submit_delay on_post_exit on_post_run user_args
            flush_file flush_stderr flush_stdout gather max_retries
         )) {
            $_MCE->{$_pid}{$_k} = $_p->{$_k} if (exists $_p->{$_k});
         }
      }
   }

   ## -------------------------------------------------------------------------

   ## In list or scalar context, gathered items are collected into @_a. This
   ## replaces any gather option for this run; the key is removed afterwards.
   my @_a; my $_wa = wantarray; $_MCE->{$_pid}{gather} = \@_a if (defined $_wa);

   if (defined $_input_data) {
      ## Reference, file or input_data input wins; any list items still in
      ## @_ are discarded.
      @_ = ();
      $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
      delete $_MCE->{$_pid}{input_data};
   }
   elsif (scalar @_) {
      ## A plain list of items: pass @_ by reference.
      $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
      delete $_MCE->{$_pid}{input_data};
   }
   else {
      ## No input: run the saved sequence, if any. A sequence staged by
      ## run_seq (flagged by sequence_run) is removed after this run. A
      ## sequence option saved by other means is kept for later calls.
      if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
         $_MCE->{$_pid}->run({
             chunk_size => $_chunk_size,
             sequence   => $_params->{$_pid}{sequence}
         }, 0);
         if (exists $_params->{$_pid}{sequence_run}) {
             delete $_params->{$_pid}{sequence_run};
             delete $_params->{$_pid}{sequence};
         }
         delete $_MCE->{$_pid}{sequence};
      }
      else {
         $_MCE->{$_pid}->run({ chunk_size => $_chunk_size }, 0);
      }
   }

   MCE::_restore_state();

   delete $_MCE->{$_pid}{gather} if (defined $_wa);

   ## ?: passes the caller's context to @_a: list => items, scalar => count.
   return ((defined $_wa) ? @_a : ());
}
436
437###############################################################################
438## ----------------------------------------------------------------------------
439## Private methods.
440##
441###############################################################################
442
## Error helper: hand off to MCE::_croak with the current @_. `goto &NAME`
## replaces this sub's call frame, so _croak adds no frame of its own.
sub _croak { goto &MCE::_croak }
447
## Build the user_tasks list for session $_pid. Each code ref becomes one
## task hash, paired with the task_name, use_threads and max_workers values
## at the same index. Those values are undef where a parallel array is
## shorter. The array stored in $_user_tasks->{$_pid} is refilled in place
## (autovivified when missing).
sub _gen_user_tasks {
   my ( $_pid, $_code_ref, $_name_ref, $_thrs_ref, $_wrks_ref ) = @_;

   my @_tasks;

   for my $_idx ( 0 .. $#{ $_code_ref } ) {
      push @_tasks, {
         task_name   => $_name_ref->[$_idx],
         use_threads => $_thrs_ref->[$_idx],
         max_workers => $_wrks_ref->[$_idx],
         user_func   => $_code_ref->[$_idx],
      };
   }

   @{ $_user_tasks->{$_pid} } = @_tasks;

   return;
}
465
4661;
467
468__END__
469
470###############################################################################
471## ----------------------------------------------------------------------------
472## Module usage.
473##
474###############################################################################
475
476=head1 NAME
477
478MCE::Flow - Parallel flow model for building creative applications
479
480=head1 VERSION
481
482This document describes MCE::Flow version 1.876
483
484=head1 DESCRIPTION
485
MCE::Flow is great for writing custom apps that make full use of all
available cores. This module was created to help one harness user_tasks
within MCE.
488
It is trivial to parallelize with mce_stream, as shown below.
490
491 ## Native map function
492 my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;
493
494 ## Same as with MCE::Stream (processing from right to left)
495 @a = mce_stream
496      sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
497
498 ## Pass an array reference to have writes occur simultaneously
499 mce_stream \@a,
500      sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
501
502However, let's have MCE::Flow compute the same in parallel. MCE::Queue
503will be used for data flow among the sub-tasks.
504
505 use MCE::Flow;
506 use MCE::Queue;
507
508This calls for preserving output order.
509
510 sub preserve_order {
511    my %tmp; my $order_id = 1; my $gather_ref = $_[0];
512    @{ $gather_ref } = ();  ## clear the array (optional)
513
514    return sub {
515       my ($data_ref, $chunk_id) = @_;
516       $tmp{$chunk_id} = $data_ref;
517
518       while (1) {
519          last unless exists $tmp{$order_id};
520          push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
521       }
522
523       return;
524    };
525 }
526
527Two queues are needed for data flow between the 3 sub-tasks. Notice task_end
528and how the value from $task_name is used for determining which task has ended.
529
530 my $b = MCE::Queue->new;
531 my $c = MCE::Queue->new;
532
533 sub task_end {
534    my ($mce, $task_id, $task_name) = @_;
535
536    if (defined $mce->{user_tasks}->[$task_id + 1]) {
537       my $n_workers = $mce->{user_tasks}->[$task_id + 1]->{max_workers};
538
539       if ($task_name eq 'a') {
540          $b->enqueue((undef) x $n_workers);
541       }
542       elsif ($task_name eq 'b') {
543          $c->enqueue((undef) x $n_workers);
544       }
545    }
546
547    return;
548 }
549
550Next are the 3 sub-tasks. The first one reads input and begins the flow.
551The 2nd task dequeues, performs the calculation, and enqueues into the next.
552Finally, the last task calls the gather method.
553
Although serialization is done for you automatically, it is done explicitly
here to avoid double serialization. This is the fastest approach for passing
data between sub-tasks, and thus has the least overhead.
557
558 sub task_a {
559    my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
560
561    push @ans, map { $_ * 2 } @{ $chunk_ref };
562    $b->enqueue(MCE->freeze([ \@ans, $chunk_id ]));
563
564    return;
565 }
566
567 sub task_b {
568    my ($mce) = @_;
569
570    while (1) {
571       my @ans; my $chunk = $b->dequeue;
572       last unless defined $chunk;
573
574       $chunk = MCE->thaw($chunk);
575       push @ans, map { $_ * 3 } @{ $chunk->[0] };
576       $c->enqueue(MCE->freeze([ \@ans, $chunk->[1] ]));
577    }
578
579    return;
580 }
581
582 sub task_c {
583    my ($mce) = @_;
584
585    while (1) {
586       my @ans; my $chunk = $c->dequeue;
587       last unless defined $chunk;
588
589       $chunk = MCE->thaw($chunk);
590       push @ans, map { $_ * 4 } @{ $chunk->[0] };
591       MCE->gather(\@ans, $chunk->[1]);
592    }
593
594    return;
595 }
596
In summary, MCE::Flow builds an MCE instance behind the scenes and starts
running it. The task_name (shown), max_workers, and use_threads options can
take an anonymous array for specifying the values uniquely per sub-task.
600
601 my @a;
602
603 mce_flow {
604    task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
605    gather => preserve_order(\@a)
606
607 }, \&task_a, \&task_b, \&task_c, 1..10000;
608
609 print "@a\n";
610
If speed is not a concern and you want to get rid of all the MCE->freeze and
MCE->thaw statements, simply enqueue and dequeue 2 items at a time.
Or better yet, see L<MCE::Step>, introduced in MCE 1.506.
614
615First, task_end must be updated. The number of undef(s) must match the number
616of workers times the dequeue count. Otherwise, the script will stall.
617
618 sub task_end {
619    ...
620       if ($task_name eq 'a') {
621        # $b->enqueue((undef) x $n_workers);
622          $b->enqueue((undef) x ($n_workers * 2));
623       }
624       elsif ($task_name eq 'b') {
625        # $c->enqueue((undef) x $n_workers);
626          $c->enqueue((undef) x ($n_workers * 2));
627       }
628    ...
629 }
630
631Next, the 3 sub-tasks enqueuing and dequeuing 2 elements at a time.
632
633 sub task_a {
634    my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
635
636    push @ans, map { $_ * 2 } @{ $chunk_ref };
637    $b->enqueue(\@ans, $chunk_id);
638
639    return;
640 }
641
642 sub task_b {
643    my ($mce) = @_;
644
645    while (1) {
646       my @ans; my ($chunk_ref, $chunk_id) = $b->dequeue(2);
647       last unless defined $chunk_ref;
648
649       push @ans, map { $_ * 3 } @{ $chunk_ref };
650       $c->enqueue(\@ans, $chunk_id);
651    }
652
653    return;
654 }
655
656 sub task_c {
657    my ($mce) = @_;
658
659    while (1) {
660       my @ans; my ($chunk_ref, $chunk_id) = $c->dequeue(2);
661       last unless defined $chunk_ref;
662
663       push @ans, map { $_ * 4 } @{ $chunk_ref };
664       MCE->gather(\@ans, $chunk_id);
665    }
666
667    return;
668 }
669
670Finally, run as usual.
671
672 my @a;
673
674 mce_flow {
675    task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
676    gather => preserve_order(\@a)
677
678 }, \&task_a, \&task_b, \&task_c, 1..10000;
679
680 print "@a\n";
681
682=head1 SYNOPSIS when CHUNK_SIZE EQUALS 1
683
684Although L<MCE::Loop> may be preferred for running using a single code block,
685the text below also applies to this module, particularly for the first block.
686
687All models in MCE default to 'auto' for chunk_size. The arguments for the block
688are the same as writing a user_func block using the Core API.
689
690Beginning with MCE 1.5, the next input item is placed into the input scalar
691variable $_ when chunk_size equals 1. Otherwise, $_ points to $chunk_ref
692containing many items. Basically, line 2 below may be omitted from your code
693when using $_. One can call MCE->chunk_id to obtain the current chunk id.
694
695 line 1:  user_func => sub {
696 line 2:     my ($mce, $chunk_ref, $chunk_id) = @_;
697 line 3:
698 line 4:     $_ points to $chunk_ref->[0]
699 line 5:        in MCE 1.5 when chunk_size == 1
700 line 6:
701 line 7:     $_ points to $chunk_ref
702 line 8:        in MCE 1.5 when chunk_size  > 1
703 line 9:  }
704
Follow this synopsis when chunk_size equals one. Looping is not required from
inside the first block. Hence, the block is called once per item.
707
708 ## Exports mce_flow, mce_flow_f, and mce_flow_s
709 use MCE::Flow;
710
711 MCE::Flow->init(
712    chunk_size => 1
713 );
714
715 ## Array or array_ref
716 mce_flow sub { do_work($_) }, 1..10000;
717 mce_flow sub { do_work($_) }, \@list;
718
 ## Important; pass an array_ref for deeply nested input data
720 mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
721 mce_flow sub { do_work($_) }, \@deeply_list;
722
723 ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
 ## Workers read directly and do not involve the manager process
725 mce_flow_f sub { chomp; do_work($_) }, "/path/to/file"; # efficient
726
727 ## Involves the manager process, therefore slower
728 mce_flow_f sub { chomp; do_work($_) }, $file_handle;
729 mce_flow_f sub { chomp; do_work($_) }, $io;
730 mce_flow_f sub { chomp; do_work($_) }, \$scalar;
731
732 ## Sequence of numbers (begin, end [, step, format])
733 mce_flow_s sub { do_work($_) }, 1, 10000, 5;
734 mce_flow_s sub { do_work($_) }, [ 1, 10000, 5 ];
735
736 mce_flow_s sub { do_work($_) }, {
737    begin => 1, end => 10000, step => 5, format => undef
738 };
739
740=head1 SYNOPSIS when CHUNK_SIZE is GREATER THAN 1
741
742Follow this synopsis when chunk_size equals 'auto' or greater than 1.
743This means having to loop through the chunk from inside the first block.
744
745 use MCE::Flow;
746
747 MCE::Flow->init(           ## Chunk_size defaults to 'auto' when
748    chunk_size => 'auto'    ## not specified. Therefore, the init
749 );                         ## function may be omitted.
750
751 ## Syntax is shown for mce_flow for demonstration purposes.
752 ## Looping inside the block is the same for mce_flow_f and
753 ## mce_flow_s.
754
755 ## Array or array_ref
756 mce_flow sub { do_work($_) for (@{ $_ }) }, 1..10000;
757 mce_flow sub { do_work($_) for (@{ $_ }) }, \@list;
758
 ## Important; pass an array_ref for deeply nested input data
760 mce_flow sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
761 mce_flow sub { do_work($_) for (@{ $_ }) }, \@deeply_list;
762
763 ## Resembles code using the core MCE API
764 mce_flow sub {
765    my ($mce, $chunk_ref, $chunk_id) = @_;
766
767    for (@{ $chunk_ref }) {
768       do_work($_);
769    }
770
771 }, 1..10000;
772
Chunking reduces the number of IPC calls behind the scenes. Think in terms of
chunks whenever processing a large amount of data. For relatively small data,
choosing 1 for chunk_size is fine.
776
777=head1 OVERRIDING DEFAULTS
778
The following lists options that may be overridden when loading the module.
780
781 use Sereal qw( encode_sereal decode_sereal );
782 use CBOR::XS qw( encode_cbor decode_cbor );
783 use JSON::XS qw( encode_json decode_json );
784
785 use MCE::Flow
786     max_workers => 8,                # Default 'auto'
787     chunk_size => 500,               # Default 'auto'
788     tmp_dir => "/path/to/app/tmp",   # $MCE::Signal::tmp_dir
789     freeze => \&encode_sereal,       # \&Storable::freeze
790     thaw => \&decode_sereal          # \&Storable::thaw
791 ;
792
793From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if available.
794Specify C<< Sereal => 0 >> to use Storable instead.
795
796 use MCE::Flow Sereal => 0;
797
798=head1 CUSTOMIZING MCE
799
800=over 3
801
802=item MCE::Flow->init ( options )
803
804=item MCE::Flow::init { options }
805
806=back
807
808The init function accepts a hash of MCE options. Unlike with MCE::Stream,
809both gather and bounds_only options may be specified when calling init
810(not shown below).
811
812 use MCE::Flow;
813
814 MCE::Flow->init(
815    chunk_size => 1, max_workers => 4,
816
817    user_begin => sub {
818       print "## ", MCE->wid, " started\n";
819    },
820
821    user_end => sub {
822       print "## ", MCE->wid, " completed\n";
823    }
824 );
825
826 my %a = mce_flow sub { MCE->gather($_, $_ * $_) }, 1..100;
827
828 print "\n", "@a{1..100}", "\n";
829
830 -- Output
831
832 ## 3 started
833 ## 2 started
834 ## 4 started
835 ## 1 started
836 ## 2 completed
837 ## 4 completed
838 ## 3 completed
839 ## 1 completed
840
841 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
842 400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
843 1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
844 2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
845 3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
846 5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
847 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
848 10000
849
850Like with MCE::Flow->init above, MCE options may be specified using an
851anonymous hash for the first argument. Notice how task_name, max_workers,
852and use_threads can take an anonymous array for setting uniquely per
853each code block.
854
855Unlike MCE::Stream which processes from right-to-left, MCE::Flow begins
856with the first code block, thus processing from left-to-right.
857
858 use threads;
859 use MCE::Flow;
860
861 my @a = mce_flow {
862    task_name   => [ 'a', 'b', 'c' ],
863    max_workers => [  3,   4,   2, ],
864    use_threads => [  1,   0,   0, ],
865
866    user_end => sub {
867       my ($mce, $task_id, $task_name) = @_;
868       MCE->print("$task_id - $task_name completed\n");
869    },
870
871    task_end => sub {
872       my ($mce, $task_id, $task_name) = @_;
873       MCE->print("$task_id - $task_name ended\n");
874    }
875 },
876 sub { sleep 1; },   ## 3 workers, named a
877 sub { sleep 2; },   ## 4 workers, named b
878 sub { sleep 3; };   ## 2 workers, named c
879
880 -- Output
881
882 0 - a completed
883 0 - a completed
884 0 - a completed
885 0 - a ended
886 1 - b completed
887 1 - b completed
888 1 - b completed
889 1 - b completed
890 1 - b ended
891 2 - c completed
892 2 - c completed
893 2 - c ended
894
895=head1 API DOCUMENTATION
896
897Although input data is optional for MCE::Flow, the following assumes chunk_size
898equals 1 in order to demonstrate all the possibilities for providing input data.
899
900=over 3
901
902=item MCE::Flow->run ( sub { code }, list )
903
904=item mce_flow sub { code }, list
905
906=back
907
908Input data may be defined using a list, an array ref, or a hash ref.
909
910Unlike MCE::Loop, Map, and Grep which take a block as C<{ ... }>, Flow takes a
911C<sub { ... }> or a code reference. The other difference is that the comma is
912needed after the block.
913
914 # $_ contains the item when chunk_size => 1
915
916 mce_flow sub { do_work($_) }, 1..1000;
917 mce_flow sub { do_work($_) }, \@list;
918
 # Important; pass an array_ref for deeply nested input data
920
921 mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
922 mce_flow sub { do_work($_) }, \@deeply_list;
923
924 # Chunking; any chunk_size => 1 or greater
925
926 my %res = mce_flow sub {
927    my ($mce, $chunk_ref, $chunk_id) = @_;
928    my %ret;
929    for my $item (@{ $chunk_ref }) {
930       $ret{$item} = $item * 2;
931    }
932    MCE->gather(%ret);
933 },
934 \@list;
935
936 # Input hash; current API available since 1.828
937
938 my %res = mce_flow sub {
939    my ($mce, $chunk_ref, $chunk_id) = @_;
940    my %ret;
941    for my $key (keys %{ $chunk_ref }) {
942       $ret{$key} = $chunk_ref->{$key} * 2;
943    }
944    MCE->gather(%ret);
945 },
946 \%hash;
947
948 # Unlike MCE::Loop, MCE::Flow doesn't need input to run
949
950 mce_flow { max_workers => 4 }, sub {
951    MCE->say( MCE->wid );
952 };
953
954 # ... and can run multiple tasks
955
956 mce_flow {
957    max_workers => [  1,   3  ],
958    task_name   => [ 'p', 'c' ]
959 },
960 sub {
961    # 1 producer
962    MCE->say( "producer: ", MCE->wid );
963 },
964 sub {
965    # 3 consumers
966    MCE->say( "consumer: ", MCE->wid );
967 };
968
969 # Here, options are specified via init
970
971 MCE::Flow->init(
972    max_workers => [  1,   3  ],
973    task_name   => [ 'p', 'c' ]
974 );
975
976 mce_flow \&producer, \&consumers;
977
978=over 3
979
980=item MCE::Flow->run_file ( sub { code }, file )
981
982=item mce_flow_f sub { code }, file
983
984=back
985
986The fastest of these is the /path/to/file. Workers communicate the next offset
987position among themselves with zero interaction by the manager process.
988
989C<IO::All> { File, Pipe, STDIO } is supported since MCE 1.845.
990
991 # $_ contains the line when chunk_size => 1
992
993 mce_flow_f sub { $_ }, "/path/to/file";  # faster
994 mce_flow_f sub { $_ }, $file_handle;
995 mce_flow_f sub { $_ }, $io;              # IO::All
996 mce_flow_f sub { $_ }, \$scalar;
997
998 # chunking, any chunk_size => 1 or greater
999
1000 my %res = mce_flow_f sub {
1001    my ($mce, $chunk_ref, $chunk_id) = @_;
1002    my $buf = '';
1003    for my $line (@{ $chunk_ref }) {
1004       $buf .= $line;
1005    }
1006    MCE->gather($chunk_id, $buf);
1007 },
1008 "/path/to/file";
1009
1010=over 3
1011
1012=item MCE::Flow->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
1013
1014=item mce_flow_s sub { code }, $beg, $end [, $step, $fmt ]
1015
1016=back
1017
1018Sequence may be defined as a list, an array reference, or a hash reference.
1019The functions require both begin and end values to run. Step and format are
1020optional. The format is passed to sprintf (% may be omitted below).
1021
1022 my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
1023
1024 # $_ contains the sequence number when chunk_size => 1
1025
1026 mce_flow_s sub { $_ }, $beg, $end, $step, $fmt;
1027 mce_flow_s sub { $_ }, [ $beg, $end, $step, $fmt ];
1028
1029 mce_flow_s sub { $_ }, {
1030    begin => $beg, end => $end,
1031    step => $step, format => $fmt
1032 };
1033
1034 # chunking, any chunk_size => 1 or greater
1035
1036 my %res = mce_flow_s sub {
1037    my ($mce, $chunk_ref, $chunk_id) = @_;
1038    my $buf = '';
1039    for my $seq (@{ $chunk_ref }) {
1040       $buf .= "$seq\n";
1041    }
1042    MCE->gather($chunk_id, $buf);
1043 },
1044 [ $beg, $end ];
1045
The sequence engine can compute only the 'begin' and 'end' items for each
chunk, not the items in between (hence the name bounds_only). The bounds_only
option applies to sequences only and has no effect when chunk_size equals 1.
1049
The example below runs in 0.006s. It takes 0.827s without the bounds_only
option, because all the items in between are computed, creating a very large
array. Specify bounds_only => 1 when the boundaries are all you need for
looping inside the block; e.g. Monte Carlo simulations.
1054
1055Time was measured using 1 worker to emphasize the difference.
1056
1057 use MCE::Flow;
1058
1059 MCE::Flow->init(
1060    max_workers => 1, chunk_size => 1_250_000,
1061    bounds_only => 1
1062 );
1063
 # Typically, the input scalar $_ contains the sequence number
 # when chunk_size => 1. Here, chunk_size is greater than 1, so
 # $_ points to $chunk_ref, which holds only the boundaries
 # because the bounds_only option is set.
1067
1068 mce_flow_s sub {
1069    my ($mce, $chunk_ref, $chunk_id) = @_;
1070
1071    # $chunk_ref contains 2 items, not 1_250_000
1072    # my ( $begin, $end ) = ( $_->[0], $_->[1] );
1073
1074    my $begin = $chunk_ref->[0];
1075    my $end   = $chunk_ref->[1];
1076
1077    # for my $seq ( $begin .. $end ) {
1078    #    ...
1079    # }
1080
1081    MCE->printf("%7d .. %8d\n", $begin, $end);
1082 },
1083 [ 1, 10_000_000 ];
1084
1085 -- Output
1086
1087       1 ..  1250000
1088 1250001 ..  2500000
1089 2500001 ..  3750000
1090 3750001 ..  5000000
1091 5000001 ..  6250000
1092 6250001 ..  7500000
1093 7500001 ..  8750000
1094 8750001 .. 10000000
1095
1096=over 3
1097
1098=item MCE::Flow->run ( { input_data => iterator }, sub { code } )
1099
1100=item mce_flow { input_data => iterator }, sub { code }
1101
1102=back
1103
An iterator reference may be specified for input_data in the options hash, as
shown above. The only other way is to specify input_data via MCE::Flow->init,
as shown below. Either way prevents MCE::Flow from configuring the iterator
reference as another user task, which will not work.
1107
1108Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
1109
1110 MCE::Flow->init(
1111    input_data => iterator
1112 );
1113
1114 mce_flow sub { $_ };
1115
1116=head1 GATHERING DATA
1117
Unlike MCE::Map, where gathering and output order are handled for you
automatically, MCE::Flow requires calling the gather method to send results
back to the manager process.
1120
1121 use MCE::Flow chunk_size => 1;
1122
1123 ## Output order is not guaranteed.
1124 my @a1 = mce_flow sub { MCE->gather($_ * 2) }, 1..100;
1125 print "@a1\n\n";
1126
1127 ## Outputs to a hash instead (key, value).
1128 my %h1 = mce_flow sub { MCE->gather($_, $_ * 2) }, 1..100;
1129 print "@h1{1..100}\n\n";
1130
1131 ## This does the same thing due to chunk_id starting at one.
1132 my %h2 = mce_flow sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
1133 print "@h2{1..100}\n\n";
1134
The gather method may be called multiple times within the block, unlike
return, which would leave the block. Therefore, think of gather as yielding
results immediately to the manager process without actually leaving the block.
1138
1139 use MCE::Flow chunk_size => 1, max_workers => 3;
1140
1141 my @hosts = qw(
1142    hosta hostb hostc hostd hoste
1143 );
1144
1145 my %h3 = mce_flow sub {
1146    my ($output, $error, $status); my $host = $_;
1147
1148    ## Do something with $host;
1149    $output = "Worker ". MCE->wid .": Hello from $host";
1150
1151    if (MCE->chunk_id % 3 == 0) {
1152       ## Simulating an error condition
1153       local $? = 1; $status = $?;
1154       $error = "Error from $host"
1155    }
1156    else {
1157       $status = 0;
1158    }
1159
1160    ## Ensure unique keys (key, value) when gathering to
1161    ## a hash.
1162    MCE->gather("$host.out", $output);
1163    MCE->gather("$host.err", $error) if (defined $error);
1164    MCE->gather("$host.sta", $status);
1165
1166 }, @hosts;
1167
1168 foreach my $host (@hosts) {
1169    print $h3{"$host.out"}, "\n";
1170    print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
1171    print "Exit status: ", $h3{"$host.sta"}, "\n\n";
1172 }
1173
1174 -- Output
1175
1176 Worker 3: Hello from hosta
1177 Exit status: 0
1178
1179 Worker 2: Hello from hostb
1180 Exit status: 0
1181
1182 Worker 1: Hello from hostc
1183 Error from hostc
1184 Exit status: 1
1185
1186 Worker 3: Hello from hostd
1187 Exit status: 0
1188
1189 Worker 2: Hello from hoste
1190 Exit status: 0
1191
The following uses an anonymous array containing 3 elements when gathering
data. Serialization is handled automatically behind the scenes.
1194
1195 my %h3 = mce_flow sub {
1196    ...
1197
1198    MCE->gather($host, [$output, $error, $status]);
1199
1200 }, @hosts;
1201
1202 foreach my $host (@hosts) {
1203    print $h3{$host}->[0], "\n";
1204    print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
1205    print "Exit status: ", $h3{$host}->[2], "\n\n";
1206 }
1207
1208Although MCE::Map comes to mind, one may want additional control when
1209gathering data such as retaining output order.
1210
1211 use MCE::Flow;
1212
1213 sub preserve_order {
1214    my %tmp; my $order_id = 1; my $gather_ref = $_[0];
1215
1216    return sub {
1217       $tmp{ (shift) } = \@_;
1218
1219       while (1) {
1220          last unless exists $tmp{$order_id};
1221          push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
1222       }
1223
1224       return;
1225    };
1226 }
1227
 ## Workers persist for the most part after running. This is not always
 ## the case, though, and depends on Perl. Pass a reference to a subroutine
 ## if workers must persist; e.g. mce_flow { ... }, \&foo, 1..100000.
1231
1232 MCE::Flow->init(
1233    chunk_size => 'auto', max_workers => 'auto'
1234 );
1235
1236 for (1..2) {
1237    my @m2;
1238
1239    mce_flow {
1240       gather => preserve_order(\@m2)
1241    },
1242    sub {
1243       my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
1244
1245       ## Compute the entire chunk data at once.
1246       push @a, map { $_ * 2 } @{ $chunk_ref };
1247
1248       ## Afterwards, invoke the gather feature, which
1249       ## will direct the data to the callback function.
1250       MCE->gather(MCE->chunk_id, @a);
1251
1252    }, 1..100000;
1253
1254    print scalar @m2, "\n";
1255 }
1256
1257 MCE::Flow->finish;
1258
All 6 models support 'auto' for chunk_size, unlike the Core API. Think of the
models as the basis for providing JIT for MCE. They create the instance, tune
max_workers, and tune chunk_size automatically, regardless of the hardware.
1262
1263The following does the same thing using the Core API. Workers persist after
1264running.
1265
1266 use MCE;
1267
1268 sub preserve_order {
1269    ...
1270 }
1271
1272 my $mce = MCE->new(
1273    max_workers => 'auto', chunk_size => 8000,
1274
1275    user_func => sub {
1276       my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
1277
1278       ## Compute the entire chunk data at once.
1279       push @a, map { $_ * 2 } @{ $chunk_ref };
1280
1281       ## Afterwards, invoke the gather feature, which
1282       ## will direct the data to the callback function.
1283       MCE->gather(MCE->chunk_id, @a);
1284    }
1285 );
1286
1287 for (1..2) {
1288    my @m2;
1289
1290    $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
1291
1292    print scalar @m2, "\n";
1293 }
1294
1295 $mce->shutdown;
1296
1297=head1 MANUAL SHUTDOWN
1298
1299=over 3
1300
1301=item MCE::Flow->finish
1302
1303=item MCE::Flow::finish
1304
1305=back
1306
1307Workers remain persistent as much as possible after running. Shutdown occurs
1308automatically when the script terminates. Call finish when workers are no
1309longer needed.
1310
1311 use MCE::Flow;
1312
1313 MCE::Flow->init(
1314    chunk_size => 20, max_workers => 'auto'
1315 );
1316
1317 mce_flow sub { ... }, 1..100;
1318
1319 MCE::Flow->finish;
1320
1321=head1 INDEX
1322
1323L<MCE|MCE>, L<MCE::Core>
1324
1325=head1 AUTHOR
1326
1327Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
1328
1329=cut
1330
1331