###############################################################################
## ----------------------------------------------------------------------------
## Parallel map model similar to the native map function.
##
###############################################################################

package MCE::Map;

use strict;
use warnings;

no warnings qw( threads recursion uninitialized );

our $VERSION = '1.876';

## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
## no critic (TestingAndDebugging::ProhibitNoStrict)

use Scalar::Util qw( looks_like_number weaken );
use MCE;

our @CARP_NOT = qw( MCE );

## Thread id of the current interpreter; 0 when threads.pm is not loaded.
my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;

## Perl invokes CLONE in a newly created thread; refresh the cached id there.
sub CLONE {
   $_tid = threads->tid() if $INC{'threads.pm'};
}

###############################################################################
## ----------------------------------------------------------------------------
## Import routine.
##
###############################################################################

## Module state. $_MCE, $_params, and $_prev_c are keyed by the string
## "pid.tid.package"; $_def is keyed by the importing package name.

my ($_MCE, $_def, $_params, $_prev_c, $_tag) = ({}, {}, {}, {}, 'MCE::Map');

## Installs mce_map, mce_map_f, and mce_map_s into the calling package and
## records the per-package defaults given on the "use MCE::Map ..." line.
## Option names are matched case-insensitively. Croaks on an unknown option.

sub import {
   my ($_class, $_pkg) = (shift, caller);

   my $_p = $_def->{$_pkg} = {
      MAX_WORKERS => 'auto',
      CHUNK_SIZE  => 'auto',
   };

   ## Import functions.
   no strict 'refs'; no warnings 'redefine';

   *{ $_pkg.'::mce_map_f' } = \&run_file;
   *{ $_pkg.'::mce_map_s' } = \&run_seq;
   *{ $_pkg.'::mce_map'   } = \&run;

   ## Options taking a value, mapped to their key in the defaults hash.
   my %_key_for = (
      max_workers => 'MAX_WORKERS',
      chunk_size  => 'CHUNK_SIZE',
      tmp_dir     => 'TMP_DIR',
      freeze      => 'FREEZE',
      thaw        => 'THAW',
   );

   ## Process module arguments.
   while ( my $_argument = shift ) {
      my $_arg = lc $_argument;

      if ( exists $_key_for{$_arg} ) {
         $_p->{ $_key_for{$_arg} } = shift;
         next;
      }

      ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
      ## A value of '0' selects Storable for serialization instead.
      if ( $_arg eq 'sereal' ) {
         if ( shift eq '0' ) {
            require Storable;
            $_p->{FREEZE} = \&Storable::freeze;
            $_p->{THAW}   = \&Storable::thaw;
         }
         next;
      }

      _croak("Error: ($_argument) invalid module option");
   }

   $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});

   MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
   MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
      unless ($_p->{CHUNK_SIZE} eq 'auto');

   return;
}

###############################################################################
## ----------------------------------------------------------------------------
## Gather callback for storing by chunk_id => chunk_ref into a hash.
##
###############################################################################

## Results of the current run: number of chunks received so far and the
## per-chunk result references. Both are reset at the start of every run.

my ($_total_chunks, %_tmp);

## Stores the result reference for one chunk under its chunk id and bumps
## the received-chunk counter.

sub _gather {

   my ($_chunk_id, $_data_ref) = @_;

   $_tmp{$_chunk_id} = $_data_ref;
   $_total_chunks++;

   return;
}

###############################################################################
## ----------------------------------------------------------------------------
## Init and finish routines.
##
###############################################################################

## Records MCE options for the calling package. Accepts a hash reference or
## a key/value list and may be called as MCE::Map->init or MCE::Map::init.
## Croaks when input_data is a hash reference.

sub init (@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Map');
   my $_pkg = "$$.$_tid.".caller();

   $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };

   _croak("$_tag: (HASH) not allowed as input by this MCE model")
      if ( ref $_params->{$_pkg}{input_data} eq 'HASH' );

   @_ = ();

   return;
}

## Shuts down and forgets the MCE instance stored under the given key,
## defaulting to the caller's "pid.tid.package" key. The key 'MCE' means
## every instance held by this module. Only an instance whose _init_pid
## matches the current process and thread is touched.

sub finish (@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Map');
   my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();

   if ( $_pkg eq 'MCE' ) {
      for my $_k ( keys %{ $_MCE } ) { MCE::Map->finish($_k, 1); }
   }
   elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
      $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};

      ## Drop any results left over from the last run.
      $_total_chunks = undef;
      undef %_tmp;

      delete $_prev_c->{$_pkg};
      delete $_MCE->{$_pkg};
   }

   @_ = ();

   return;
}

###############################################################################
## ----------------------------------------------------------------------------
## Parallel map with MCE -- file.
##
###############################################################################

## Maps the code block over the records of a file. The file may be a path
## (must exist, be readable, and be a plain file), a scalar reference, or a
## GLOB / FileHandle / IO::* handle. The input is stashed under the private
## _file key and the work is delegated to run. Croaks on invalid input.

sub run_file (&@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Map');

   my $_code = shift; my $_file = shift;
   my $_pid  = "$$.$_tid.".caller();

   ## File input supersedes input_data and sequence given to init earlier.
   if (defined (my $_p = $_params->{$_pid})) {
      delete $_p->{input_data} if (exists $_p->{input_data});
      delete $_p->{sequence}   if (exists $_p->{sequence});
   }
   else {
      $_params->{$_pid} = {};
   }

   if (defined $_file && ref $_file eq '' && $_file ne '') {
      _croak("$_tag: ($_file) does not exist")      unless (-e $_file);
      _croak("$_tag: ($_file) is not readable")     unless (-r $_file);
      _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
      $_params->{$_pid}{_file} = $_file;
   }
   elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
      $_params->{$_pid}{_file} = $_file;
   }
   else {
      _croak("$_tag: (file) is not specified or valid");
   }

   @_ = ();

   ## Keep the bare "return run(...)" form: it hands the caller's context
   ## (list, scalar, or void) on to run, which inspects wantarray.
   return run($_code);
}

###############################################################################
## ----------------------------------------------------------------------------
## Parallel map with MCE -- sequence.
##
###############################################################################

## Maps the code block over a numeric sequence. The sequence may be given as
## a hash reference (begin, end, ...), an array reference, or a plain list
## starting with begin and end (plain scalars or Math::* objects). Both
## begin and end are required. The work is delegated to run.

sub run_seq (&@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Map');

   my $_code = shift;
   my $_pid  = "$$.$_tid.".caller();

   ## A sequence supersedes input_data and _file recorded earlier.
   if (defined (my $_p = $_params->{$_pid})) {
      delete $_p->{input_data} if (exists $_p->{input_data});
      delete $_p->{_file}      if (exists $_p->{_file});
   }
   else {
      $_params->{$_pid} = {};
   }

   my ($_begin, $_end);

   if (ref $_[0] eq 'HASH') {
      $_begin = $_[0]->{begin}; $_end = $_[0]->{end};
      $_params->{$_pid}{sequence} = $_[0];
   }
   elsif (ref $_[0] eq 'ARRAY') {
      $_begin = $_[0]->[0]; $_end = $_[0]->[1];
      $_params->{$_pid}{sequence} = $_[0];
   }
   elsif (ref $_[0] eq '' || ref($_[0]) =~ /^Math::/) {
      $_begin = $_[0]; $_end = $_[1];
      $_params->{$_pid}{sequence} = [ @_ ];
   }
   else {
      _croak("$_tag: (sequence) is not specified or valid");
   }

   _croak("$_tag: (begin) is not specified for sequence")
      unless (defined $_begin);
   _croak("$_tag: (end) is not specified for sequence")
      unless (defined $_end);

   ## Marker telling run that this sequence belongs to a single call; run
   ## removes both the marker and the sequence once it has been processed.
   $_params->{$_pid}{sequence_run} = undef;

   @_ = ();

   ## Keep the bare "return run(...)" form: it hands the caller's context
   ## (list, scalar, or void) on to run, which inspects wantarray.
   return run($_code);
}

###############################################################################
## ----------------------------------------------------------------------------
## Parallel map with MCE.
##
###############################################################################

## Applies the code block to every input item using MCE workers.
##
## Input is taken, in order of precedence, from: a single reference argument
## (ARRAY, SCALAR, CODE, GLOB, FileHandle, IO::*), the _file or input_data
## entry recorded for the caller, the remaining argument list, or a recorded
## sequence. A hash reference is rejected.
##
## Returns the results flattened in chunk-id order in list context, the
## number of results in scalar context, and nothing in void context.
##
## The MCE instance is cached per "pid.tid.package" and rebuilt only when
## the code reference differs from the one used on the previous call.

sub run (&@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Map');

   my $_code = shift;

   ## Start with an empty result store.
   $_total_chunks = 0;
   undef %_tmp;

   ## When entered through run_file or run_seq, the user's package is one
   ## frame further up the call stack.
   my $_pkg = caller() eq 'MCE::Map' ? caller(1) : caller();
   my $_pid = "$$.$_tid.$_pkg";

   my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
   my $_r = ref $_[0];

   if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|CODE|GLOB|FileHandle|IO::)/) {
      _croak("$_tag: (HASH) not allowed as input by this MCE model")
         if $_r eq 'HASH';
      $_input_data = shift;
   }

   if (defined (my $_p = $_params->{$_pid})) {
      $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
         if (exists $_p->{max_workers});

      ## Explicit input overrides a recorded sequence. The remaining keys
      ## are set by this module itself and may not be supplied via init.
      delete $_p->{sequence}    if (defined $_input_data || scalar @_);
      delete $_p->{user_func}   if (exists $_p->{user_func});
      delete $_p->{user_tasks}  if (exists $_p->{user_tasks});
      delete $_p->{use_slurpio} if (exists $_p->{use_slurpio});
      delete $_p->{bounds_only} if (exists $_p->{bounds_only});
      delete $_p->{gather}      if (exists $_p->{gather});
   }

   my $_chunk_size = MCE::_parse_chunk_size(
      $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
      $_input_data, scalar @_
   );

   if (defined (my $_p = $_params->{$_pid})) {
      if (exists $_p->{_file}) {
         ## _file is valid for one call only, hence the delete.
         $_input_data = delete $_p->{_file};
      }
      else {
         $_input_data = $_p->{input_data} if exists $_p->{input_data};
      }
   }

   ## -------------------------------------------------------------------------

   MCE::_save_state($_MCE->{$_pid});

   if (!defined $_prev_c->{$_pid} || $_prev_c->{$_pid} != $_code) {
      $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
      $_prev_c->{$_pid} = $_code;

      my %_opts = (
         max_workers => $_max_workers, task_name => $_tag,
         user_func => sub {

            ## Runs the user's code for each item of the chunk. user_args
            ## carries the wantarray value of the mce_map call: true means
            ## results are wanted, defined-but-false means only the count.
            ## The code is invoked as &{ $_code } on purpose, leaving @_
            ## exactly as the original implementation exposes it.

            my ($_mce, $_chunk_ref, $_chunk_id) = @_;
            my $_wantarray = $_mce->{user_args}[0];

            if ($_wantarray) {
               my @_a;

               if (ref $_chunk_ref eq 'SCALAR') {
                  ## The chunk is a buffer; split it into records by
                  ## reading it through an in-memory file handle.
                  local $/ = $_mce->{RS} if defined $_mce->{RS};
                  open my $_MEM_FH, '<', $_chunk_ref
                     or die "$_tag: cannot open in-memory chunk: $!\n";
                  binmode $_MEM_FH, ':raw';
                  while (<$_MEM_FH>) { push @_a, &{ $_code }; }
                  close  $_MEM_FH;
                  weaken $_MEM_FH;
               }
               else {
                  if (ref $_chunk_ref) {
                     push @_a, map { &{ $_code } } @{ $_chunk_ref };
                  }
                  else {
                     push @_a, map { &{ $_code } } $_chunk_ref;
                  }
               }

               MCE->gather($_chunk_id, \@_a);
            }
            else {
               my $_cnt = 0;

               if (ref $_chunk_ref eq 'SCALAR') {
                  local $/ = $_mce->{RS} if defined $_mce->{RS};
                  open my $_MEM_FH, '<', $_chunk_ref
                     or die "$_tag: cannot open in-memory chunk: $!\n";
                  binmode $_MEM_FH, ':raw';
                  while (<$_MEM_FH>) { $_cnt++; &{ $_code }; }
                  close  $_MEM_FH;
                  weaken $_MEM_FH;
               }
               else {
                  ## map in scalar context yields the number of results.
                  if (ref $_chunk_ref) {
                     $_cnt += map { &{ $_code } } @{ $_chunk_ref };
                  }
                  else {
                     $_cnt += map { &{ $_code } } $_chunk_ref;
                  }
               }

               MCE->gather($_cnt) if defined $_wantarray;
            }
         },
      );

      ## Pass through the options recorded by init, rejecting unknown ones.
      if (defined (my $_p = $_params->{$_pid})) {
         for my $_k (keys %{ $_p }) {
            next if ($_k eq 'sequence_run');
            next if ($_k eq 'input_data');
            next if ($_k eq 'chunk_size');

            _croak("$_tag: ($_k) is not a valid constructor argument")
               unless (exists $MCE::_valid_fields_new{$_k});

            $_opts{$_k} = $_p->{$_k};
         }
      }

      ## Import-time defaults apply only where init did not set a value.
      for my $_k (qw/ tmp_dir freeze thaw /) {
         $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
            if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
      }

      $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
   }

   ## -------------------------------------------------------------------------

   my $_cnt = 0; my $_wantarray = wantarray;

   ## Called with parentheses so the current @_ is not forwarded, which the
   ## former "&MCE::MAX_RECS_SIZE" form did implicitly.
   $_MCE->{$_pid}{use_slurpio} = ($_chunk_size > MCE::MAX_RECS_SIZE()) ? 1 : 0;
   $_MCE->{$_pid}{user_args} = [ $_wantarray ];

   ## List context collects the per-chunk results; otherwise sum the counts.
   $_MCE->{$_pid}{gather} = $_wantarray
      ? \&_gather : sub { $_cnt += $_[0]; return; };

   if (defined $_input_data) {
      @_ = ();
      $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
      delete $_MCE->{$_pid}{input_data};
   }
   elsif (scalar @_) {
      $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
      delete $_MCE->{$_pid}{input_data};
   }
   else {
      if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
         $_MCE->{$_pid}->run({
            chunk_size => $_chunk_size,
            sequence   => $_params->{$_pid}{sequence}
         }, 0);

         ## A sequence set by run_seq is valid for that one call only.
         if (exists $_params->{$_pid}{sequence_run}) {
            delete $_params->{$_pid}{sequence_run};
            delete $_params->{$_pid}{sequence};
         }
         delete $_MCE->{$_pid}{sequence};
      }
   }

   MCE::_restore_state();

   if ($_wantarray) {
      ## Flatten the per-chunk arrays in chunk-id order, emptying the store.
      return map { @{ $_ } } delete @_tmp{ 1 .. $_total_chunks };
   }
   elsif (defined $_wantarray) {
      return $_cnt;
   }

   return;
}

###############################################################################
## ----------------------------------------------------------------------------
## Private methods.
##
###############################################################################

## Delegates to MCE::_croak. The goto form replaces this frame, so the
## callee sees the same @_ and the same caller as this sub did.

sub _croak {

   goto &MCE::_croak;
}

1;

__END__

###############################################################################
## ----------------------------------------------------------------------------
## Module usage.
##
###############################################################################

=head1 NAME

MCE::Map - Parallel map model similar to the native map function

=head1 VERSION

This document describes MCE::Map version 1.876

=head1 SYNOPSIS

 ## Exports mce_map, mce_map_f, and mce_map_s
 use MCE::Map;

 ## Array or array_ref
 my @a = mce_map { $_ * $_ } 1..10000;
 my @b = mce_map { $_ * $_ } \@list;

 ## Important; pass an array_ref for deeply nested input data
 my @c = mce_map { $_->[1] *= 2; $_ } [ [ 0, 1 ], [ 0, 2 ], ... ];
 my @d = mce_map { $_->[1] *= 2; $_ } \@deeply_list;

 ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
 ## Workers read directly and do not involve the manager process
 my @e = mce_map_f { chomp; $_ } "/path/to/file"; # efficient

 ## Involves the manager process, therefore slower
 my @f = mce_map_f { chomp; $_ } $file_handle;
 my @g = mce_map_f { chomp; $_ } $io;
 my @h = mce_map_f { chomp; $_ } \$scalar;

 ## Sequence of numbers (begin, end [, step, format])
 my @i = mce_map_s { $_ * $_ } 1, 10000, 5;
 my @j = mce_map_s { $_ * $_ } [ 1, 10000, 5 ];

 my @k = mce_map_s { $_ * $_ } {
    begin => 1, end => 10000, step => 5, format => undef
 };

=head1 DESCRIPTION

This module provides a parallel map implementation via Many-Core Engine.
MCE incurs a small overhead due to passing of data. A fast code block will
run faster natively. However, the overhead will likely diminish as the
complexity increases for the code.

 my @m1 =     map { $_ * $_ } 1..1000000;               ## 0.127 secs
 my @m2 = mce_map { $_ * $_ } 1..1000000;               ## 0.304 secs

Chunking, enabled by default, greatly reduces the overhead behind the scenes.
The time for mce_map below also includes the time for data exchanges between
the manager and worker processes.
More parallelization will be seen when the
code incurs additional CPU time.

 sub calc {
    sqrt $_ * sqrt $_ / 1.3 * 1.5 / 3.2 * 1.07
 }

 my @m1 =     map { calc } 1..1000000;                  ## 0.367 secs
 my @m2 = mce_map { calc } 1..1000000;                  ## 0.365 secs

Even faster is mce_map_s; useful when input data is a range of numbers.
Workers generate sequences mathematically among themselves without any
interaction from the manager process. Two arguments are required for
mce_map_s (begin, end). Step defaults to 1 if begin is smaller than end,
otherwise -1.

 my @m3 = mce_map_s { calc } 1, 1000000;                ## 0.270 secs

Although this document is about MCE::Map, the L<MCE::Stream> module can write
results immediately without waiting for all chunks to complete. This is made
possible by passing the reference to an array (in this case @m4 and @m5).

 use MCE::Stream;

 sub calc {
    sqrt $_ * sqrt $_ / 1.3 * 1.5 / 3.2 * 1.07
 }

 my @m4; mce_stream \@m4, sub { calc }, 1..1000000;

    ## Completes in 0.272 secs. This is amazing considering the
    ## overhead for passing data between the manager and workers.

 my @m5; mce_stream_s \@m5, sub { calc }, 1, 1000000;

    ## Completed in 0.176 secs. Like with mce_map_s, specifying a
    ## sequence specification turns out to be faster due to less
    ## overhead for the manager process.

=head1 OVERRIDING DEFAULTS

The following lists the options which may be overridden when loading the
module.

 use Sereal qw( encode_sereal decode_sereal );
 use CBOR::XS qw( encode_cbor decode_cbor );
 use JSON::XS qw( encode_json decode_json );

 use MCE::Map
     max_workers => 4,                # Default 'auto'
     chunk_size => 100,               # Default 'auto'
     tmp_dir => "/path/to/app/tmp",   # $MCE::Signal::tmp_dir
     freeze => \&encode_sereal,       # \&Storable::freeze
     thaw => \&decode_sereal          # \&Storable::thaw
 ;

From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if available.
Specify C<< Sereal => 0 >> to use Storable instead.

 use MCE::Map Sereal => 0;

=head1 CUSTOMIZING MCE

=over 3

=item MCE::Map->init ( options )

=item MCE::Map::init { options }

=back

The init function accepts a hash of MCE options. The gather option, if
specified, is ignored due to being used internally by the module.

 use MCE::Map;

 MCE::Map->init(
    chunk_size => 1, max_workers => 4,

    user_begin => sub {
       print "## ", MCE->wid, " started\n";
    },

    user_end => sub {
       print "## ", MCE->wid, " completed\n";
    }
 );

 my @a = mce_map { $_ * $_ } 1..100;

 print "\n", "@a", "\n";

 -- Output

 ## 2 started
 ## 1 started
 ## 3 started
 ## 4 started
 ## 1 completed
 ## 4 completed
 ## 2 completed
 ## 3 completed

 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
 400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
 1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
 2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
 3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
 5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
 10000

=head1 API DOCUMENTATION

=over 3

=item MCE::Map->run ( sub { code }, list )

=item mce_map { code } list

=back

Input data may be defined using a list or an array reference. Unlike MCE::Loop,
Flow, and Step, specifying a hash reference as input data isn't allowed.

 ## Array or array_ref
 my @a = mce_map { $_ * 2 } 1..1000;
 my @b = mce_map { $_ * 2 } \@list;

 ## Important; pass an array_ref for deeply nested input data
 my @c = mce_map { $_->[1] *= 2; $_ } [ [ 0, 1 ], [ 0, 2 ], ... ];
 my @d = mce_map { $_->[1] *= 2; $_ } \@deeply_list;

 ## Not supported
 my @z = mce_map { ... } \%hash;

=over 3

=item MCE::Map->run_file ( sub { code }, file )

=item mce_map_f { code } file

=back

The fastest of these is the /path/to/file. Workers communicate the next offset
position among themselves with zero interaction by the manager process.

C<IO::All> { File, Pipe, STDIO } is supported since MCE 1.845.

 my @c = mce_map_f { chomp; $_ . "\r\n" } "/path/to/file";  # faster
 my @d = mce_map_f { chomp; $_ . "\r\n" } $file_handle;
 my @e = mce_map_f { chomp; $_ . "\r\n" } $io;              # IO::All
 my @f = mce_map_f { chomp; $_ . "\r\n" } \$scalar;

=over 3

=item MCE::Map->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )

=item mce_map_s { code } $beg, $end [, $step, $fmt ]

=back

Sequence may be defined as a list, an array reference, or a hash reference.
The functions require both begin and end values to run. Step and format are
optional. The format is passed to sprintf (% may be omitted below).

 my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");

 my @f = mce_map_s { $_ } $beg, $end, $step, $fmt;
 my @g = mce_map_s { $_ } [ $beg, $end, $step, $fmt ];

 my @h = mce_map_s { $_ } {
    begin => $beg, end => $end,
    step => $step, format => $fmt
 };

=over 3

=item MCE::Map->run ( sub { code }, iterator )

=item mce_map { code } iterator

=back

An iterator reference may be specified for input_data.
Iterators are described
under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.

 my @a = mce_map { $_ * 2 } make_iterator(10, 30, 2);

=head1 MANUAL SHUTDOWN

=over 3

=item MCE::Map->finish

=item MCE::Map::finish

=back

Workers remain persistent as much as possible after running. Shutdown occurs
automatically when the script terminates. Call finish when workers are no
longer needed.

 use MCE::Map;

 MCE::Map->init(
    chunk_size => 20, max_workers => 'auto'
 );

 my @a = mce_map { ... } 1..100;

 MCE::Map->finish;

=head1 INDEX

L<MCE|MCE>, L<MCE::Core>

=head1 AUTHOR

Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>

=cut
