1###############################################################################
2## ----------------------------------------------------------------------------
3## Parallel map model similar to the native map function.
4##
5###############################################################################
6
7package MCE::Map;
8
9use strict;
10use warnings;
11
12no warnings qw( threads recursion uninitialized );
13
14our $VERSION = '1.876';
15
16## no critic (BuiltinFunctions::ProhibitStringyEval)
17## no critic (Subroutines::ProhibitSubroutinePrototypes)
18## no critic (TestingAndDebugging::ProhibitNoStrict)
19
20use Scalar::Util qw( looks_like_number weaken );
21use MCE;
22
23our @CARP_NOT = qw( MCE );
24
## Cached thread id used when building the "pid.tid.package" keys below.
## Stays 0 unless the threads module has been loaded.
my $_tid = 0;
$_tid = threads->tid() if $INC{'threads.pm'};

## Perl calls CLONE in each newly created thread; refresh the cached
## thread id there so that it reflects the new thread.
sub CLONE {
   if ($INC{'threads.pm'}) {
      $_tid = threads->tid();
   }
}
30
31###############################################################################
32## ----------------------------------------------------------------------------
33## Import routine.
34##
35###############################################################################
36
## Module-level state shared by the routines in this file:
##   $_MCE    : MCE instances, keyed by "pid.tid.package".
##   $_def    : import-time defaults (MAX_WORKERS, CHUNK_SIZE, ...), keyed
##              by the importing package.
##   $_params : options stored by init plus the input staged by run_file
##              and run_seq, keyed by "pid.tid.package".
##   $_prev_c : code reference the cached MCE instance was built for,
##              keyed by "pid.tid.package".
##   $_tag    : module name used as error-message prefix and task name.
my ($_MCE, $_def, $_params, $_prev_c, $_tag) = ({}, {}, {}, {}, 'MCE::Map');
38
## Installs mce_map, mce_map_f and mce_map_s into the importing package
## and records that package's module options.
##
## Recognized options (case-insensitive): max_workers, chunk_size,
## tmp_dir, freeze, thaw and sereal. Passing sereal => 0 selects
## Storable for freeze/thaw. Any other option name croaks.

sub import {
   my $_class = shift;
   my $_pkg   = caller;

   ## Each importing package starts out with the built-in defaults.
   my $_p = $_def->{$_pkg} = {
      MAX_WORKERS => 'auto',
      CHUNK_SIZE  => 'auto',
   };

   ## Import functions.
   {
      no strict 'refs'; no warnings 'redefine';

      my @_exports = (
         [ mce_map_f => \&run_file ],
         [ mce_map_s => \&run_seq  ],
         [ mce_map   => \&run      ],
      );

      for my $_export (@_exports) {
         my ($_name, $_sub) = @{ $_export };
         *{ $_pkg.'::'.$_name } = $_sub;
      }
   }

   ## Options that simply store the value following the option name.
   my %_slot_for = (
      max_workers => 'MAX_WORKERS',
      chunk_size  => 'CHUNK_SIZE',
      tmp_dir     => 'TMP_DIR',
      freeze      => 'FREEZE',
      thaw        => 'THAW',
   );

   ## Process module arguments.
   while ( my $_argument = shift ) {
      my $_arg = lc $_argument;

      if ( exists $_slot_for{$_arg} ) {
         $_p->{ $_slot_for{$_arg} } = shift;
      }
      elsif ( $_arg eq 'sereal' ) {
         ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
         ## An explicit 0 switches serialization to Storable.
         my $_flag = shift;

         if ( $_flag eq '0' ) {
            require Storable;
            $_p->{FREEZE} = \&Storable::freeze;
            $_p->{THAW}   = \&Storable::thaw;
         }
      }
      else {
         _croak("Error: ($_argument) invalid module option");
      }
   }

   $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});

   MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);

   if ( $_p->{CHUNK_SIZE} ne 'auto' ) {
      MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag);
   }

   return;
}
85
86###############################################################################
87## ----------------------------------------------------------------------------
88## Gather callback for storing by chunk_id => chunk_ref into a hash.
89##
90###############################################################################
91
## Number of chunks gathered so far and the gathered chunks themselves,
## stored as chunk_id => array reference.
my ($_total_chunks, %_tmp);

## Gather callback: files the data reference under its chunk id and
## bumps the chunk counter. Returns nothing.
sub _gather {

   my $_chunk_id = shift;

   $_tmp{$_chunk_id} = shift;
   ++$_total_chunks;

   return;
}
103
104###############################################################################
105## ----------------------------------------------------------------------------
106## Init and finish routines.
107##
108###############################################################################
109
## Stores MCE options for the calling package. Accepts either a hash
## reference or a flat key/value list and may be called as a function
## or as a class method. Croaks when input_data is a hash reference.

sub init (@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Map');

   ## Options are kept per process, thread and calling package.
   my $_pid  = join '.', $$, $_tid, scalar caller();
   my $_opts = (ref $_[0] eq 'HASH') ? shift : { @_ };

   $_params->{$_pid} = $_opts;

   if ( ref $_opts->{input_data} eq 'HASH' ) {
      _croak("$_tag: (HASH) not allowed as input by this MCE model");
   }

   @_ = ();

   return;
}
124
## Shuts down the MCE instance registered under the given key, or under
## the calling package's "pid.tid.package" key when none is given, and
## clears the gather state. The special key 'MCE' finishes every
## registered instance. Remaining arguments are passed on to shutdown.

sub finish (@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Map');

   my $_pkg = defined $_[0]
      ? shift
      : join('.', $$, $_tid, scalar caller());

   if ( $_pkg eq 'MCE' ) {
      foreach my $_key ( keys %{ $_MCE } ) {
         MCE::Map->finish($_key, 1);
      }
   }
   elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
      ## Only act on an instance whose _init_pid matches this pid.tid.
      if ( $_MCE->{$_pkg}{_spawned} ) {
         $_MCE->{$_pkg}->shutdown(@_);
      }

      undef $_total_chunks;
      undef %_tmp;

      delete $_prev_c->{$_pkg};
      delete $_MCE->{$_pkg};
   }

   @_ = ();

   return;
}
145
146###############################################################################
147## ----------------------------------------------------------------------------
148## Parallel map with MCE -- file.
149##
150###############################################################################
151
## Parallel map over a file. The file may be a path, a scalar reference,
## or a GLOB / FileHandle / IO::* handle. The input is staged in the
## caller's parameters and the work is handed to run.
##
## Croaks when a path does not exist, is not readable or is not a plain
## file, and when the file argument is missing or of another type.

sub run_file (&@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Map');

   my ($_code, $_file) = (shift, shift);
   my $_pid = join '.', $$, $_tid, scalar caller();

   ## Drop competing input sources from any previously stored options.
   if (defined $_params->{$_pid}) {
      delete @{ $_params->{$_pid} }{ qw( input_data sequence ) };
   }
   else {
      $_params->{$_pid} = {};
   }

   my $_type = ref $_file;

   if (defined $_file && $_type eq '' && $_file ne '') {
      ## A non-empty plain string is treated as a path.
      -e $_file or _croak("$_tag: ($_file) does not exist");
      -r $_file or _croak("$_tag: ($_file) is not readable");
      -f $_file or _croak("$_tag: ($_file) is not a plain file");

      $_params->{$_pid}{_file} = $_file;
   }
   elsif ($_type eq 'SCALAR' || $_type =~ /^(?:GLOB|FileHandle|IO::)/) {
      $_params->{$_pid}{_file} = $_file;
   }
   else {
      _croak("$_tag: (file) is not specified or valid");
   }

   @_ = ();

   return run($_code);
}
184
185###############################################################################
186## ----------------------------------------------------------------------------
187## Parallel map with MCE -- sequence.
188##
189###############################################################################
190
## Parallel map over a numeric sequence. The sequence may be given as a
## hash reference (begin, end, ...), an array reference, or a flat list
## starting with begin and end. It is staged in the caller's parameters
## and the work is handed to run.
##
## Croaks when the sequence is of another type or lacks begin or end.

sub run_seq (&@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Map');

   my $_code = shift;
   my $_pid  = join '.', $$, $_tid, scalar caller();

   ## Drop competing input sources from any previously stored options.
   if (defined $_params->{$_pid}) {
      delete @{ $_params->{$_pid} }{ qw( input_data _file ) };
   }
   else {
      $_params->{$_pid} = {};
   }

   my $_type = ref $_[0];
   my ($_begin, $_end);

   if ($_type eq 'HASH') {
      ($_begin, $_end) = @{ $_[0] }{ qw( begin end ) };
      $_params->{$_pid}{sequence} = $_[0];
   }
   elsif ($_type eq 'ARRAY') {
      ($_begin, $_end) = @{ $_[0] }[ 0, 1 ];
      $_params->{$_pid}{sequence} = $_[0];
   }
   elsif ($_type eq '' || $_type =~ /^Math::/) {
      ($_begin, $_end) = @_[ 0, 1 ];
      $_params->{$_pid}{sequence} = [ @_ ];
   }
   else {
      _croak("$_tag: (sequence) is not specified or valid");
   }

   defined $_begin
      or _croak("$_tag: (begin) is not specified for sequence");
   defined $_end
      or _croak("$_tag: (end) is not specified for sequence");

   ## Marks the sequence as staged by this call; run removes it again
   ## after running.
   $_params->{$_pid}{sequence_run} = undef;

   @_ = ();

   return run($_code);
}
235
236###############################################################################
237## ----------------------------------------------------------------------------
238## Parallel map with MCE.
239##
240###############################################################################
241
## Runs the code block in parallel over the input, in the manner of the
## native map function.
##
## Arguments
##   $_code : code block called once per element, with the element in $_.
##   @_     : a list of items, or exactly one reference (ARRAY, SCALAR,
##            CODE, GLOB, FileHandle, IO::*) taken as the input source.
##            A HASH reference croaks. A file staged by run_file, or
##            input_data stored by init, takes precedence when present.
##            A sequence staged by run_seq or init is used only when no
##            input is passed.
##
## Returns
##   list context   : all results, flattened in chunk-id order.
##   scalar context : the number of elements the code block returned.
##   void context   : nothing.
##
## The MCE instance is cached under the "pid.tid.package" key and is
## rebuilt only when a different code reference is passed.

sub run (&@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Map');

   ## Reset the gather state left behind by a previous run.
   my $_code = shift;  $_total_chunks = 0; undef %_tmp;

   ## When entered via run_file or run_seq, the user's package is one
   ## frame further up.
   my $_pkg  = caller() eq 'MCE::Map' ? caller(1) : caller();
   my $_pid  = "$$.$_tid.$_pkg";

   my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
   my $_r = ref $_[0];

   ## A single reference argument is the input source itself.
   if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|CODE|GLOB|FileHandle|IO::)/) {
      _croak("$_tag: (HASH) not allowed as input by this MCE model")
         if $_r eq 'HASH';
      $_input_data = shift;
   }

   if (defined (my $_p = $_params->{$_pid})) {
      $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
         if (exists $_p->{max_workers});

      ## Explicit input wins over a stored sequence. The remaining options
      ## are set by this module and may not be overridden through init.
      delete $_p->{sequence}    if (defined $_input_data || scalar @_);
      delete $_p->{user_func}   if (exists $_p->{user_func});
      delete $_p->{user_tasks}  if (exists $_p->{user_tasks});
      delete $_p->{use_slurpio} if (exists $_p->{use_slurpio});
      delete $_p->{bounds_only} if (exists $_p->{bounds_only});
      delete $_p->{gather}      if (exists $_p->{gather});
   }

   my $_chunk_size = MCE::_parse_chunk_size(
      $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
      $_input_data, scalar @_
   );

   ## A file staged by run_file is consumed here (one-shot); input_data
   ## stored by init is used otherwise.
   if (defined (my $_p = $_params->{$_pid})) {
      if (exists $_p->{_file}) {
         $_input_data = delete $_p->{_file};
      } else {
         $_input_data = $_p->{input_data} if exists $_p->{input_data};
      }
   }

   ## -------------------------------------------------------------------------

   MCE::_save_state($_MCE->{$_pid});

   ## (Re)build the MCE instance when there is none yet for this key or
   ## when the code reference differs from the one it was built for.
   if (!defined $_prev_c->{$_pid} || $_prev_c->{$_pid} != $_code) {
      $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
      $_prev_c->{$_pid} = $_code;

      my %_opts = (
         max_workers => $_max_workers, task_name => $_tag,
         user_func => sub {

            my ($_mce, $_chunk_ref, $_chunk_id) = @_;

            ## The caller's context is passed along through user_args.
            my $_wantarray = $_mce->{user_args}[0];

            if ($_wantarray) {
               ## List context: collect the results and gather them
               ## under the chunk id so they can be reassembled in order.
               my @_a;

               if (ref $_chunk_ref eq 'SCALAR') {
                  ## The chunk is raw text; read it record by record
                  ## through an in-memory file handle.
                  local $/ = $_mce->{RS} if defined $_mce->{RS};
                  open my $_MEM_FH, '<', $_chunk_ref;
                  binmode $_MEM_FH, ':raw';
                  while (<$_MEM_FH>) { push @_a, &{ $_code }; }
                  close   $_MEM_FH;
                  weaken  $_MEM_FH;
               }
               else {
                  if (ref $_chunk_ref) {
                     push @_a, map { &{ $_code } } @{ $_chunk_ref };
                  } else {
                     push @_a, map { &{ $_code } } $_chunk_ref;
                  }
               }

               MCE->gather($_chunk_id, \@_a);
            }
            else {
               ## Scalar or void context: only the number of elements
               ## returned by the code block matters.
               my $_cnt = 0;

               if (ref $_chunk_ref eq 'SCALAR') {
                  local $/ = $_mce->{RS} if defined $_mce->{RS};
                  open my $_MEM_FH, '<', $_chunk_ref;
                  binmode $_MEM_FH, ':raw';

                  ## Count what the block returns, calling it in list
                  ## context exactly as the map-based branch below does.
                  ## Counting input records instead would make the result
                  ## depend on how the chunk was delivered.
                  while (<$_MEM_FH>) { $_cnt += () = &{ $_code }; }

                  close   $_MEM_FH;
                  weaken  $_MEM_FH;
               }
               else {
                  if (ref $_chunk_ref) {
                     $_cnt += map { &{ $_code } } @{ $_chunk_ref };
                  } else {
                     $_cnt += map { &{ $_code } } $_chunk_ref;
                  }
               }

               ## Nothing needs to be reported back in void context.
               MCE->gather($_cnt) if defined $_wantarray;
            }
         },
      );

      ## Pass stored init options to the constructor, except those that
      ## are handled per run.
      if (defined (my $_p = $_params->{$_pid})) {
         for my $_k (keys %{ $_p }) {
            next if ($_k eq 'sequence_run');
            next if ($_k eq 'input_data');
            next if ($_k eq 'chunk_size');

            _croak("$_tag: ($_k) is not a valid constructor argument")
               unless (exists $MCE::_valid_fields_new{$_k});

            $_opts{$_k} = $_p->{$_k};
         }
      }

      ## Import-time options apply unless init supplied the same option.
      for my $_k (qw/ tmp_dir freeze thaw /) {
         $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
            if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
      }

      $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
   }

   ## -------------------------------------------------------------------------

   my $_cnt = 0; my $_wantarray = wantarray;

   ## use_slurpio is enabled for chunk sizes above MCE::MAX_RECS_SIZE.
   $_MCE->{$_pid}{use_slurpio} = ($_chunk_size > &MCE::MAX_RECS_SIZE) ? 1 : 0;
   $_MCE->{$_pid}{user_args}   = [ $_wantarray ];

   ## List context stores chunks by id; otherwise the counts are summed.
   $_MCE->{$_pid}{gather} = $_wantarray
      ? \&_gather : sub { $_cnt += $_[0]; return; };

   if (defined $_input_data) {
      @_ = ();
      $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
      delete $_MCE->{$_pid}{input_data};
   }
   elsif (scalar @_) {
      $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
      delete $_MCE->{$_pid}{input_data};
   }
   else {
      if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
         $_MCE->{$_pid}->run({
             chunk_size => $_chunk_size,
             sequence   => $_params->{$_pid}{sequence}
         }, 0);

         ## A sequence staged by run_seq is one-shot; a sequence stored
         ## through init is left in place.
         if (exists $_params->{$_pid}{sequence_run}) {
             delete $_params->{$_pid}{sequence_run};
             delete $_params->{$_pid}{sequence};
         }
         delete $_MCE->{$_pid}{sequence};
      }
   }

   MCE::_restore_state();

   if ($_wantarray) {
      ## Flatten the gathered chunks in chunk-id order, emptying %_tmp.
      return map { @{ $_ } } delete @_tmp{ 1 .. $_total_chunks };
   }
   elsif (defined $_wantarray) {
      return $_cnt;
   }

   return;
}
409
410###############################################################################
411## ----------------------------------------------------------------------------
412## Private methods.
413##
414###############################################################################
415
## Raises an error by delegating to MCE::_croak. The "goto &sub" form
## passes the current @_ along and replaces this subroutine's frame, so
## this wrapper does not appear in the call stack seen by MCE::_croak.

sub _croak {

   goto &MCE::_croak;
}
420
4211;
422
423__END__
424
425###############################################################################
426## ----------------------------------------------------------------------------
427## Module usage.
428##
429###############################################################################
430
431=head1 NAME
432
433MCE::Map - Parallel map model similar to the native map function
434
435=head1 VERSION
436
437This document describes MCE::Map version 1.876
438
439=head1 SYNOPSIS
440
441 ## Exports mce_map, mce_map_f, and mce_map_s
442 use MCE::Map;
443
444 ## Array or array_ref
445 my @a = mce_map { $_ * $_ } 1..10000;
446 my @b = mce_map { $_ * $_ } \@list;
447
 ## Important; pass an array_ref for deeply nested input data
449 my @c = mce_map { $_->[1] *= 2; $_ } [ [ 0, 1 ], [ 0, 2 ], ... ];
450 my @d = mce_map { $_->[1] *= 2; $_ } \@deeply_list;
451
452 ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
 ## Workers read directly and do not involve the manager process
454 my @e = mce_map_f { chomp; $_ } "/path/to/file"; # efficient
455
456 ## Involves the manager process, therefore slower
457 my @f = mce_map_f { chomp; $_ } $file_handle;
458 my @g = mce_map_f { chomp; $_ } $io;
459 my @h = mce_map_f { chomp; $_ } \$scalar;
460
461 ## Sequence of numbers (begin, end [, step, format])
462 my @i = mce_map_s { $_ * $_ } 1, 10000, 5;
463 my @j = mce_map_s { $_ * $_ } [ 1, 10000, 5 ];
464
465 my @k = mce_map_s { $_ * $_ } {
466    begin => 1, end => 10000, step => 5, format => undef
467 };
468
469=head1 DESCRIPTION
470
471This module provides a parallel map implementation via Many-Core Engine.
472MCE incurs a small overhead due to passing of data. A fast code block will
473run faster natively. However, the overhead will likely diminish as the
474complexity increases for the code.
475
476 my @m1 =     map { $_ * $_ } 1..1000000;               ## 0.127 secs
477 my @m2 = mce_map { $_ * $_ } 1..1000000;               ## 0.304 secs
478
Chunking, enabled by default, greatly reduces the overhead behind the scenes.
480The time for mce_map below also includes the time for data exchanges between
481the manager and worker processes. More parallelization will be seen when the
482code incurs additional CPU time.
483
484 sub calc {
485    sqrt $_ * sqrt $_ / 1.3 * 1.5 / 3.2 * 1.07
486 }
487
488 my @m1 =     map { calc } 1..1000000;                  ## 0.367 secs
489 my @m2 = mce_map { calc } 1..1000000;                  ## 0.365 secs
490
491Even faster is mce_map_s; useful when input data is a range of numbers.
492Workers generate sequences mathematically among themselves without any
493interaction from the manager process. Two arguments are required for
494mce_map_s (begin, end). Step defaults to 1 if begin is smaller than end,
495otherwise -1.
496
497 my @m3 = mce_map_s { calc } 1, 1000000;                ## 0.270 secs
498
499Although this document is about MCE::Map, the L<MCE::Stream> module can write
500results immediately without waiting for all chunks to complete. This is made
501possible by passing the reference to an array (in this case @m4 and @m5).
502
503 use MCE::Stream;
504
505 sub calc {
506    sqrt $_ * sqrt $_ / 1.3 * 1.5 / 3.2 * 1.07
507 }
508
509 my @m4; mce_stream \@m4, sub { calc }, 1..1000000;
510
511    ## Completes in 0.272 secs. This is amazing considering the
512    ## overhead for passing data between the manager and workers.
513
514 my @m5; mce_stream_s \@m5, sub { calc }, 1, 1000000;
515
516    ## Completed in 0.176 secs. Like with mce_map_s, specifying a
517    ## sequence specification turns out to be faster due to lesser
518    ## overhead for the manager process.
519
520=head1 OVERRIDING DEFAULTS
521
The following lists the options which may be overridden when loading the module.
523
524 use Sereal qw( encode_sereal decode_sereal );
525 use CBOR::XS qw( encode_cbor decode_cbor );
526 use JSON::XS qw( encode_json decode_json );
527
528 use MCE::Map
529     max_workers => 4,                # Default 'auto'
530     chunk_size => 100,               # Default 'auto'
531     tmp_dir => "/path/to/app/tmp",   # $MCE::Signal::tmp_dir
532     freeze => \&encode_sereal,       # \&Storable::freeze
533     thaw => \&decode_sereal          # \&Storable::thaw
534 ;
535
536From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if available.
537Specify C<< Sereal => 0 >> to use Storable instead.
538
539 use MCE::Map Sereal => 0;
540
541=head1 CUSTOMIZING MCE
542
543=over 3
544
545=item MCE::Map->init ( options )
546
547=item MCE::Map::init { options }
548
549=back
550
551The init function accepts a hash of MCE options. The gather option, if
552specified, is ignored due to being used internally by the module.
553
554 use MCE::Map;
555
556 MCE::Map->init(
557    chunk_size => 1, max_workers => 4,
558
559    user_begin => sub {
560       print "## ", MCE->wid, " started\n";
561    },
562
563    user_end => sub {
564       print "## ", MCE->wid, " completed\n";
565    }
566 );
567
568 my @a = mce_map { $_ * $_ } 1..100;
569
570 print "\n", "@a", "\n";
571
572 -- Output
573
574 ## 2 started
575 ## 1 started
576 ## 3 started
577 ## 4 started
578 ## 1 completed
579 ## 4 completed
580 ## 2 completed
581 ## 3 completed
582
583 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
584 400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
585 1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
586 2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
587 3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
588 5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
589 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
590 10000
591
592=head1 API DOCUMENTATION
593
594=over 3
595
596=item MCE::Map->run ( sub { code }, list )
597
598=item mce_map { code } list
599
600=back
601
602Input data may be defined using a list or an array reference. Unlike MCE::Loop,
603Flow, and Step, specifying a hash reference as input data isn't allowed.
604
605 ## Array or array_ref
606 my @a = mce_map { $_ * 2 } 1..1000;
607 my @b = mce_map { $_ * 2 } \@list;
608
 ## Important; pass an array_ref for deeply nested input data
610 my @c = mce_map { $_->[1] *= 2; $_ } [ [ 0, 1 ], [ 0, 2 ], ... ];
611 my @d = mce_map { $_->[1] *= 2; $_ } \@deeply_list;
612
613 ## Not supported
614 my @z = mce_map { ... } \%hash;
615
616=over 3
617
618=item MCE::Map->run_file ( sub { code }, file )
619
620=item mce_map_f { code } file
621
622=back
623
624The fastest of these is the /path/to/file. Workers communicate the next offset
625position among themselves with zero interaction by the manager process.
626
627C<IO::All> { File, Pipe, STDIO } is supported since MCE 1.845.
628
629 my @c = mce_map_f { chomp; $_ . "\r\n" } "/path/to/file";  # faster
630 my @d = mce_map_f { chomp; $_ . "\r\n" } $file_handle;
631 my @e = mce_map_f { chomp; $_ . "\r\n" } $io;              # IO::All
632 my @f = mce_map_f { chomp; $_ . "\r\n" } \$scalar;
633
634=over 3
635
636=item MCE::Map->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
637
638=item mce_map_s { code } $beg, $end [, $step, $fmt ]
639
640=back
641
642Sequence may be defined as a list, an array reference, or a hash reference.
643The functions require both begin and end values to run. Step and format are
644optional. The format is passed to sprintf (% may be omitted below).
645
646 my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
647
648 my @f = mce_map_s { $_ } $beg, $end, $step, $fmt;
649 my @g = mce_map_s { $_ } [ $beg, $end, $step, $fmt ];
650
651 my @h = mce_map_s { $_ } {
652    begin => $beg, end => $end,
653    step => $step, format => $fmt
654 };
655
656=over 3
657
658=item MCE::Map->run ( sub { code }, iterator )
659
660=item mce_map { code } iterator
661
662=back
663
664An iterator reference may be specified for input_data. Iterators are described
665under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
666
667 my @a = mce_map { $_ * 2 } make_iterator(10, 30, 2);
668
669=head1 MANUAL SHUTDOWN
670
671=over 3
672
673=item MCE::Map->finish
674
675=item MCE::Map::finish
676
677=back
678
679Workers remain persistent as much as possible after running. Shutdown occurs
680automatically when the script terminates. Call finish when workers are no
681longer needed.
682
683 use MCE::Map;
684
685 MCE::Map->init(
686    chunk_size => 20, max_workers => 'auto'
687 );
688
689 my @a = mce_map { ... } 1..100;
690
691 MCE::Map->finish;
692
693=head1 INDEX
694
695L<MCE|MCE>, L<MCE::Core>
696
697=head1 AUTHOR
698
699Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
700
701=cut
702
703