###############################################################################
## ----------------------------------------------------------------------------
## MCE model for building parallel loops.
##
###############################################################################

package MCE::Loop;

use strict;
use warnings;

no warnings qw( threads recursion uninitialized );

our $VERSION = '1.876';

## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
## no critic (TestingAndDebugging::ProhibitNoStrict)

use Scalar::Util qw( looks_like_number );
use MCE;

our @CARP_NOT = qw( MCE );

## Thread id of the current interpreter; 0 when threads.pm is not loaded.
## It becomes part of every per-caller lookup key used below.

my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;

## Refresh the cached thread id; a no-op unless threads.pm is loaded.

sub CLONE {
   if ($INC{'threads.pm'}) {
      $_tid = threads->tid();
   }
}

###############################################################################
## ----------------------------------------------------------------------------
## Import routine.
##
###############################################################################

## Module state:
##   $_MCE     MCE instances, keyed by "pid.tid.package"
##   $_def     import-time defaults, keyed by package name
##   $_params  options given to init, keyed by "pid.tid.package"
##   $_prev_c  code ref last used to build an instance, same key
##   $_tag     label used in error messages and as the task name

my ($_MCE, $_def, $_params, $_prev_c, $_tag) = ({}, {}, {}, {}, 'MCE::Loop');

## Installs mce_loop, mce_loop_f, and mce_loop_s into the calling package and
## records the module options (max_workers, chunk_size, tmp_dir, freeze, thaw,
## sereal) given on the use line. Option names are case-insensitive. Croaks on
## an unknown option name.

sub import {
   my $_class = shift;
   my $_pkg   = caller();

   ## Every importing package starts out with the built-in defaults.
   my $_p = { MAX_WORKERS => 'auto', CHUNK_SIZE => 'auto' };
   $_def->{$_pkg} = $_p;

   ## Import functions.
   {
      no strict 'refs'; no warnings 'redefine';

      my %_export = (
         mce_loop_f => \&run_file,
         mce_loop_s => \&run_seq,
         mce_loop   => \&run,
      );

      for my $_name (keys %_export) {
         *{ $_pkg.'::'.$_name } = $_export{$_name};
      }
   }

   ## Process module arguments. Plain options map straight to a slot in the
   ## defaults hash; the value is the argument which follows the name.
   my %_slot_for = (
      max_workers => 'MAX_WORKERS',
      chunk_size  => 'CHUNK_SIZE',
      tmp_dir     => 'TMP_DIR',
      freeze      => 'FREEZE',
      thaw        => 'THAW',
   );

   while ( my $_argument = shift ) {
      my $_arg = lc $_argument;

      if ( exists $_slot_for{$_arg} ) {
         $_p->{ $_slot_for{$_arg} } = shift;
         next;
      }

      ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
      ## A value of '0' selects Storable for freeze and thaw instead.
      if ( $_arg eq 'sereal' ) {
         my $_enabled = shift;

         if ( $_enabled eq '0' ) {
            require Storable;
            $_p->{FREEZE} = \&Storable::freeze;
            $_p->{THAW}   = \&Storable::thaw;
         }

         next;
      }

      _croak("Error: ($_argument) invalid module option");
   }

   $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});

   MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);

   ## 'auto' is the one non-numeric value allowed for chunk_size.
   if ( $_p->{CHUNK_SIZE} ne 'auto' ) {
      MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag);
   }

   return;
}

###############################################################################
## ----------------------------------------------------------------------------
## Init and finish routines.
##
###############################################################################

## Stores MCE options for the calling package, replacing any stored before.
## Takes a hash ref or a flat key/value list and may be called either as
## MCE::Loop->init(...) or MCE::Loop::init(...).

sub init (@) {

   ## Drop the class name when called as a class method.
   if (defined $_[0] && $_[0] eq 'MCE::Loop') {
      shift;
   }

   my $_caller = caller();
   my $_key    = "$$.$_tid.$_caller";

   if (ref $_[0] eq 'HASH') {
      $_params->{$_key} = shift;
   }
   else {
      $_params->{$_key} = { @_ };
   }

   @_ = ();

   return;
}

## Shuts down and discards the MCE instance kept for the calling package, or
## for the key given as the first argument. The key 'MCE' finishes every
## instance kept by this module. An instance is touched only when its
## _init_pid matches the current "pid.tid"; shutdown is called only when the
## instance reports _spawned, and receives any remaining arguments.

sub finish (@) {

   ## Drop the class name when called as a class method.
   if (defined $_[0] && $_[0] eq 'MCE::Loop') {
      shift;
   }

   my $_key;

   if (defined $_[0]) {
      $_key = shift;
   }
   else {
      my $_caller = caller();
      $_key = "$$.$_tid.$_caller";
   }

   if ( $_key eq 'MCE' ) {
      for my $_id ( keys %{ $_MCE } ) {
         MCE::Loop->finish($_id, 1);
      }
   }
   elsif ( $_MCE->{$_key} && $_MCE->{$_key}{_init_pid} eq "$$.$_tid" ) {
      if ( $_MCE->{$_key}{_spawned} ) {
         $_MCE->{$_key}->shutdown(@_);
      }

      delete $_prev_c->{$_key};
      delete $_MCE->{$_key};
   }

   @_ = ();

   return;
}

###############################################################################
## ----------------------------------------------------------------------------
## Parallel loop with MCE -- file.
##
###############################################################################

## Runs the code block over the contents of a file. The file argument is a
## path (which must exist, be readable, and be a plain file), a scalar ref,
## or a GLOB / FileHandle / IO::* reference. The input is stashed under the
## _file key for the caller and the actual work is handed to run().

sub run_file (&@) {

   ## Drop the class name when called as a class method.
   if (defined $_[0] && $_[0] eq 'MCE::Loop') {
      shift;
   }

   my $_code = shift;
   my $_file = shift;

   my $_caller = caller();
   my $_pid    = "$$.$_tid.$_caller";

   ## Clear input_data and sequence left behind by init or an earlier call;
   ## create the options hash when the caller has none yet.
   my $_p = $_params->{$_pid};

   if (defined $_p) {
      delete @{ $_p }{ qw( input_data sequence ) };
   }
   else {
      $_p = $_params->{$_pid} = {};
   }

   my $_ref_type = ref $_file;

   if (defined $_file && $_ref_type eq '' && $_file ne '') {
      ## A non-empty plain string is treated as a path.
      if (! -e $_file) {
         _croak("$_tag: ($_file) does not exist");
      }
      if (! -r $_file) {
         _croak("$_tag: ($_file) is not readable");
      }
      if (! -f $_file) {
         _croak("$_tag: ($_file) is not a plain file");
      }

      $_p->{_file} = $_file;
   }
   elsif ($_ref_type eq 'SCALAR' || $_ref_type =~ /^(?:GLOB|FileHandle|IO::)/) {
      $_p->{_file} = $_file;
   }
   else {
      _croak("$_tag: (file) is not specified or valid");
   }

   @_ = ();

   return run($_code);
}

###############################################################################
## ----------------------------------------------------------------------------
## Parallel loop with MCE -- sequence.
##
###############################################################################

## Runs the code block over a sequence of numbers. The sequence is given as a
## hash ref (begin, end, ...), an array ref, or a plain list whose first two
## items are begin and end; Math::* objects are accepted as list items. Both
## begin and end must be defined. The sequence is stashed for the caller and
## the actual work is handed to run().

sub run_seq (&@) {

   ## Drop the class name when called as a class method.
   if (defined $_[0] && $_[0] eq 'MCE::Loop') {
      shift;
   }

   my $_code = shift;

   my $_caller = caller();
   my $_pid    = "$$.$_tid.$_caller";

   ## Clear input_data and _file left behind by init or an earlier call;
   ## create the options hash when the caller has none yet.
   my $_p = $_params->{$_pid};

   if (defined $_p) {
      delete @{ $_p }{ qw( input_data _file ) };
   }
   else {
      $_p = $_params->{$_pid} = {};
   }

   my ($_begin, $_end);
   my $_ref_type = ref $_[0];

   if ($_ref_type eq 'HASH') {
      $_begin = $_[0]->{begin};
      $_end   = $_[0]->{end};
      $_p->{sequence} = $_[0];
   }
   elsif ($_ref_type eq 'ARRAY') {
      $_begin = $_[0]->[0];
      $_end   = $_[0]->[1];
      $_p->{sequence} = $_[0];
   }
   elsif ($_ref_type eq '' || $_ref_type =~ /^Math::/) {
      $_begin = $_[0];
      $_end   = $_[1];
      $_p->{sequence} = [ @_ ];
   }
   else {
      _croak("$_tag: (sequence) is not specified or valid");
   }

   if (!defined $_begin) {
      _croak("$_tag: (begin) is not specified for sequence");
   }
   if (!defined $_end) {
      _croak("$_tag: (end) is not specified for sequence");
   }

   ## Marker telling run() to drop the sequence once it has been consumed.
   $_p->{sequence_run} = undef;

   @_ = ();

   return run($_code);
}

###############################################################################
## ----------------------------------------------------------------------------
## Parallel loop with MCE.
##
###############################################################################

## Runs the code block in parallel. Input comes from, in order of precedence:
## a lone reference argument, a file stashed by run_file, input_data given to
## init, the remaining argument list, or a stashed sequence. The MCE instance
## for the caller is rebuilt only when the code ref differs from the one used
## last time. In list or scalar context the gathered results are returned; in
## void context nothing is collected here.

sub run (&@) {

   ## Drop the class name when called as a class method.
   if (defined $_[0] && $_[0] eq 'MCE::Loop') {
      shift;
   }

   my $_code = shift;

   ## When entered through run_file or run_seq the immediate caller is this
   ## package, so look one frame further up.
   my $_pkg = caller();
   $_pkg = caller(1) if ($_pkg eq 'MCE::Loop');

   my $_pid = "$$.$_tid.$_pkg";

   my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
   my $_input_data;

   ## A lone reference argument is the input source itself.
   my $_ref_type = ref $_[0];

   if (@_ == 1 && $_ref_type =~ /^(?:ARRAY|HASH|SCALAR|CODE|GLOB|FileHandle|IO::)/) {
      $_input_data = shift;
   }

   if (defined (my $_p = $_params->{$_pid})) {
      if (exists $_p->{max_workers}) {
         $_max_workers = MCE::_parse_max_workers($_p->{max_workers});
      }

      ## Explicit input takes the place of a stored sequence.
      if (defined $_input_data || scalar @_) {
         delete $_p->{sequence};
      }

      ## The code block is supplied here; these options are not carried over.
      delete @{ $_p }{ qw( user_func user_tasks ) };
   }

   my $_chunk_size = MCE::_parse_chunk_size(
      $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
      $_input_data, scalar @_
   );

   if (defined (my $_p = $_params->{$_pid})) {
      if (exists $_p->{_file}) {
         $_input_data = delete $_p->{_file};
      }
      elsif (exists $_p->{input_data}) {
         $_input_data = $_p->{input_data};
      }
   }

   ## -------------------------------------------------------------------------

   MCE::_save_state($_MCE->{$_pid});

   if (!defined $_prev_c->{$_pid} || $_prev_c->{$_pid} != $_code) {
      if (defined $_MCE->{$_pid}) {
         $_MCE->{$_pid}->shutdown();
      }

      $_prev_c->{$_pid} = $_code;

      my %_opts = (
         max_workers => $_max_workers, task_name => $_tag,
         user_func => $_code,
      );

      if (defined (my $_p = $_params->{$_pid})) {
         ## Keys handled by this module rather than by the constructor.
         my %_skip = ( sequence_run => 1, input_data => 1, chunk_size => 1 );

         for my $_opt (keys %{ $_p }) {
            next if ($_skip{$_opt});

            if (!exists $MCE::_valid_fields_new{$_opt}) {
               _croak("$_tag: ($_opt) is not a valid constructor argument");
            }

            $_opts{$_opt} = $_p->{$_opt};
         }
      }

      ## Import-time defaults apply only where init gave no value.
      for my $_opt (qw/ tmp_dir freeze thaw /) {
         my $_slot = uc($_opt);

         if (exists $_def->{$_pkg}{$_slot} && !exists $_opts{$_opt}) {
            $_opts{$_opt} = $_def->{$_pkg}{$_slot};
         }
      }

      $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
   }

   ## -------------------------------------------------------------------------

   my @_gathered;
   my $_want = wantarray;

   ## Collect results only when the caller wants a return value.
   if (defined $_want) {
      $_MCE->{$_pid}{gather} = \@_gathered;
   }

   if (defined $_input_data) {
      @_ = ();
      $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
      delete $_MCE->{$_pid}{input_data};
   }
   elsif (scalar @_) {
      $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
      delete $_MCE->{$_pid}{input_data};
   }
   elsif (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
      $_MCE->{$_pid}->run({
         chunk_size => $_chunk_size,
         sequence => $_params->{$_pid}{sequence}
      }, 0);

      ## A sequence stashed by run_seq is good for one run only.
      if (exists $_params->{$_pid}{sequence_run}) {
         delete $_params->{$_pid}{sequence_run};
         delete $_params->{$_pid}{sequence};
      }

      delete $_MCE->{$_pid}{sequence};
   }

   MCE::_restore_state();

   return unless (defined $_want);

   delete $_MCE->{$_pid}{gather};

   return @_gathered;
}

###############################################################################
## ----------------------------------------------------------------------------
## Private methods.
##
###############################################################################

## Hands control to MCE::_croak with the current argument list; goto replaces
## this frame, so the error is raised as if MCE::_croak was called directly.

sub _croak {

   goto &MCE::_croak;
}

1;

__END__

###############################################################################
## ----------------------------------------------------------------------------
## Module usage.
##
###############################################################################

=head1 NAME

MCE::Loop - MCE model for building parallel loops

=head1 VERSION

This document describes MCE::Loop version 1.876

=head1 DESCRIPTION

This module provides a parallel loop implementation through Many-Core Engine.
MCE::Loop is not MCE::Map but more along the lines of an easy way to spin up
an MCE instance and have user_func pointing to your code block. If you want
something similar to map, then see L<MCE::Map>.

   ## Construction when chunking is not desired

   use MCE::Loop;

   MCE::Loop->init(
      max_workers => 5, chunk_size => 1
   );

   mce_loop {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      MCE->say("$chunk_id: $_");
   } 40 .. 48;

   -- Output

   3: 42
   1: 40
   2: 41
   4: 43
   5: 44
   6: 45
   7: 46
   8: 47
   9: 48

   ## Construction for 'auto' or greater than 1

   use MCE::Loop;

   MCE::Loop->init(
      max_workers => 5, chunk_size => 'auto'
   );

   mce_loop {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      for (@{ $chunk_ref }) {
         MCE->say("$chunk_id: $_");
      }
   } 40 .. 48;

   -- Output

   1: 40
   2: 42
   1: 41
   4: 46
   2: 43
   5: 48
   3: 44
   4: 47
   3: 45

=head1 SYNOPSIS when CHUNK_SIZE EQUALS 1

All models in MCE default to 'auto' for chunk_size. The arguments for the block
are the same as writing a user_func block using the Core API.

Beginning with MCE 1.5, the next input item is placed into the input scalar
variable $_ when chunk_size equals 1. Otherwise, $_ points to $chunk_ref
containing many items. Basically, line 2 below may be omitted from your code
when using $_. One can call MCE->chunk_id to obtain the current chunk id.

   line 1:  user_func => sub {
   line 2:     my ($mce, $chunk_ref, $chunk_id) = @_;
   line 3:
   line 4:     $_ points to $chunk_ref->[0]
   line 5:        in MCE 1.5 when chunk_size == 1
   line 6:
   line 7:     $_ points to $chunk_ref
   line 8:        in MCE 1.5 when chunk_size > 1
   line 9:  }

Follow this synopsis when chunk_size equals one. Looping is not required from
inside the block. Hence, the block is called once per item.

   ## Exports mce_loop, mce_loop_f, and mce_loop_s
   use MCE::Loop;

   MCE::Loop->init(
      chunk_size => 1
   );

   ## Array or array_ref
   mce_loop { do_work($_) } 1..10000;
   mce_loop { do_work($_) } \@list;

   ## Important; pass an array_ref for deeply nested input data
   mce_loop { do_work($_) } [ [ 0, 1 ], [ 0, 2 ], ... ];
   mce_loop { do_work($_) } \@deeply_list;

   ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
   ## Workers read directly and do not involve the manager process
   mce_loop_f { chomp; do_work($_) } "/path/to/file"; # efficient

   ## Involves the manager process, therefore slower
   mce_loop_f { chomp; do_work($_) } $file_handle;
   mce_loop_f { chomp; do_work($_) } $io;
   mce_loop_f { chomp; do_work($_) } \$scalar;

   ## Sequence of numbers (begin, end [, step, format])
   mce_loop_s { do_work($_) } 1, 10000, 5;
   mce_loop_s { do_work($_) } [ 1, 10000, 5 ];

   mce_loop_s { do_work($_) } {
      begin => 1, end => 10000, step => 5, format => undef
   };

=head1 SYNOPSIS when CHUNK_SIZE is GREATER THAN 1

Follow this synopsis when chunk_size equals 'auto' or greater than 1.
This means having to loop through the chunk from inside the block.

   use MCE::Loop;

   MCE::Loop->init(           ## Chunk_size defaults to 'auto' when
      chunk_size => 'auto'    ## not specified. Therefore, the init
   );                         ## function may be omitted.

   ## Syntax is shown for mce_loop for demonstration purposes.
   ## Looping inside the block is the same for mce_loop_f and
   ## mce_loop_s.

   ## Array or array_ref
   mce_loop { do_work($_) for (@{ $_ }) } 1..10000;
   mce_loop { do_work($_) for (@{ $_ }) } \@list;

   ## Important; pass an array_ref for deeply nested input data
   mce_loop { do_work($_) for (@{ $_ }) } [ [ 0, 1 ], [ 0, 2 ], ... ];
   mce_loop { do_work($_) for (@{ $_ }) } \@deeply_list;

   ## Resembles code using the core MCE API
   mce_loop {
      my ($mce, $chunk_ref, $chunk_id) = @_;

      for (@{ $chunk_ref }) {
         do_work($_);
      }

   } 1..10000;

Chunking reduces the number of IPC calls behind the scenes. Think in terms of
chunks whenever processing a large amount of data. For relatively small data,
choosing 1 for chunk_size is fine.

=head1 OVERRIDING DEFAULTS

The following lists the options that may be overridden when loading the module.

   use Sereal qw( encode_sereal decode_sereal );
   use CBOR::XS qw( encode_cbor decode_cbor );
   use JSON::XS qw( encode_json decode_json );

   use MCE::Loop
         max_workers => 4,                # Default 'auto'
         chunk_size => 100,               # Default 'auto'
         tmp_dir => "/path/to/app/tmp",   # $MCE::Signal::tmp_dir
         freeze => \&encode_sereal,       # \&Storable::freeze
         thaw => \&decode_sereal          # \&Storable::thaw
   ;

From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if available.
Specify C<< Sereal => 0 >> to use Storable instead.

   use MCE::Loop Sereal => 0;

=head1 CUSTOMIZING MCE

=over 3

=item MCE::Loop->init ( options )

=item MCE::Loop::init { options }

=back

The init function accepts a hash of MCE options.

   use MCE::Loop;

   MCE::Loop->init(
      chunk_size => 1, max_workers => 4,

      user_begin => sub {
         print "## ", MCE->wid, " started\n";
      },

      user_end => sub {
         print "## ", MCE->wid, " completed\n";
      }
   );

   my %a = mce_loop { MCE->gather($_, $_ * $_) } 1..100;

   print "\n", "@a{1..100}", "\n";

   -- Output

   ## 3 started
   ## 1 started
   ## 2 started
   ## 4 started
   ## 1 completed
   ## 2 completed
   ## 3 completed
   ## 4 completed

   1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
   400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
   1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
   2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
   3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
   5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
   7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
   10000

=head1 API DOCUMENTATION

The following assumes chunk_size equals 1 in order to demonstrate all the
possibilities for providing input data.

=over 3

=item MCE::Loop->run ( sub { code }, list )

=item mce_loop { code } list

=back

Input data may be defined using a list, an array ref, or a hash ref.

   # $_ contains the item when chunk_size => 1

   mce_loop { do_work($_) } 1..1000;
   mce_loop { do_work($_) } \@list;

   # Important; pass an array_ref for deeply nested input data

   mce_loop { do_work($_) } [ [ 0, 1 ], [ 0, 2 ], ... ];
   mce_loop { do_work($_) } \@deeply_list;

   # Chunking; any chunk_size => 1 or greater

   my %res = mce_loop {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      my %ret;
      for my $item (@{ $chunk_ref }) {
         $ret{$item} = $item * 2;
      }
      MCE->gather(%ret);
   }
   \@list;

   # Input hash; current API available since 1.828

   my %res = mce_loop {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      my %ret;
      for my $key (keys %{ $chunk_ref }) {
         $ret{$key} = $chunk_ref->{$key} * 2;
      }
      MCE->gather(%ret);
   }
   \%hash;

=over 3

=item MCE::Loop->run_file ( sub { code }, file )

=item mce_loop_f { code } file

=back

The fastest of these is the /path/to/file. Workers communicate the next offset
position among themselves with zero interaction by the manager process.

C<IO::All> { File, Pipe, STDIO } is supported since MCE 1.845.

   # $_ contains the line when chunk_size => 1

   mce_loop_f { $_ } "/path/to/file";  # faster
   mce_loop_f { $_ } $file_handle;
   mce_loop_f { $_ } $io;              # IO::All
   mce_loop_f { $_ } \$scalar;

   # chunking, any chunk_size => 1 or greater

   my %res = mce_loop_f {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      my $buf = '';
      for my $line (@{ $chunk_ref }) {
         $buf .= $line;
      }
      MCE->gather($chunk_id, $buf);
   }
   "/path/to/file";

=over 3

=item MCE::Loop->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )

=item mce_loop_s { code } $beg, $end [, $step, $fmt ]

=back

Sequence may be defined as a list, an array reference, or a hash reference.
The functions require both begin and end values to run. Step and format are
optional. The format is passed to sprintf (% may be omitted below).

   my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");

   # $_ contains the sequence number when chunk_size => 1

   mce_loop_s { $_ } $beg, $end, $step, $fmt;
   mce_loop_s { $_ } [ $beg, $end, $step, $fmt ];

   mce_loop_s { $_ } {
      begin => $beg, end => $end,
      step => $step, format => $fmt
   };

   # chunking, any chunk_size => 1 or greater

   my %res = mce_loop_s {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      my $buf = '';
      for my $seq (@{ $chunk_ref }) {
         $buf .= "$seq\n";
      }
      MCE->gather($chunk_id, $buf);
   }
   [ $beg, $end ];

With the bounds_only option, the sequence engine computes the 'begin' and
'end' items only, for the chunk, and not the items in between (hence
boundaries only). This option applies to sequence only and has no effect
when chunk_size equals 1.

The time to run is 0.006s below. This becomes 0.827s without the bounds_only
option due to computing all items in between, thus creating a very large
array. Basically, specify bounds_only => 1 when boundaries are all you need
for looping inside the block; e.g. Monte Carlo simulations.

Time was measured using 1 worker to emphasize the difference.

   use MCE::Loop;

   MCE::Loop->init(
      max_workers => 1, chunk_size => 1_250_000,
      bounds_only => 1
   );

   # Typically, the input scalar $_ contains the sequence number
   # when chunk_size => 1, unless the bounds_only option is set
   # which is the case here. Thus, $_ points to $chunk_ref.

   mce_loop_s {
      my ($mce, $chunk_ref, $chunk_id) = @_;

      # $chunk_ref contains 2 items, not 1_250_000
      # my ( $begin, $end ) = ( $_->[0], $_->[1] );

      my $begin = $chunk_ref->[0];
      my $end   = $chunk_ref->[1];

      # for my $seq ( $begin .. $end ) {
      #    ...
      # }

      MCE->printf("%7d .. %8d\n", $begin, $end);
   }
   [ 1, 10_000_000 ];

   -- Output

         1 ..  1250000
   1250001 ..  2500000
   2500001 ..  3750000
   3750001 ..  5000000
   5000001 ..  6250000
   6250001 ..  7500000
   7500001 ..  8750000
   8750001 .. 10000000

=over 3

=item MCE::Loop->run ( sub { code }, iterator )

=item mce_loop { code } iterator

=back

An iterator reference may be specified for input_data. Iterators are described
under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.

   mce_loop { $_ } make_iterator(10, 30, 2);

=head1 GATHERING DATA

Unlike MCE::Map where gather and output order are done for you automatically,
the gather method is used to have results sent back to the manager process.

   use MCE::Loop chunk_size => 1;

   ## Output order is not guaranteed.
   my @a1 = mce_loop { MCE->gather($_ * 2) } 1..100;
   print "@a1\n\n";

   ## Outputs to a hash instead (key, value).
   my %h1 = mce_loop { MCE->gather($_, $_ * 2) } 1..100;
   print "@h1{1..100}\n\n";

   ## This does the same thing due to chunk_id starting at one.
   my %h2 = mce_loop { MCE->gather(MCE->chunk_id, $_ * 2) } 1..100;
   print "@h2{1..100}\n\n";

The gather method may be called multiple times within the block unlike return
which would leave the block. Therefore, think of gather as yielding results
immediately to the manager process without actually leaving the block.

   use MCE::Loop chunk_size => 1, max_workers => 3;

   my @hosts = qw(
      hosta hostb hostc hostd hoste
   );

   my %h3 = mce_loop {
      my ($output, $error, $status); my $host = $_;

      ## Do something with $host;
      $output = "Worker ". MCE->wid .": Hello from $host";

      if (MCE->chunk_id % 3 == 0) {
         ## Simulating an error condition
         local $? = 1; $status = $?;
         $error = "Error from $host"
      }
      else {
         $status = 0;
      }

      ## Ensure unique keys (key, value) when gathering to
      ## a hash.
      MCE->gather("$host.out", $output);
      MCE->gather("$host.err", $error) if (defined $error);
      MCE->gather("$host.sta", $status);

   } @hosts;

   foreach my $host (@hosts) {
      print $h3{"$host.out"}, "\n";
      print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
      print "Exit status: ", $h3{"$host.sta"}, "\n\n";
   }

   -- Output

   Worker 2: Hello from hosta
   Exit status: 0

   Worker 1: Hello from hostb
   Exit status: 0

   Worker 3: Hello from hostc
   Error from hostc
   Exit status: 1

   Worker 2: Hello from hostd
   Exit status: 0

   Worker 1: Hello from hoste
   Exit status: 0

The following uses an anonymous array containing 3 elements when gathering
data. Serialization is automatic behind the scenes.

   my %h3 = mce_loop {
      ...

      MCE->gather($host, [$output, $error, $status]);

   } @hosts;

   foreach my $host (@hosts) {
      print $h3{$host}->[0], "\n";
      print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
      print "Exit status: ", $h3{$host}->[2], "\n\n";
   }

Although MCE::Map comes to mind, one may want additional control when
gathering data such as retaining output order.

   use MCE::Loop;

   sub preserve_order {
      my %tmp; my $order_id = 1; my $gather_ref = $_[0];

      return sub {
         $tmp{ (shift) } = \@_;

         while (1) {
            last unless exists $tmp{$order_id};
            push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
         }

         return;
      };
   }

   my @m2;

   MCE::Loop->init(
      chunk_size => 'auto', max_workers => 'auto',
      gather => preserve_order(\@m2)
   );

   mce_loop {
      my @a; my ($mce, $chunk_ref, $chunk_id) = @_;

      ## Compute the entire chunk data at once.
      push @a, map { $_ * 2 } @{ $chunk_ref };

      ## Afterwards, invoke the gather feature, which
      ## will direct the data to the callback function.
      MCE->gather(MCE->chunk_id, @a);

   } 1..100000;

   MCE::Loop->finish;

   print scalar @m2, "\n";

All 6 models support 'auto' for chunk_size, unlike the Core API. Think of the
models as the basis for providing JIT for MCE. They create the instance, tune
max_workers, and tune chunk_size automatically regardless of the hardware.

The following does the same thing using the Core API.

   use MCE;

   sub preserve_order {
      ...
   }

   my $mce = MCE->new(
      max_workers => 'auto', chunk_size => 8000,

      user_func => sub {
         my @a; my ($mce, $chunk_ref, $chunk_id) = @_;

         ## Compute the entire chunk data at once.
         push @a, map { $_ * 2 } @{ $chunk_ref };

         ## Afterwards, invoke the gather feature, which
         ## will direct the data to the callback function.
         MCE->gather(MCE->chunk_id, @a);
      }
   );

   my @m2;

   $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
   $mce->shutdown;

   print scalar @m2, "\n";

=head1 MANUAL SHUTDOWN

=over 3

=item MCE::Loop->finish

=item MCE::Loop::finish

=back

Workers remain persistent as much as possible after running. Shutdown occurs
automatically when the script terminates. Call finish when workers are no
longer needed.

   use MCE::Loop;

   MCE::Loop->init(
      chunk_size => 20, max_workers => 'auto'
   );

   mce_loop { ... } 1..100;

   MCE::Loop->finish;

=head1 INDEX

L<MCE|MCE>, L<MCE::Core>

=head1 AUTHOR

Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>

=cut
