1###############################################################################
2## ----------------------------------------------------------------------------
3## MCE model for building parallel loops.
4##
5###############################################################################
6
7package MCE::Loop;
8
9use strict;
10use warnings;
11
12no warnings qw( threads recursion uninitialized );
13
14our $VERSION = '1.876';
15
16## no critic (BuiltinFunctions::ProhibitStringyEval)
17## no critic (Subroutines::ProhibitSubroutinePrototypes)
18## no critic (TestingAndDebugging::ProhibitNoStrict)
19
20use Scalar::Util qw( looks_like_number );
21use MCE;
22
## Treat MCE as a trusted package for Carp (see Carp's @CARP_NOT) when it
## chooses which caller to blame for an error.
our @CARP_NOT = qw( MCE );

## Thread id of the running interpreter; stays 0 unless threads.pm is loaded.
my $_tid = 0;
$_tid = threads->tid() if $INC{'threads.pm'};

## Perl calls CLONE in each newly created thread; refresh the cached id there.
sub CLONE {
   return unless $INC{'threads.pm'};
   $_tid = threads->tid();
   return;
}
30
31###############################################################################
32## ----------------------------------------------------------------------------
33## Import routine.
34##
35###############################################################################
36
## Module-level state. $_def is keyed by the importing package; the other
## hashes are keyed by "$$.$_tid.<caller package>".
my $_MCE    = {};            # MCE instance per caller
my $_def    = {};            # import-time defaults per package
my $_params = {};            # options recorded by init
my $_prev_c = {};            # code ref the current instance was built for
my $_tag    = 'MCE::Loop';   # task name and error-message prefix
38
sub import {
   my $_class = shift;
   my $_pkg   = caller;

   ## Per-package defaults, possibly overridden by the module arguments.
   my $_p = $_def->{$_pkg} = {
      MAX_WORKERS => 'auto',
      CHUNK_SIZE  => 'auto',
   };

   ## Export mce_loop, mce_loop_f, and mce_loop_s into the caller.
   {
      no strict 'refs'; no warnings 'redefine';

      *{ $_pkg.'::mce_loop'   } = \&run;
      *{ $_pkg.'::mce_loop_f' } = \&run_file;
      *{ $_pkg.'::mce_loop_s' } = \&run_seq;
   }

   ## Lower-cased option name => key in the defaults hash.
   my %_key_for = (
      max_workers => 'MAX_WORKERS',
      chunk_size  => 'CHUNK_SIZE',
      tmp_dir     => 'TMP_DIR',
      freeze      => 'FREEZE',
      thaw        => 'THAW',
   );

   ## Process module arguments as (name => value) pairs; the loop stops
   ## at the first false name.
   while ( my $_argument = shift ) {
      my $_arg = lc $_argument;

      if ( exists $_key_for{$_arg} ) {
         $_p->{ $_key_for{$_arg} } = shift;
      }
      elsif ( $_arg eq 'sereal' ) {
         ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
         ## Sereal => 0 switches serialization to Storable.
         if ( shift eq '0' ) {
            require Storable;
            $_p->{FREEZE} = \&Storable::freeze;
            $_p->{THAW}   = \&Storable::thaw;
         }
      }
      else {
         _croak("Error: ($_argument) invalid module option");
      }
   }

   $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});

   MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
   MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
      if ( $_p->{CHUNK_SIZE} ne 'auto' );

   return;
}
85
86###############################################################################
87## ----------------------------------------------------------------------------
88## Init and finish routines.
89##
90###############################################################################
91
## Record MCE options for the calling package in this process and thread.
## They are read by subsequent run, run_file, and run_seq calls.
##
## Accepts either a hash ref or a flat list of key/value pairs. A hash ref
## is shallow-copied: the stored options hash is later modified (keys such
## as _file, sequence, sequence_run are added; input_data, user_func,
## user_tasks are deleted), and those changes must not leak into the
## caller's own hash.
sub init (@) {

   ## Accept both MCE::Loop->init(...) and MCE::Loop::init(...).
   shift if (defined $_[0] && $_[0] eq 'MCE::Loop');
   my $_pkg = "$$.$_tid.".caller();

   $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? { %{ $_[0] } } : { @_ };

   @_ = ();

   return;
}
103
sub finish (@) {

   ## Accept both MCE::Loop->finish and MCE::Loop::finish.
   shift if ( defined $_[0] && $_[0] eq 'MCE::Loop' );

   ## Default to the caller's key; an explicit key (or 'MCE') may be given.
   my $_pkg = defined $_[0] ? shift : "$$.$_tid." . caller();
   my $_mce = $_MCE->{$_pkg};

   if ( $_pkg eq 'MCE' ) {
      ## Finish every instance this module is tracking.
      for my $_key ( keys %{ $_MCE } ) {
         MCE::Loop->finish($_key, 1);
      }
   }
   elsif ( $_mce && $_mce->{_init_pid} eq "$$.$_tid" ) {
      ## Act only when the instance's _init_pid matches this process/thread.
      $_mce->shutdown(@_) if $_mce->{_spawned};

      delete $_prev_c->{$_pkg};
      delete $_MCE->{$_pkg};
   }

   @_ = ();

   return;
}
123
124###############################################################################
125## ----------------------------------------------------------------------------
126## Parallel loop with MCE -- file.
127##
128###############################################################################
129
sub run_file (&@) {

   shift if ( defined $_[0] && $_[0] eq 'MCE::Loop' );

   my ( $_code, $_file ) = ( shift, shift );
   my $_pid = "$$.$_tid." . caller();

   ## Per-caller options; created when init was never called. A file run
   ## discards any input_data or sequence left over from earlier calls.
   my $_p = $_params->{$_pid};

   if ( defined $_p ) {
      delete $_p->{input_data};
      delete $_p->{sequence};
   }
   else {
      $_p = $_params->{$_pid} = {};
   }

   my $_kind = ref $_file;

   if ( defined $_file && $_kind eq '' && $_file ne '' ) {
      ## A plain path must name an existing, readable, plain file.
      -e $_file or _croak("$_tag: ($_file) does not exist");
      -r $_file or _croak("$_tag: ($_file) is not readable");
      -f $_file or _croak("$_tag: ($_file) is not a plain file");
   }
   elsif ( $_kind ne 'SCALAR' && $_kind !~ /^(?:GLOB|FileHandle|IO::)/ ) {
      _croak("$_tag: (file) is not specified or valid");
   }

   ## run() takes _file from the stored options as its input source.
   $_p->{_file} = $_file;

   @_ = ();

   return run($_code);
}
162
163###############################################################################
164## ----------------------------------------------------------------------------
165## Parallel loop with MCE -- sequence.
166##
167###############################################################################
168
sub run_seq (&@) {

   shift if ( defined $_[0] && $_[0] eq 'MCE::Loop' );

   my $_code = shift;
   my $_pid  = "$$.$_tid." . caller();

   ## Per-caller options; created when init was never called. A sequence
   ## run discards any input_data or file left over from earlier calls.
   my $_p = $_params->{$_pid};

   if ( defined $_p ) {
      delete $_p->{input_data};
      delete $_p->{_file};
   }
   else {
      $_p = $_params->{$_pid} = {};
   }

   ## The sequence may be a hash ref, an array ref, or a flat list of
   ## (begin, end [, step, format]) whose first item may be a Math::* object.
   my $_arg  = $_[0];
   my $_kind = ref $_arg;
   my ( $_begin, $_end );

   if ( $_kind eq 'HASH' ) {
      ( $_begin, $_end ) = ( $_arg->{begin}, $_arg->{end} );
      $_p->{sequence} = $_arg;
   }
   elsif ( $_kind eq 'ARRAY' ) {
      ( $_begin, $_end ) = ( $_arg->[0], $_arg->[1] );
      $_p->{sequence} = $_arg;
   }
   elsif ( $_kind eq '' || $_kind =~ /^Math::/ ) {
      ( $_begin, $_end ) = ( $_[0], $_[1] );
      $_p->{sequence} = [ @_ ];
   }
   else {
      _croak("$_tag: (sequence) is not specified or valid");
   }

   defined $_begin or _croak("$_tag: (begin) is not specified for sequence");
   defined $_end   or _croak("$_tag: (end) is not specified for sequence");

   ## Marks the sequence as one-shot: run() removes it after running.
   $_p->{sequence_run} = undef;

   @_ = ();

   return run($_code);
}
213
214###############################################################################
215## ----------------------------------------------------------------------------
216## Parallel loop with MCE.
217##
218###############################################################################
219
## Parallel loop entry point, exported as mce_loop (also reached from
## run_file and run_seq). Runs $_code on the MCE instance kept for the
## caller. In list or scalar context it returns the items passed to
## MCE->gather; scalar context yields their count.
sub run (&@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Loop');

   my $_code = shift;

   ## When called from run_file/run_seq the immediate caller is this
   ## module, so look one frame further up for the user's package.
   my $_pkg  = caller() eq 'MCE::Loop' ? caller(1) : caller();
   my $_pid  = "$$.$_tid.$_pkg";

   ## Start from the package's import-time max_workers default.
   my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
   my $_r = ref $_[0];

   ## A lone reference of a supported kind is the input itself; otherwise
   ## any remaining arguments are processed as a list of items below.
   if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|CODE|GLOB|FileHandle|IO::)/) {
      $_input_data = shift;
   }

   if (defined (my $_p = $_params->{$_pid})) {
      ## An init-level max_workers overrides the import-time default.
      $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
         if (exists $_p->{max_workers});

      ## Explicit input discards a recorded sequence. user_func/user_tasks
      ## are dropped because the block itself becomes user_func below.
      delete $_p->{sequence}   if (defined $_input_data || scalar @_);
      delete $_p->{user_func}  if (exists $_p->{user_func});
      delete $_p->{user_tasks} if (exists $_p->{user_tasks});
   }

   my $_chunk_size = MCE::_parse_chunk_size(
      $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
      $_input_data, scalar @_
   );

   ## A file registered by run_file becomes the input and is consumed;
   ## otherwise an init-level input_data is used.
   ## NOTE(review): either one replaces a reference argument taken above,
   ## and a defined $_input_data also wins over list arguments below --
   ## confirm this precedence is intended.
   if (defined (my $_p = $_params->{$_pid})) {
      if (exists $_p->{_file}) {
         $_input_data = delete $_p->{_file};
      } else {
         $_input_data = $_p->{input_data} if exists $_p->{input_data};
      }
   }

   ## -------------------------------------------------------------------------

   ## NOTE(review): presumably saves MCE's current-instance state, restored
   ## by MCE::_restore_state below -- see MCE.
   MCE::_save_state($_MCE->{$_pid});

   ## Reuse the cached instance while the same code ref is passed (refs are
   ## compared numerically, i.e. by address). Otherwise shut down the old
   ## instance and build a new one. Constructor options are therefore only
   ## re-read from init when the code ref changes; chunk_size is passed on
   ## every run below.
   if (!defined $_prev_c->{$_pid} || $_prev_c->{$_pid} != $_code) {
      $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
      $_prev_c->{$_pid} = $_code;

      my %_opts = (
         max_workers => $_max_workers, task_name => $_tag,
         user_func => $_code,
      );

      ## Forward init options to MCE->new, skipping keys handled per run;
      ## any other key must be a valid MCE constructor field.
      if (defined (my $_p = $_params->{$_pid})) {
         for my $_k (keys %{ $_p }) {
            next if ($_k eq 'sequence_run');
            next if ($_k eq 'input_data');
            next if ($_k eq 'chunk_size');

            _croak("$_tag: ($_k) is not a valid constructor argument")
               unless (exists $MCE::_valid_fields_new{$_k});

            $_opts{$_k} = $_p->{$_k};
         }
      }

      ## Fall back to import-time tmp_dir/freeze/thaw unless init set them.
      for my $_k (qw/ tmp_dir freeze thaw /) {
         $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
            if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
      }

      $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
   }

   ## -------------------------------------------------------------------------

   ## In non-void context, route MCE->gather results into @_a.
   my @_a; my $_wa = wantarray; $_MCE->{$_pid}{gather} = \@_a if (defined $_wa);

   if (defined $_input_data) {
      @_ = ();
      $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
      ## Drop the instance's input_data entry (presumably set by process).
      delete $_MCE->{$_pid}{input_data};
   }
   elsif (scalar @_) {
      ## The remaining arguments are the list of items.
      $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
      delete $_MCE->{$_pid}{input_data};
   }
   else {
      ## No input: run a recorded sequence, if any. A run_seq sequence
      ## (marked by sequence_run) is one-shot and removed after the run;
      ## a sequence without that marker is kept.
      ## NOTE(review): the trailing 0 presumably disables auto-shutdown so
      ## workers stay persistent -- confirm against MCE::run.
      if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
         $_MCE->{$_pid}->run({
             chunk_size => $_chunk_size,
             sequence   => $_params->{$_pid}{sequence}
         }, 0);
         if (exists $_params->{$_pid}{sequence_run}) {
             delete $_params->{$_pid}{sequence_run};
             delete $_params->{$_pid}{sequence};
         }
         delete $_MCE->{$_pid}{sequence};
      }
   }

   MCE::_restore_state();

   delete $_MCE->{$_pid}{gather} if (defined $_wa);

   return ((defined $_wa) ? @_a : ());
}
324
325###############################################################################
326## ----------------------------------------------------------------------------
327## Private methods.
328##
329###############################################################################
330
## Report an error through MCE::_croak. goto &NAME replaces this frame, so
## the callee receives the same @_ and this wrapper is absent from the stack.
sub _croak {

   goto &MCE::_croak;
}

1;
337
338__END__
339
340###############################################################################
341## ----------------------------------------------------------------------------
342## Module usage.
343##
344###############################################################################
345
346=head1 NAME
347
348MCE::Loop - MCE model for building parallel loops
349
350=head1 VERSION
351
352This document describes MCE::Loop version 1.876
353
354=head1 DESCRIPTION
355
356This module provides a parallel loop implementation through Many-Core Engine.
MCE::Loop is not MCE::Map; rather, it is an easy way to spin up an MCE
instance with user_func pointing to your code block. If you want something
similar to map, then see L<MCE::Map>.
360
361 ## Construction when chunking is not desired
362
363 use MCE::Loop;
364
365 MCE::Loop->init(
366    max_workers => 5, chunk_size => 1
367 );
368
369 mce_loop {
370    my ($mce, $chunk_ref, $chunk_id) = @_;
371    MCE->say("$chunk_id: $_");
372 } 40 .. 48;
373
374 -- Output
375
376 3: 42
377 1: 40
378 2: 41
379 4: 43
380 5: 44
381 6: 45
382 7: 46
383 8: 47
384 9: 48
385
386 ## Construction for 'auto' or greater than 1
387
388 use MCE::Loop;
389
390 MCE::Loop->init(
391    max_workers => 5, chunk_size => 'auto'
392 );
393
394 mce_loop {
395    my ($mce, $chunk_ref, $chunk_id) = @_;
396    for (@{ $chunk_ref }) {
397       MCE->say("$chunk_id: $_");
398    }
399 } 40 .. 48;
400
401 -- Output
402
403 1: 40
404 2: 42
405 1: 41
406 4: 46
407 2: 43
408 5: 48
409 3: 44
410 4: 47
411 3: 45
412
413=head1 SYNOPSIS when CHUNK_SIZE EQUALS 1
414
415All models in MCE default to 'auto' for chunk_size. The arguments for the block
416are the same as writing a user_func block using the Core API.
417
418Beginning with MCE 1.5, the next input item is placed into the input scalar
419variable $_ when chunk_size equals 1. Otherwise, $_ points to $chunk_ref
420containing many items. Basically, line 2 below may be omitted from your code
421when using $_. One can call MCE->chunk_id to obtain the current chunk id.
422
423 line 1:  user_func => sub {
424 line 2:     my ($mce, $chunk_ref, $chunk_id) = @_;
425 line 3:
426 line 4:     $_ points to $chunk_ref->[0]
427 line 5:        in MCE 1.5 when chunk_size == 1
428 line 6:
429 line 7:     $_ points to $chunk_ref
430 line 8:        in MCE 1.5 when chunk_size  > 1
431 line 9:  }
432
Follow this synopsis when chunk_size equals one. Looping is not required
inside the block, because the block is called once per item.
435
436 ## Exports mce_loop, mce_loop_f, and mce_loop_s
437 use MCE::Loop;
438
439 MCE::Loop->init(
440    chunk_size => 1
441 );
442
443 ## Array or array_ref
444 mce_loop { do_work($_) } 1..10000;
445 mce_loop { do_work($_) } \@list;
446
 ## Important: pass an array_ref for deeply nested input data
448 mce_loop { do_work($_) } [ [ 0, 1 ], [ 0, 2 ], ... ];
449 mce_loop { do_work($_) } \@deeply_list;
450
451 ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
 ## Workers read directly and do not involve the manager process
453 mce_loop_f { chomp; do_work($_) } "/path/to/file"; # efficient
454
455 ## Involves the manager process, therefore slower
456 mce_loop_f { chomp; do_work($_) } $file_handle;
457 mce_loop_f { chomp; do_work($_) } $io;
458 mce_loop_f { chomp; do_work($_) } \$scalar;
459
460 ## Sequence of numbers (begin, end [, step, format])
461 mce_loop_s { do_work($_) } 1, 10000, 5;
462 mce_loop_s { do_work($_) } [ 1, 10000, 5 ];
463
464 mce_loop_s { do_work($_) } {
465    begin => 1, end => 10000, step => 5, format => undef
466 };
467
468=head1 SYNOPSIS when CHUNK_SIZE is GREATER THAN 1
469
470Follow this synopsis when chunk_size equals 'auto' or greater than 1.
471This means having to loop through the chunk from inside the block.
472
473 use MCE::Loop;
474
475 MCE::Loop->init(           ## Chunk_size defaults to 'auto' when
476    chunk_size => 'auto'    ## not specified. Therefore, the init
477 );                         ## function may be omitted.
478
479 ## Syntax is shown for mce_loop for demonstration purposes.
480 ## Looping inside the block is the same for mce_loop_f and
481 ## mce_loop_s.
482
483 ## Array or array_ref
484 mce_loop { do_work($_) for (@{ $_ }) } 1..10000;
485 mce_loop { do_work($_) for (@{ $_ }) } \@list;
486
 ## Important: pass an array_ref for deeply nested input data
488 mce_loop { do_work($_) for (@{ $_ }) } [ [ 0, 1 ], [ 0, 2 ], ... ];
489 mce_loop { do_work($_) for (@{ $_ }) } \@deeply_list;
490
491 ## Resembles code using the core MCE API
492 mce_loop {
493    my ($mce, $chunk_ref, $chunk_id) = @_;
494
495    for (@{ $chunk_ref }) {
496       do_work($_);
497    }
498
499 } 1..10000;
500
Chunking reduces the number of IPC calls behind the scenes. Think in terms of
chunks whenever processing a large amount of data. For relatively small data,
choosing 1 for chunk_size is fine.
504
505=head1 OVERRIDING DEFAULTS
506
The following lists the options that may be overridden when loading the module.
508
509 use Sereal qw( encode_sereal decode_sereal );
510 use CBOR::XS qw( encode_cbor decode_cbor );
511 use JSON::XS qw( encode_json decode_json );
512
513 use MCE::Loop
514     max_workers => 4,                # Default 'auto'
515     chunk_size => 100,               # Default 'auto'
516     tmp_dir => "/path/to/app/tmp",   # $MCE::Signal::tmp_dir
517     freeze => \&encode_sereal,       # \&Storable::freeze
518     thaw => \&decode_sereal          # \&Storable::thaw
519 ;
520
521From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if available.
522Specify C<< Sereal => 0 >> to use Storable instead.
523
524 use MCE::Loop Sereal => 0;
525
526=head1 CUSTOMIZING MCE
527
528=over 3
529
530=item MCE::Loop->init ( options )
531
532=item MCE::Loop::init { options }
533
534=back
535
536The init function accepts a hash of MCE options.
537
538 use MCE::Loop;
539
540 MCE::Loop->init(
541    chunk_size => 1, max_workers => 4,
542
543    user_begin => sub {
544       print "## ", MCE->wid, " started\n";
545    },
546
547    user_end => sub {
548       print "## ", MCE->wid, " completed\n";
549    }
550 );
551
552 my %a = mce_loop { MCE->gather($_, $_ * $_) } 1..100;
553
554 print "\n", "@a{1..100}", "\n";
555
556 -- Output
557
558 ## 3 started
559 ## 1 started
560 ## 2 started
561 ## 4 started
562 ## 1 completed
563 ## 2 completed
564 ## 3 completed
565 ## 4 completed
566
567 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
568 400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
569 1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
570 2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
571 3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
572 5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
573 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
574 10000
575
576=head1 API DOCUMENTATION
577
578The following assumes chunk_size equals 1 in order to demonstrate all the
579possibilities for providing input data.
580
581=over 3
582
583=item MCE::Loop->run ( sub { code }, list )
584
585=item mce_loop { code } list
586
587=back
588
589Input data may be defined using a list, an array ref, or a hash ref.
590
591 # $_ contains the item when chunk_size => 1
592
593 mce_loop { do_work($_) } 1..1000;
594 mce_loop { do_work($_) } \@list;
595
 # Important: pass an array_ref for deeply nested input data
597
598 mce_loop { do_work($_) } [ [ 0, 1 ], [ 0, 2 ], ... ];
599 mce_loop { do_work($_) } \@deeply_list;
600
601 # Chunking; any chunk_size => 1 or greater
602
603 my %res = mce_loop {
604    my ($mce, $chunk_ref, $chunk_id) = @_;
605    my %ret;
606    for my $item (@{ $chunk_ref }) {
607       $ret{$item} = $item * 2;
608    }
609    MCE->gather(%ret);
610 }
611 \@list;
612
613 # Input hash; current API available since 1.828
614
615 my %res = mce_loop {
616    my ($mce, $chunk_ref, $chunk_id) = @_;
617    my %ret;
618    for my $key (keys %{ $chunk_ref }) {
619       $ret{$key} = $chunk_ref->{$key} * 2;
620    }
621    MCE->gather(%ret);
622 }
623 \%hash;
624
625=over 3
626
627=item MCE::Loop->run_file ( sub { code }, file )
628
629=item mce_loop_f { code } file
630
631=back
632
633The fastest of these is the /path/to/file. Workers communicate the next offset
634position among themselves with zero interaction by the manager process.
635
636C<IO::All> { File, Pipe, STDIO } is supported since MCE 1.845.
637
638 # $_ contains the line when chunk_size => 1
639
640 mce_loop_f { $_ } "/path/to/file";  # faster
641 mce_loop_f { $_ } $file_handle;
642 mce_loop_f { $_ } $io;              # IO::All
643 mce_loop_f { $_ } \$scalar;
644
645 # chunking, any chunk_size => 1 or greater
646
647 my %res = mce_loop_f {
648    my ($mce, $chunk_ref, $chunk_id) = @_;
649    my $buf = '';
650    for my $line (@{ $chunk_ref }) {
651       $buf .= $line;
652    }
653    MCE->gather($chunk_id, $buf);
654 }
655 "/path/to/file";
656
657=over 3
658
659=item MCE::Loop->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
660
661=item mce_loop_s { code } $beg, $end [, $step, $fmt ]
662
663=back
664
665Sequence may be defined as a list, an array reference, or a hash reference.
666The functions require both begin and end values to run. Step and format are
667optional. The format is passed to sprintf (% may be omitted below).
668
669 my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
670
671 # $_ contains the sequence number when chunk_size => 1
672
673 mce_loop_s { $_ } $beg, $end, $step, $fmt;
674 mce_loop_s { $_ } [ $beg, $end, $step, $fmt ];
675
676 mce_loop_s { $_ } {
677    begin => $beg, end => $end,
678    step => $step, format => $fmt
679 };
680
681 # chunking, any chunk_size => 1 or greater
682
683 my %res = mce_loop_s {
684    my ($mce, $chunk_ref, $chunk_id) = @_;
685    my $buf = '';
686    for my $seq (@{ $chunk_ref }) {
687       $buf .= "$seq\n";
688    }
689    MCE->gather($chunk_id, $buf);
690 }
691 [ $beg, $end ];
692
With the bounds_only option, the sequence engine computes only the 'begin'
and 'end' items for each chunk, not the items in between (hence boundaries
only). This option applies to sequences only and has no effect when
chunk_size equals 1.
696
The time to run is 0.006s below. This becomes 0.827s without the bounds_only
option due to computing all items in between, thus creating a very large
array. Basically, specify bounds_only => 1 when the boundaries are all you
need for looping inside the block; e.g. Monte Carlo simulations.
701
702Time was measured using 1 worker to emphasize the difference.
703
704 use MCE::Loop;
705
706 MCE::Loop->init(
707    max_workers => 1, chunk_size => 1_250_000,
708    bounds_only => 1
709 );
710
711 # Typically, the input scalar $_ contains the sequence number
712 # when chunk_size => 1, unless the bounds_only option is set
713 # which is the case here. Thus, $_ points to $chunk_ref.
714
715 mce_loop_s {
716    my ($mce, $chunk_ref, $chunk_id) = @_;
717
718    # $chunk_ref contains 2 items, not 1_250_000
719    # my ( $begin, $end ) = ( $_->[0], $_->[1] );
720
721    my $begin = $chunk_ref->[0];
722    my $end   = $chunk_ref->[1];
723
724    # for my $seq ( $begin .. $end ) {
725    #    ...
726    # }
727
728    MCE->printf("%7d .. %8d\n", $begin, $end);
729 }
730 [ 1, 10_000_000 ];
731
732 -- Output
733
734       1 ..  1250000
735 1250001 ..  2500000
736 2500001 ..  3750000
737 3750001 ..  5000000
738 5000001 ..  6250000
739 6250001 ..  7500000
740 7500001 ..  8750000
741 8750001 .. 10000000
742
743=over 3
744
745=item MCE::Loop->run ( sub { code }, iterator )
746
747=item mce_loop { code } iterator
748
749=back
750
751An iterator reference may be specified for input_data. Iterators are described
752under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
753
754 mce_loop { $_ } make_iterator(10, 30, 2);
755
756=head1 GATHERING DATA
757
758Unlike MCE::Map where gather and output order are done for you automatically,
759the gather method is used to have results sent back to the manager process.
760
761 use MCE::Loop chunk_size => 1;
762
763 ## Output order is not guaranteed.
764 my @a1 = mce_loop { MCE->gather($_ * 2) } 1..100;
765 print "@a1\n\n";
766
767 ## Outputs to a hash instead (key, value).
768 my %h1 = mce_loop { MCE->gather($_, $_ * 2) } 1..100;
769 print "@h1{1..100}\n\n";
770
771 ## This does the same thing due to chunk_id starting at one.
772 my %h2 = mce_loop { MCE->gather(MCE->chunk_id, $_ * 2) } 1..100;
773 print "@h2{1..100}\n\n";
774
The gather method may be called multiple times within the block, unlike
return, which would leave the block. Therefore, think of gather as yielding
results immediately to the manager process without actually leaving the block.
778
779 use MCE::Loop chunk_size => 1, max_workers => 3;
780
781 my @hosts = qw(
782    hosta hostb hostc hostd hoste
783 );
784
785 my %h3 = mce_loop {
786    my ($output, $error, $status); my $host = $_;
787
788    ## Do something with $host;
789    $output = "Worker ". MCE->wid .": Hello from $host";
790
791    if (MCE->chunk_id % 3 == 0) {
792       ## Simulating an error condition
793       local $? = 1; $status = $?;
794       $error = "Error from $host"
795    }
796    else {
797       $status = 0;
798    }
799
800    ## Ensure unique keys (key, value) when gathering to
801    ## a hash.
802    MCE->gather("$host.out", $output);
803    MCE->gather("$host.err", $error) if (defined $error);
804    MCE->gather("$host.sta", $status);
805
806 } @hosts;
807
808 foreach my $host (@hosts) {
809    print $h3{"$host.out"}, "\n";
810    print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
811    print "Exit status: ", $h3{"$host.sta"}, "\n\n";
812 }
813
814 -- Output
815
816 Worker 2: Hello from hosta
817 Exit status: 0
818
819 Worker 1: Hello from hostb
820 Exit status: 0
821
822 Worker 3: Hello from hostc
823 Error from hostc
824 Exit status: 1
825
826 Worker 2: Hello from hostd
827 Exit status: 0
828
829 Worker 1: Hello from hoste
830 Exit status: 0
831
The following uses an anonymous array containing 3 elements when gathering
data. Serialization is automatic behind the scenes.
834
835 my %h3 = mce_loop {
836    ...
837
838    MCE->gather($host, [$output, $error, $status]);
839
840 } @hosts;
841
842 foreach my $host (@hosts) {
843    print $h3{$host}->[0], "\n";
844    print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
845    print "Exit status: ", $h3{$host}->[2], "\n\n";
846 }
847
848Although MCE::Map comes to mind, one may want additional control when
849gathering data such as retaining output order.
850
851 use MCE::Loop;
852
853 sub preserve_order {
854    my %tmp; my $order_id = 1; my $gather_ref = $_[0];
855
856    return sub {
857       $tmp{ (shift) } = \@_;
858
859       while (1) {
860          last unless exists $tmp{$order_id};
861          push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
862       }
863
864       return;
865    };
866 }
867
868 my @m2;
869
870 MCE::Loop->init(
871    chunk_size => 'auto', max_workers => 'auto',
872    gather => preserve_order(\@m2)
873 );
874
875 mce_loop {
876    my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
877
878    ## Compute the entire chunk data at once.
879    push @a, map { $_ * 2 } @{ $chunk_ref };
880
881    ## Afterwards, invoke the gather feature, which
882    ## will direct the data to the callback function.
883    MCE->gather(MCE->chunk_id, @a);
884
885 } 1..100000;
886
887 MCE::Loop->finish;
888
889 print scalar @m2, "\n";
890
891All 6 models support 'auto' for chunk_size unlike the Core API. Think of the
892models as the basis for providing JIT for MCE. They create the instance, tune
893max_workers, and tune chunk_size automatically regardless of the hardware.
894
895The following does the same thing using the Core API.
896
897 use MCE;
898
899 sub preserve_order {
900    ...
901 }
902
903 my $mce = MCE->new(
904    max_workers => 'auto', chunk_size => 8000,
905
906    user_func => sub {
907       my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
908
909       ## Compute the entire chunk data at once.
910       push @a, map { $_ * 2 } @{ $chunk_ref };
911
912       ## Afterwards, invoke the gather feature, which
913       ## will direct the data to the callback function.
914       MCE->gather(MCE->chunk_id, @a);
915    }
916 );
917
918 my @m2;
919
920 $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
921 $mce->shutdown;
922
923 print scalar @m2, "\n";
924
925=head1 MANUAL SHUTDOWN
926
927=over 3
928
929=item MCE::Loop->finish
930
931=item MCE::Loop::finish
932
933=back
934
935Workers remain persistent as much as possible after running. Shutdown occurs
936automatically when the script terminates. Call finish when workers are no
937longer needed.
938
939 use MCE::Loop;
940
941 MCE::Loop->init(
942    chunk_size => 20, max_workers => 'auto'
943 );
944
945 mce_loop { ... } 1..100;
946
947 MCE::Loop->finish;
948
949=head1 INDEX
950
951L<MCE|MCE>, L<MCE::Core>
952
953=head1 AUTHOR
954
955Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
956
957=cut
958
959