###############################################################################
## ----------------------------------------------------------------------------
## Parallel flow model for building creative applications.
##
###############################################################################

package MCE::Flow;

use strict;
use warnings;

no warnings qw( threads recursion uninitialized );

our $VERSION = '1.876';

## no critic (BuiltinFunctions::ProhibitStringyEval)
## no critic (Subroutines::ProhibitSubroutinePrototypes)
## no critic (TestingAndDebugging::ProhibitNoStrict)

use Scalar::Util qw( looks_like_number );
use MCE;

our @CARP_NOT = qw( MCE );

## Thread id of the current interpreter; 0 when threads.pm is not loaded.
## Together with $$ and the caller's package it forms the per-session key
## "$$.$_tid.$pkg" used by init, finish and the run* functions below.
my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;

## Perl calls CLONE in each newly created thread; refresh the cached tid so
## the new thread builds its own session keys.
sub CLONE {
   $_tid = threads->tid() if $INC{'threads.pm'};
}

###############################################################################
## ----------------------------------------------------------------------------
## Import routine.
##
###############################################################################

## Module-level state:
##   $_MCE          MCE instances, keyed by "$$.$_tid.$pkg"
##   $_def          defaults from 'use MCE::Flow ...', keyed by package name
##   $_params       options from init() and run(), keyed by "$$.$_tid.$pkg"
##   $_tag          label used in error messages and as the MCE task_name
##   $_prev_c/n/t/w code ref, task_name, use_threads and max_workers used at
##                  each task position in the previous run; any difference
##                  forces the MCE instance to be rebuilt
##   $_user_tasks   user_tasks list handed to MCE->new, keyed like $_MCE
my ($_MCE, $_def, $_params, $_tag) = ({}, {}, {}, 'MCE::Flow');
my ($_prev_c, $_prev_n, $_prev_t, $_prev_w) = ({}, {}, {}, {});
my ($_user_tasks) = ({});

## Called by 'use MCE::Flow LIST'. Exports mce_flow, mce_flow_f and
## mce_flow_s into the calling package and records that package's defaults
## (max_workers, chunk_size, tmp_dir, freeze, thaw). Option names are
## matched case-insensitively; an unknown option croaks.
sub import {
   my ($_class, $_pkg) = (shift, caller);

   my $_p = $_def->{$_pkg} = {
      MAX_WORKERS => 'auto',
      CHUNK_SIZE  => 'auto',
   };

   ## Import functions.
   no strict 'refs'; no warnings 'redefine';

   *{ $_pkg.'::mce_flow_f' } = \&run_file;
   *{ $_pkg.'::mce_flow_s' } = \&run_seq;
   *{ $_pkg.'::mce_flow'   } = \&run;

   ## Process module arguments (key => value pairs).
   ## Note: the loop ends at the first false key (undef, '' or '0').
   while ( my $_argument = shift ) {
      my $_arg = lc $_argument;

      $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
      $_p->{CHUNK_SIZE}  = shift, next if ( $_arg eq 'chunk_size' );
      $_p->{TMP_DIR}     = shift, next if ( $_arg eq 'tmp_dir' );
      $_p->{FREEZE}      = shift, next if ( $_arg eq 'freeze' );
      $_p->{THAW}        = shift, next if ( $_arg eq 'thaw' );

      ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
      ## 'Sereal => 0' opts out in favor of Storable's freeze/thaw.
      if ( $_arg eq 'sereal' ) {
         if ( shift eq '0' ) {
            require Storable;
            $_p->{FREEZE} = \&Storable::freeze;
            $_p->{THAW}   = \&Storable::thaw;
         }
         next;
      }

      _croak("Error: ($_argument) invalid module option");
   }

   ## Normalize max_workers through MCE (presumably resolving 'auto'), then
   ## validate; chunk_size is validated only when it is not 'auto'.
   $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});

   MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
   MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
      unless ($_p->{CHUNK_SIZE} eq 'auto');

   return;
}

###############################################################################
## ----------------------------------------------------------------------------
## Init and finish routines.
##
###############################################################################

## MCE::Flow->init(%opts) or MCE::Flow::init({ %opts })
##
## Stores MCE options for the calling package (per process and thread).
## Later mce_flow / mce_flow_f / mce_flow_s calls from that package read them.
## Replaces any options previously stored for that key.
##
## NOTE(review): a hash ref argument is stored as-is, not copied. run(),
## run_file() and run_seq() later add and delete keys in it (e.g. _file,
## sequence, sequence_run, options from a leading hash), so the caller's
## hash is mutated. Confirm whether this aliasing is intended.
sub init (@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
   my $_pkg = "$$.$_tid.".caller();

   $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };

   ## Clear @_ before returning (the same pattern run() uses; see the
   ## Perl < v5.14 note there).
   @_ = ();

   return;
}

## MCE::Flow->finish or MCE::Flow::finish [ ($key [, @shutdown_args]) ]
##
## Shuts down the workers of a session and forgets its cached MCE instance
## and task state. Options stored by init() are kept.
##   - With no key, the session of the calling package is finished.
##   - With the key 'MCE', every session known to this module is finished.
##   - Any remaining arguments are passed to the instance's shutdown method.
## Only the process/thread that created an instance (its _init_pid) acts on
## it, and shutdown is called only if workers were spawned.
sub finish (@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
   my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();

   if ( $_pkg eq 'MCE' ) {
      for my $_k ( keys %{ $_MCE } ) { MCE::Flow->finish($_k, 1); }
   }
   elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
      $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};

      delete $_user_tasks->{$_pkg};
      delete $_prev_c->{$_pkg};
      delete $_prev_n->{$_pkg};
      delete $_prev_t->{$_pkg};
      delete $_prev_w->{$_pkg};
      delete $_MCE->{$_pkg};
   }

   @_ = ();

   return;
}

###############################################################################
## ----------------------------------------------------------------------------
## Parallel flow with MCE -- file.
##
###############################################################################

## MCE::Flow->run_file(...) or mce_flow_f [ {opts}, ] sub {...}, ..., FILE
##
## The input is the first argument after the first code block (index 2 when
## an options hash comes first, else 1) that is one of:
##   - a plain scalar (a path), which must exist, be readable and be a
##     plain file;
##   - a scalar ref; or
##   - a GLOB / FileHandle / IO::* object.
## It is stashed in $_params under _file. That argument and everything after
## it are removed from @_, then control passes to run(). Croaks when no
## valid input is found.
sub run_file (@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Flow');

   my ($_file, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
   my $_pid = "$$.$_tid.".caller();

   ## Drop list/sequence input left over from earlier calls.
   if (defined (my $_p = $_params->{$_pid})) {
      delete $_p->{input_data} if (exists $_p->{input_data});
      delete $_p->{sequence}   if (exists $_p->{sequence});
   }
   else {
      $_params->{$_pid} = {};
   }

   for my $_i ($_start_pos .. @_ - 1) {
      my $_r = ref $_[$_i];
      if ($_r eq '' || $_r eq 'SCALAR' || $_r =~ /^(?:GLOB|FileHandle|IO::)/) {
         $_file = $_[$_i]; $_pos = $_i;
         last;
      }
   }

   if (defined $_file && ref $_file eq '' && $_file ne '') {
      _croak("$_tag: ($_file) does not exist")      unless (-e $_file);
      _croak("$_tag: ($_file) is not readable")     unless (-r $_file);
      _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
      $_params->{$_pid}{_file} = $_file;
   }
   elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
      $_params->{$_pid}{_file} = $_file;
   }
   else {
      _croak("$_tag: (file) is not specified or valid");
   }

   ## Remove the file argument and anything following it.
   if (defined $_pos) {
      pop @_ for ($_pos .. @_ - 1);
   }

   return run(@_);
}

###############################################################################
## ----------------------------------------------------------------------------
## Parallel flow with MCE -- sequence.
##
###############################################################################

## MCE::Flow->run_seq(...) or mce_flow_s [ {opts}, ] sub {...}, ..., SEQ
##
## SEQ is the first argument from the start position (2 after an options
## hash, else 1) that is one of:
##   - a plain scalar or Math::* object, taken as the list
##     begin, end [, step [, format]];
##   - an array ref in that same order; or
##   - a hash ref with at least begin and end keys.
## Croaks unless both begin and end are defined. The sequence is stored in
## $_params, and sequence_run marks it for removal after the run. The
## sequence arguments are removed from @_ before control passes to run().
sub run_seq (@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Flow');

   my ($_begin, $_end, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
   my $_pid = "$$.$_tid.".caller();

   ## Drop sequence/list/file input left over from earlier calls.
   if (defined (my $_p = $_params->{$_pid})) {
      delete $_p->{sequence}   if (exists $_p->{sequence});
      delete $_p->{input_data} if (exists $_p->{input_data});
      delete $_p->{_file}      if (exists $_p->{_file});
   }
   else {
      $_params->{$_pid} = {};
   }

   for my $_i ($_start_pos .. @_ - 1) {
      my $_r = ref $_[$_i];

      if ($_r eq '' || $_r =~ /^Math::/ || $_r eq 'HASH' || $_r eq 'ARRAY') {
         $_pos = $_i;

         if ($_r eq '' || $_r =~ /^Math::/) {
            $_begin = $_[$_pos]; $_end = $_[$_pos + 1];
            $_params->{$_pid}{sequence} = [
               $_[$_pos], $_[$_pos + 1], $_[$_pos + 2], $_[$_pos + 3]
            ];
         }
         elsif ($_r eq 'HASH') {
            $_begin = $_[$_pos]->{begin}; $_end = $_[$_pos]->{end};
            $_params->{$_pid}{sequence} = $_[$_pos];
         }
         elsif ($_r eq 'ARRAY') {
            $_begin = $_[$_pos]->[0]; $_end = $_[$_pos]->[1];
            $_params->{$_pid}{sequence} = $_[$_pos];
         }

         last;
      }
   }

   _croak("$_tag: (sequence) is not specified or valid")
      unless (exists $_params->{$_pid}{sequence});
   _croak("$_tag: (begin) is not specified for sequence")
      unless (defined $_begin);
   _croak("$_tag: (end) is not specified for sequence")
      unless (defined $_end);

   ## Tells run() this sequence is one-shot (deleted after the run).
   $_params->{$_pid}{sequence_run} = undef;

   ## Remove the sequence argument(s) and anything following them.
   if (defined $_pos) {
      pop @_ for ($_pos .. @_ - 1);
   }

   return run(@_);
}

###############################################################################
## ----------------------------------------------------------------------------
## Parallel flow with MCE.
##
###############################################################################

## MCE::Flow->run(...) or mce_flow [ {opts}, ] sub {...} [, sub {...} ...] [, INPUT ]
##
## Runs one MCE session. The leading code refs in @_ become its user tasks.
## A leading hash ref is merged into the stored options for this session.
##
## INPUT is optional:
##   - A single remaining ARRAY/HASH/SCALAR/GLOB/FileHandle/IO:: ref is
##     passed to process() as-is.
##   - Any other remaining arguments are processed as a list (\@_).
##   - With no input, input left by run_file (_file) or an input_data
##     option is used.
##   - Otherwise a stored sequence is run, or workers run without input.
##
## The MCE instance is cached per process/thread/package. It is rebuilt only
## when the set of tasks (code ref, task_name, use_threads, max_workers at
## each position) changes.
##
## Returns the gathered list unless called in void context.
sub run (@) {

   shift if (defined $_[0] && $_[0] eq 'MCE::Flow');

   ## When reached through run_file/run_seq, report the user's package
   ## rather than MCE::Flow.
   my $_pkg = caller() eq 'MCE::Flow' ? caller(1) : caller();
   my $_pid = "$$.$_tid.$_pkg";

   if (ref $_[0] eq 'HASH') {
      $_params->{$_pid} = {} unless defined $_params->{$_pid};
      for my $_p (keys %{ $_[0] }) {
         $_params->{$_pid}{$_p} = $_[0]->{$_p};
      }

      shift;
   }

   ## -------------------------------------------------------------------------
   ## Collect the leading code refs as tasks. Array-ref values of task_name,
   ## use_threads and max_workers are indexed per task position.

   my (@_code, @_name, @_thrs, @_wrks); my $_init_mce = 0; my $_pos = 0;

   while (ref $_[0] eq 'CODE') {
      push @_code, $_[0];

      if (defined (my $_p = $_params->{$_pid})) {
         push @_name, (ref $_p->{task_name} eq 'ARRAY')
            ? $_p->{task_name}->[$_pos] : undef;
         push @_thrs, (ref $_p->{use_threads} eq 'ARRAY')
            ? $_p->{use_threads}->[$_pos] : undef;
         push @_wrks, (ref $_p->{max_workers} eq 'ARRAY')
            ? $_p->{max_workers}->[$_pos] : undef;
      }

      ## Any difference from the previous run at this position forces
      ## a rebuild of the MCE instance.
      $_init_mce = 1 if (
         !defined $_prev_c->{$_pid}[$_pos] ||
         $_prev_c->{$_pid}[$_pos] != $_code[$_pos]
      );

      $_init_mce = 1 if ($_prev_n->{$_pid}[$_pos] ne $_name[$_pos]);
      $_init_mce = 1 if ($_prev_t->{$_pid}[$_pos] ne $_thrs[$_pos]);
      $_init_mce = 1 if ($_prev_w->{$_pid}[$_pos] ne $_wrks[$_pos]);

      $_prev_c->{$_pid}[$_pos] = $_code[$_pos];
      $_prev_n->{$_pid}[$_pos] = $_name[$_pos];
      $_prev_t->{$_pid}[$_pos] = $_thrs[$_pos];
      $_prev_w->{$_pid}[$_pos] = $_wrks[$_pos];

      shift; $_pos++;
   }

   ## Fewer tasks than last time: trim the remembered state and rebuild.
   if (defined $_prev_c->{$_pid}[$_pos]) {
      pop @{ $_prev_c->{$_pid} } for ($_pos .. $#{ $_prev_c->{$_pid } });
      pop @{ $_prev_n->{$_pid} } for ($_pos .. $#{ $_prev_n->{$_pid } });
      pop @{ $_prev_t->{$_pid} } for ($_pos .. $#{ $_prev_t->{$_pid } });
      pop @{ $_prev_w->{$_pid} } for ($_pos .. $#{ $_prev_w->{$_pid } });

      $_init_mce = 1;
   }

   ## Nothing to run without at least one code block.
   return unless (scalar @_code);

   ## -------------------------------------------------------------------------
   ## Determine input, worker count and chunk size.

   my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
   my $_r = ref $_[0];

   if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|GLOB|FileHandle|IO::)/) {
      $_input_data = shift;
   }

   if (defined (my $_p = $_params->{$_pid})) {
      $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
         if (exists $_p->{max_workers} && ref $_p->{max_workers} ne 'ARRAY');

      ## Explicit input replaces any stored sequence; user_func/user_tasks
      ## are managed by this module, not taken from options.
      delete $_p->{sequence}   if (defined $_input_data || scalar @_);
      delete $_p->{user_func}  if (exists $_p->{user_func});
      delete $_p->{user_tasks} if (exists $_p->{user_tasks});
   }

   ## With several tasks: max_workers = int(max_workers / #tasks + 0.5) + 1.
   if (@_code > 1 && $_max_workers > 1) {
      $_max_workers = int($_max_workers / @_code + 0.5) + 1;
   }

   my $_chunk_size = MCE::_parse_chunk_size(
      $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
      $_input_data, scalar @_
   );

   ## A file from run_file is consumed here (one-shot).
   ## NOTE(review): an input_data option overrides a single ref argument
   ## taken above -- confirm this precedence is intended.
   if (defined (my $_p = $_params->{$_pid})) {
      if (exists $_p->{_file}) {
         $_input_data = delete $_p->{_file};
      } else {
         $_input_data = $_p->{input_data} if exists $_p->{input_data};
      }
   }

   ## -------------------------------------------------------------------------
   ## MCE internals, paired with _restore_state below; presumably they
   ## save/restore MCE's global state so nested sessions work.

   MCE::_save_state($_MCE->{$_pid});

   if ($_init_mce) {
      $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});

      ## must clear arrays for nested session to work with Perl < v5.14
      _gen_user_tasks($_pid, [@_code], [@_name], [@_thrs], [@_wrks]);

      @_code = @_name = @_thrs = @_wrks = ();

      my %_opts = (
         max_workers => $_max_workers, task_name => $_tag,
         user_tasks  => $_user_tasks->{$_pid},
      );

      ## Pass the stored options to MCE->new; unknown keys croak. Skipped:
      ## per-task array values (already in user_tasks) and keys this function
      ## handles itself (chunk_size, input_data, sequence_run).
      if (defined (my $_p = $_params->{$_pid})) {
         local $_;

         for (keys %{ $_p }) {
            next if ($_ eq 'max_workers' && ref $_p->{max_workers} eq 'ARRAY');
            next if ($_ eq 'task_name' && ref $_p->{task_name} eq 'ARRAY');
            next if ($_ eq 'use_threads' && ref $_p->{use_threads} eq 'ARRAY');

            next if ($_ eq 'chunk_size');
            next if ($_ eq 'input_data');
            next if ($_ eq 'sequence_run');

            _croak("$_tag: ($_) is not a valid constructor argument")
               unless (exists $MCE::_valid_fields_new{$_});

            $_opts{$_} = $_p->{$_};
         }
      }

      ## Module-level defaults fill in tmp_dir/freeze/thaw not set above.
      for my $_k (qw/ tmp_dir freeze thaw /) {
         $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
            if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
      }

      $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
   }
   else {
      ## Workers may persist after running. Thus, updating the MCE instance.
      ## These options do not require respawning.
      if (defined (my $_p = $_params->{$_pid})) {
         for my $_k (qw(
            RS interval stderr_file stdout_file user_error user_output
            job_delay submit_delay on_post_exit on_post_run user_args
            flush_file flush_stderr flush_stdout gather max_retries
         )) {
            $_MCE->{$_pid}{$_k} = $_p->{$_k} if (exists $_p->{$_k});
         }
      }
   }

   ## -------------------------------------------------------------------------
   ## Unless called in void context, gather into @_a. This replaces any gather
   ## option on the instance for this call; the key is deleted afterwards.

   my @_a; my $_wa = wantarray; $_MCE->{$_pid}{gather} = \@_a if (defined $_wa);

   if (defined $_input_data) {
      @_ = ();
      $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
      delete $_MCE->{$_pid}{input_data};
   }
   elsif (scalar @_) {
      $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
      delete $_MCE->{$_pid}{input_data};
   }
   else {
      if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
         $_MCE->{$_pid}->run({
             chunk_size => $_chunk_size,
             sequence   => $_params->{$_pid}{sequence}
         }, 0);
         ## A sequence from run_seq is one-shot; one given via options persists.
         if (exists $_params->{$_pid}{sequence_run}) {
             delete $_params->{$_pid}{sequence_run};
             delete $_params->{$_pid}{sequence};
         }
         delete $_MCE->{$_pid}{sequence};
      }
      else {
         $_MCE->{$_pid}->run({ chunk_size => $_chunk_size }, 0);
      }
   }

   MCE::_restore_state();

   delete $_MCE->{$_pid}{gather} if (defined $_wa);

   return ((defined $_wa) ? @_a : ());
}

###############################################################################
## ----------------------------------------------------------------------------
## Private methods.
##
###############################################################################

## Delegate to MCE::_croak; 'goto &' replaces this frame so the caller sees
## MCE::_croak invoked with the original arguments.
sub _croak {

   goto &MCE::_croak;
}

## Build the user_tasks list for MCE->new under $_user_tasks->{$_pid}: one
## hash per code ref with its task_name, use_threads, max_workers and
## user_func (undef entries when no per-task value was given).
sub _gen_user_tasks {

   my ($_pid, $_code_ref, $_name_ref, $_thrs_ref, $_wrks_ref) = @_;

   @{ $_user_tasks->{$_pid} } = ();

   for (my $_i = 0; $_i < @{ $_code_ref }; $_i++) {
      push @{ $_user_tasks->{$_pid} }, {
         task_name   => $_name_ref->[$_i],
         use_threads => $_thrs_ref->[$_i],
         max_workers => $_wrks_ref->[$_i],
         user_func   => $_code_ref->[$_i]
      }
   }

   return;
}

1;

__END__

###############################################################################
## ----------------------------------------------------------------------------
## Module usage.
##
###############################################################################

=head1 NAME

MCE::Flow - Parallel flow model for building creative applications

=head1 VERSION

This document describes MCE::Flow version 1.876

=head1 DESCRIPTION

MCE::Flow is great for writing custom apps that make full use of all
available cores. This module was created to help one harness user_tasks
within MCE.

It is trivial to parallelize with mce_stream, as shown below.

   ## Native map function
   my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;

   ## Same as with MCE::Stream (processing from right to left)
   @a = mce_stream
         sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;

   ## Pass an array reference to have writes occur simultaneously
   mce_stream \@a,
         sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;

However, let's have MCE::Flow compute the same in parallel. MCE::Queue
will be used for data flow among the sub-tasks.

   use MCE::Flow;
   use MCE::Queue;

This calls for preserving output order.

   sub preserve_order {
      my %tmp; my $order_id = 1; my $gather_ref = $_[0];
      @{ $gather_ref } = ();  ## clear the array (optional)

      return sub {
         my ($data_ref, $chunk_id) = @_;
         $tmp{$chunk_id} = $data_ref;

         while (1) {
            last unless exists $tmp{$order_id};
            push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
         }

         return;
      };
   }

Two queues are needed for data flow between the 3 sub-tasks. Notice task_end
and how the value from $task_name is used for determining which task has ended.

   my $b = MCE::Queue->new;
   my $c = MCE::Queue->new;

   sub task_end {
      my ($mce, $task_id, $task_name) = @_;

      if (defined $mce->{user_tasks}->[$task_id + 1]) {
         my $n_workers = $mce->{user_tasks}->[$task_id + 1]->{max_workers};

         if ($task_name eq 'a') {
            $b->enqueue((undef) x $n_workers);
         }
         elsif ($task_name eq 'b') {
            $c->enqueue((undef) x $n_workers);
         }
      }

      return;
   }

Next are the 3 sub-tasks. The first one reads input and begins the flow.
The 2nd task dequeues, performs the calculation, and enqueues into the next.
Finally, the last task calls the gather method.

Although serialization is done for you automatically, it is done here to
avoid double serialization. This is the fastest approach for passing data
between sub-tasks, and thus has the least overhead.

   sub task_a {
      my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;

      push @ans, map { $_ * 2 } @{ $chunk_ref };
      $b->enqueue(MCE->freeze([ \@ans, $chunk_id ]));

      return;
   }

   sub task_b {
      my ($mce) = @_;

      while (1) {
         my @ans; my $chunk = $b->dequeue;
         last unless defined $chunk;

         $chunk = MCE->thaw($chunk);
         push @ans, map { $_ * 3 } @{ $chunk->[0] };
         $c->enqueue(MCE->freeze([ \@ans, $chunk->[1] ]));
      }

      return;
   }

   sub task_c {
      my ($mce) = @_;

      while (1) {
         my @ans; my $chunk = $c->dequeue;
         last unless defined $chunk;

         $chunk = MCE->thaw($chunk);
         push @ans, map { $_ * 4 } @{ $chunk->[0] };
         MCE->gather(\@ans, $chunk->[1]);
      }

      return;
   }

In summary, MCE::Flow builds an MCE instance behind the scenes and starts
running it. The task_name (shown), max_workers, and use_threads options can
take an anonymous array for specifying the values uniquely per sub-task.

   my @a;

   mce_flow {
      task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
      gather => preserve_order(\@a)

   }, \&task_a, \&task_b, \&task_c, 1..10000;

   print "@a\n";

If speed is not a concern and you want to get rid of all the MCE->freeze and
MCE->thaw statements, simply enqueue and dequeue 2 items at a time.
Or better yet, see L<MCE::Step> introduced in MCE 1.506.

First, task_end must be updated. The number of undef(s) must match the number
of workers times the dequeue count. Otherwise, the script will stall.

   sub task_end {
      ...
      if ($task_name eq 'a') {
       # $b->enqueue((undef) x $n_workers);
         $b->enqueue((undef) x ($n_workers * 2));
      }
      elsif ($task_name eq 'b') {
       # $c->enqueue((undef) x $n_workers);
         $c->enqueue((undef) x ($n_workers * 2));
      }
      ...
   }

Next, here are the 3 sub-tasks, enqueuing and dequeuing 2 elements at a time.

   sub task_a {
      my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;

      push @ans, map { $_ * 2 } @{ $chunk_ref };
      $b->enqueue(\@ans, $chunk_id);

      return;
   }

   sub task_b {
      my ($mce) = @_;

      while (1) {
         my @ans; my ($chunk_ref, $chunk_id) = $b->dequeue(2);
         last unless defined $chunk_ref;

         push @ans, map { $_ * 3 } @{ $chunk_ref };
         $c->enqueue(\@ans, $chunk_id);
      }

      return;
   }

   sub task_c {
      my ($mce) = @_;

      while (1) {
         my @ans; my ($chunk_ref, $chunk_id) = $c->dequeue(2);
         last unless defined $chunk_ref;

         push @ans, map { $_ * 4 } @{ $chunk_ref };
         MCE->gather(\@ans, $chunk_id);
      }

      return;
   }

Finally, run as usual.

   my @a;

   mce_flow {
      task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
      gather => preserve_order(\@a)

   }, \&task_a, \&task_b, \&task_c, 1..10000;

   print "@a\n";

=head1 SYNOPSIS when CHUNK_SIZE EQUALS 1

Although L<MCE::Loop> may be preferred for running using a single code block,
the text below also applies to this module, particularly for the first block.

All models in MCE default to 'auto' for chunk_size. The arguments for the block
are the same as writing a user_func block using the Core API.

Beginning with MCE 1.5, the next input item is placed into the input scalar
variable $_ when chunk_size equals 1. Otherwise, $_ points to $chunk_ref
containing many items. Basically, line 2 below may be omitted from your code
when using $_. One can call MCE->chunk_id to obtain the current chunk id.

   line 1:  user_func => sub {
   line 2:     my ($mce, $chunk_ref, $chunk_id) = @_;
   line 3:
   line 4:     $_ points to $chunk_ref->[0]
   line 5:        in MCE 1.5 when chunk_size == 1
   line 6:
   line 7:     $_ points to $chunk_ref
   line 8:        in MCE 1.5 when chunk_size  > 1
   line 9:  }

Follow this synopsis when chunk_size equals one. Looping is not required from
inside the first block. Hence, the block is called once for each item.

   ## Exports mce_flow, mce_flow_f, and mce_flow_s
   use MCE::Flow;

   MCE::Flow->init(
      chunk_size => 1
   );

   ## Array or array_ref
   mce_flow sub { do_work($_) }, 1..10000;
   mce_flow sub { do_work($_) }, \@list;

   ## Important: pass an array_ref for deeply nested input data
   mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
   mce_flow sub { do_work($_) }, \@deeply_list;

   ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
   ## Workers read directly and do not involve the manager process
   mce_flow_f sub { chomp; do_work($_) }, "/path/to/file"; # efficient

   ## Involves the manager process, therefore slower
   mce_flow_f sub { chomp; do_work($_) }, $file_handle;
   mce_flow_f sub { chomp; do_work($_) }, $io;
   mce_flow_f sub { chomp; do_work($_) }, \$scalar;

   ## Sequence of numbers (begin, end [, step, format])
   mce_flow_s sub { do_work($_) }, 1, 10000, 5;
   mce_flow_s sub { do_work($_) }, [ 1, 10000, 5 ];

   mce_flow_s sub { do_work($_) }, {
      begin => 1, end => 10000, step => 5, format => undef
   };

=head1 SYNOPSIS when CHUNK_SIZE is GREATER THAN 1

Follow this synopsis when chunk_size equals 'auto' or is greater than 1.
This means having to loop through the chunk from inside the first block.

   use MCE::Flow;

   MCE::Flow->init(          ## Chunk_size defaults to 'auto' when
      chunk_size => 'auto'   ## not specified. Therefore, the init
   );                        ## function may be omitted.

   ## Syntax is shown for mce_flow for demonstration purposes.
   ## Looping inside the block is the same for mce_flow_f and
   ## mce_flow_s.

   ## Array or array_ref
   mce_flow sub { do_work($_) for (@{ $_ }) }, 1..10000;
   mce_flow sub { do_work($_) for (@{ $_ }) }, \@list;

   ## Important: pass an array_ref for deeply nested input data
   mce_flow sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
   mce_flow sub { do_work($_) for (@{ $_ }) }, \@deeply_list;

   ## Resembles code using the core MCE API
   mce_flow sub {
      my ($mce, $chunk_ref, $chunk_id) = @_;

      for (@{ $chunk_ref }) {
         do_work($_);
      }

   }, 1..10000;

Chunking reduces the number of IPC calls behind the scenes. Think in terms of
chunks whenever processing a large amount of data. For relatively small data,
choosing 1 for chunk_size is fine.

=head1 OVERRIDING DEFAULTS

The following lists options which may be overridden when loading the module.

   use Sereal qw( encode_sereal decode_sereal );
   use CBOR::XS qw( encode_cbor decode_cbor );
   use JSON::XS qw( encode_json decode_json );

   use MCE::Flow
         max_workers => 8,                # Default 'auto'
         chunk_size => 500,               # Default 'auto'
         tmp_dir => "/path/to/app/tmp",   # $MCE::Signal::tmp_dir
         freeze => \&encode_sereal,       # \&Storable::freeze
         thaw => \&decode_sereal          # \&Storable::thaw
   ;

From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if available.
Specify C<< Sereal => 0 >> to use Storable instead.

   use MCE::Flow Sereal => 0;

=head1 CUSTOMIZING MCE

=over 3

=item MCE::Flow->init ( options )

=item MCE::Flow::init { options }

=back

The init function accepts a hash of MCE options. Unlike with MCE::Stream,
both gather and bounds_only options may be specified when calling init
(not shown below).

   use MCE::Flow;

   MCE::Flow->init(
      chunk_size => 1, max_workers => 4,

      user_begin => sub {
         print "## ", MCE->wid, " started\n";
      },

      user_end => sub {
         print "## ", MCE->wid, " completed\n";
      }
   );

   my %a = mce_flow sub { MCE->gather($_, $_ * $_) }, 1..100;

   print "\n", "@a{1..100}", "\n";

   -- Output

   ## 3 started
   ## 2 started
   ## 4 started
   ## 1 started
   ## 2 completed
   ## 4 completed
   ## 3 completed
   ## 1 completed

   1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
   400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
   1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
   2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
   3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
   5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
   7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
   10000

Like with MCE::Flow->init above, MCE options may be specified using an
anonymous hash for the first argument. Notice how task_name, max_workers,
and use_threads can take an anonymous array for setting values uniquely
per code block.

Unlike MCE::Stream which processes from right-to-left, MCE::Flow begins
with the first code block, thus processing from left-to-right.

   use threads;
   use MCE::Flow;

   my @a = mce_flow {
      task_name   => [ 'a', 'b', 'c' ],
      max_workers => [  3,   4,   2,  ],
      use_threads => [  1,   0,   0,  ],

      user_end => sub {
         my ($mce, $task_id, $task_name) = @_;
         MCE->print("$task_id - $task_name completed\n");
      },

      task_end => sub {
         my ($mce, $task_id, $task_name) = @_;
         MCE->print("$task_id - $task_name ended\n");
      }
   },
   sub { sleep 1; },   ## 3 workers, named a
   sub { sleep 2; },   ## 4 workers, named b
   sub { sleep 3; };   ## 2 workers, named c

   -- Output

   0 - a completed
   0 - a completed
   0 - a completed
   0 - a ended
   1 - b completed
   1 - b completed
   1 - b completed
   1 - b completed
   1 - b ended
   2 - c completed
   2 - c completed
   2 - c ended

=head1 API DOCUMENTATION

Although input data is optional for MCE::Flow, the following assumes chunk_size
equals 1 in order to demonstrate all the possibilities for providing input data.

=over 3

=item MCE::Flow->run ( sub { code }, list )

=item mce_flow sub { code }, list

=back

Input data may be defined using a list, an array ref, or a hash ref.

Unlike MCE::Loop, Map, and Grep which take a block as C<{ ... }>, Flow takes a
C<sub { ... }> or a code reference. The other difference is that the comma is
needed after the block.

   # $_ contains the item when chunk_size => 1

   mce_flow sub { do_work($_) }, 1..1000;
   mce_flow sub { do_work($_) }, \@list;

   # Important: pass an array_ref for deeply nested input data

   mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
   mce_flow sub { do_work($_) }, \@deeply_list;

   # Chunking; any chunk_size => 1 or greater

   my %res = mce_flow sub {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      my %ret;
      for my $item (@{ $chunk_ref }) {
         $ret{$item} = $item * 2;
      }
      MCE->gather(%ret);
   },
   \@list;

   # Input hash; current API available since 1.828

   my %res = mce_flow sub {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      my %ret;
      for my $key (keys %{ $chunk_ref }) {
         $ret{$key} = $chunk_ref->{$key} * 2;
      }
      MCE->gather(%ret);
   },
   \%hash;

   # Unlike MCE::Loop, MCE::Flow doesn't need input to run

   mce_flow { max_workers => 4 }, sub {
      MCE->say( MCE->wid );
   };

   # ... and can run multiple tasks

   mce_flow {
      max_workers => [ 1, 3 ],
      task_name => [ 'p', 'c' ]
   },
   sub {
      # 1 producer
      MCE->say( "producer: ", MCE->wid );
   },
   sub {
      # 3 consumers
      MCE->say( "consumer: ", MCE->wid );
   };

   # Here, options are specified via init

   MCE::Flow->init(
      max_workers => [ 1, 3 ],
      task_name => [ 'p', 'c' ]
   );

   mce_flow \&producer, \&consumers;

=over 3

=item MCE::Flow->run_file ( sub { code }, file )

=item mce_flow_f sub { code }, file

=back

The fastest of these is the /path/to/file. Workers communicate the next offset
position among themselves without any involvement from the manager process.

C<IO::All> { File, Pipe, STDIO } is supported since MCE 1.845.

   # $_ contains the line when chunk_size => 1

   mce_flow_f sub { $_ }, "/path/to/file";  # faster
   mce_flow_f sub { $_ }, $file_handle;
   mce_flow_f sub { $_ }, $io;              # IO::All
   mce_flow_f sub { $_ }, \$scalar;

   # chunking, any chunk_size => 1 or greater

   my %res = mce_flow_f sub {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      my $buf = '';
      for my $line (@{ $chunk_ref }) {
         $buf .= $line;
      }
      MCE->gather($chunk_id, $buf);
   },
   "/path/to/file";

=over 3

=item MCE::Flow->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )

=item mce_flow_s sub { code }, $beg, $end [, $step, $fmt ]

=back

Sequence may be defined as a list, an array reference, or a hash reference.
The functions require both begin and end values to run. Step and format are
optional. The format is passed to sprintf (% may be omitted below).

   my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");

   # $_ contains the sequence number when chunk_size => 1

   mce_flow_s sub { $_ }, $beg, $end, $step, $fmt;
   mce_flow_s sub { $_ }, [ $beg, $end, $step, $fmt ];

   mce_flow_s sub { $_ }, {
      begin => $beg, end => $end,
      step => $step, format => $fmt
   };

   # chunking, any chunk_size => 1 or greater

   my %res = mce_flow_s sub {
      my ($mce, $chunk_ref, $chunk_id) = @_;
      my $buf = '';
      for my $seq (@{ $chunk_ref }) {
         $buf .= "$seq\n";
      }
      MCE->gather($chunk_id, $buf);
   },
   [ $beg, $end ];

With the bounds_only option, the sequence engine computes only the 'begin'
and 'end' items of each chunk, and not the items in between (hence
boundaries only). This option applies to sequence only and has no effect
when chunk_size equals 1.

The time to run is 0.006s below. This becomes 0.827s without the bounds_only
option due to computing all items in between, thus creating a very large
array. Basically, specify bounds_only => 1 when boundaries are all you need
for looping inside the block; e.g. Monte Carlo simulations.

Time was measured using 1 worker to emphasize the difference.

   use MCE::Flow;

   MCE::Flow->init(
      max_workers => 1, chunk_size => 1_250_000,
      bounds_only => 1
   );

   # Typically, the input scalar $_ contains the sequence number
   # when chunk_size => 1, unless the bounds_only option is set
   # which is the case here. Thus, $_ points to $chunk_ref.

   mce_flow_s sub {
      my ($mce, $chunk_ref, $chunk_id) = @_;

      # $chunk_ref contains 2 items, not 1_250_000
      # my ( $begin, $end ) = ( $_->[0], $_->[1] );

      my $begin = $chunk_ref->[0];
      my $end   = $chunk_ref->[1];

      # for my $seq ( $begin .. $end ) {
      #    ...
      # }

      MCE->printf("%7d .. %8d\n", $begin, $end);
   },
   [ 1, 10_000_000 ];

   -- Output

         1 ..  1250000
   1250001 ..  2500000
   2500001 ..  3750000
   3750001 ..  5000000
   5000001 ..  6250000
   6250001 ..  7500000
   7500001 ..  8750000
   8750001 .. 10000000

=over 3

=item MCE::Flow->run ( { input_data => iterator }, sub { code } )

=item mce_flow { input_data => iterator }, sub { code }

=back

An iterator reference may be specified for input_data, either in the options
hash shown above or via MCE::Flow->init. Passing it this way prevents
MCE::Flow from treating the iterator (a code reference) as another user task,
which would not work.

Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.

   MCE::Flow->init(
      input_data => iterator
   );

   mce_flow sub { $_ };

=head1 GATHERING DATA

Unlike MCE::Map where gather and output order are done for you automatically,
the gather method is used to have results sent back to the manager process.
1120 1121 use MCE::Flow chunk_size => 1; 1122 1123 ## Output order is not guaranteed. 1124 my @a1 = mce_flow sub { MCE->gather($_ * 2) }, 1..100; 1125 print "@a1\n\n"; 1126 1127 ## Outputs to a hash instead (key, value). 1128 my %h1 = mce_flow sub { MCE->gather($_, $_ * 2) }, 1..100; 1129 print "@h1{1..100}\n\n"; 1130 1131 ## This does the same thing due to chunk_id starting at one. 1132 my %h2 = mce_flow sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100; 1133 print "@h2{1..100}\n\n"; 1134 1135The gather method may be called multiple times within the block unlike return 1136which would leave the block. Therefore, think of gather as yielding results 1137immediately to the manager process without actually leaving the block. 1138 1139 use MCE::Flow chunk_size => 1, max_workers => 3; 1140 1141 my @hosts = qw( 1142 hosta hostb hostc hostd hoste 1143 ); 1144 1145 my %h3 = mce_flow sub { 1146 my ($output, $error, $status); my $host = $_; 1147 1148 ## Do something with $host; 1149 $output = "Worker ". MCE->wid .": Hello from $host"; 1150 1151 if (MCE->chunk_id % 3 == 0) { 1152 ## Simulating an error condition 1153 local $? = 1; $status = $?; 1154 $error = "Error from $host" 1155 } 1156 else { 1157 $status = 0; 1158 } 1159 1160 ## Ensure unique keys (key, value) when gathering to 1161 ## a hash. 
1162 MCE->gather("$host.out", $output); 1163 MCE->gather("$host.err", $error) if (defined $error); 1164 MCE->gather("$host.sta", $status); 1165 1166 }, @hosts; 1167 1168 foreach my $host (@hosts) { 1169 print $h3{"$host.out"}, "\n"; 1170 print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"}); 1171 print "Exit status: ", $h3{"$host.sta"}, "\n\n"; 1172 } 1173 1174 -- Output 1175 1176 Worker 3: Hello from hosta 1177 Exit status: 0 1178 1179 Worker 2: Hello from hostb 1180 Exit status: 0 1181 1182 Worker 1: Hello from hostc 1183 Error from hostc 1184 Exit status: 1 1185 1186 Worker 3: Hello from hostd 1187 Exit status: 0 1188 1189 Worker 2: Hello from hoste 1190 Exit status: 0 1191 1192The following uses an anonymous array containing 3 elements when gathering 1193data. Serialization happens automatically behind the scenes. 1194 1195 my %h3 = mce_flow sub { 1196 ... 1197 1198 MCE->gather($host, [$output, $error, $status]); 1199 1200 }, @hosts; 1201 1202 foreach my $host (@hosts) { 1203 print $h3{$host}->[0], "\n"; 1204 print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]); 1205 print "Exit status: ", $h3{$host}->[2], "\n\n"; 1206 } 1207 1208Although MCE::Map comes to mind, one may want additional control when 1209gathering data, such as retaining output order. 1210 1211 use MCE::Flow; 1212 1213 sub preserve_order { 1214 my %tmp; my $order_id = 1; my $gather_ref = $_[0]; 1215 1216 return sub { 1217 $tmp{ (shift) } = \@_; 1218 1219 while (1) { 1220 last unless exists $tmp{$order_id}; 1221 push @{ $gather_ref }, @{ delete $tmp{$order_id++} }; 1222 } 1223 1224 return; 1225 }; 1226 } 1227 1228 ## Workers persist for the most part after running. However, that is 1229 ## not always the case and depends on Perl. Pass a reference to a 1230 ## subroutine if workers must persist; e.g. mce_flow { ... }, \&foo, 1..100000.
1231 1232 MCE::Flow->init( 1233 chunk_size => 'auto', max_workers => 'auto' 1234 ); 1235 1236 for (1..2) { 1237 my @m2; 1238 1239 mce_flow { 1240 gather => preserve_order(\@m2) 1241 }, 1242 sub { 1243 my @a; my ($mce, $chunk_ref, $chunk_id) = @_; 1244 1245 ## Compute the entire chunk data at once. 1246 push @a, map { $_ * 2 } @{ $chunk_ref }; 1247 1248 ## Afterwards, invoke the gather feature, which 1249 ## will direct the data to the callback function. 1250 MCE->gather(MCE->chunk_id, @a); 1251 1252 }, 1..100000; 1253 1254 print scalar @m2, "\n"; 1255 } 1256 1257 MCE::Flow->finish; 1258 1259All 6 models support 'auto' for chunk_size unlike the Core API. Think of the 1260models as the basis for providing JIT for MCE. They create the instance, tune 1261max_workers, and tune chunk_size automatically regardless of the hardware. 1262 1263The following does the same thing using the Core API. Workers persist after 1264running. 1265 1266 use MCE; 1267 1268 sub preserve_order { 1269 ... 1270 } 1271 1272 my $mce = MCE->new( 1273 max_workers => 'auto', chunk_size => 8000, 1274 1275 user_func => sub { 1276 my @a; my ($mce, $chunk_ref, $chunk_id) = @_; 1277 1278 ## Compute the entire chunk data at once. 1279 push @a, map { $_ * 2 } @{ $chunk_ref }; 1280 1281 ## Afterwards, invoke the gather feature, which 1282 ## will direct the data to the callback function. 1283 MCE->gather(MCE->chunk_id, @a); 1284 } 1285 ); 1286 1287 for (1..2) { 1288 my @m2; 1289 1290 $mce->process({ gather => preserve_order(\@m2) }, [1..100000]); 1291 1292 print scalar @m2, "\n"; 1293 } 1294 1295 $mce->shutdown; 1296 1297=head1 MANUAL SHUTDOWN 1298 1299=over 3 1300 1301=item MCE::Flow->finish 1302 1303=item MCE::Flow::finish 1304 1305=back 1306 1307Workers remain persistent as much as possible after running. Shutdown occurs 1308automatically when the script terminates. Call finish when workers are no 1309longer needed. 
1310 1311 use MCE::Flow; 1312 1313 MCE::Flow->init( 1314 chunk_size => 20, max_workers => 'auto' 1315 ); 1316 1317 mce_flow sub { ... }, 1..100; 1318 1319 MCE::Flow->finish; 1320 1321=head1 INDEX 1322 1323L<MCE|MCE>, L<MCE::Core> 1324 1325=head1 AUTHOR 1326 1327Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>> 1328 1329=cut 1330 1331