1############################################################################### 2## ---------------------------------------------------------------------------- 3## MCE - Many-Core Engine for Perl providing parallel processing capabilities. 4## 5############################################################################### 6 7package MCE; 8 9use strict; 10use warnings; 11 12no warnings qw( threads recursion uninitialized ); 13 14our $VERSION = '1.876'; 15 16## no critic (BuiltinFunctions::ProhibitStringyEval) 17## no critic (Subroutines::ProhibitSubroutinePrototypes) 18## no critic (TestingAndDebugging::ProhibitNoStrict) 19 20use Carp (); 21 22my ($_has_threads, $_freeze, $_thaw, $_tid, $_oid); 23 24BEGIN { 25 local $@; 26 27 if ( $^O eq 'MSWin32' && ! $INC{'threads.pm'} ) { 28 eval 'use threads; use threads::shared;'; 29 } 30 elsif ( $INC{'threads.pm'} && ! $INC{'threads/shared.pm'} ) { 31 eval 'use threads::shared;'; 32 } 33 34 $_has_threads = $INC{'threads.pm'} ? 1 : 0; 35 $_tid = $_has_threads ? threads->tid() : 0; 36 $_oid = "$$.$_tid"; 37 38 if ( $] ge '5.008008' && ! $INC{'PDL.pm'} ) { 39 eval 'use Sereal::Encoder 3.015; use Sereal::Decoder 3.015;'; 40 if ( ! $@ ) { 41 my $_encoder_ver = int( Sereal::Encoder->VERSION() ); 42 my $_decoder_ver = int( Sereal::Decoder->VERSION() ); 43 if ( $_encoder_ver - $_decoder_ver == 0 ) { 44 $_freeze = \&Sereal::Encoder::encode_sereal; 45 $_thaw = \&Sereal::Decoder::decode_sereal; 46 } 47 } 48 } 49 50 if ( ! defined $_freeze ) { 51 require Storable; 52 $_freeze = \&Storable::freeze; 53 $_thaw = \&Storable::thaw; 54 } 55} 56 57use IO::Handle (); 58use Scalar::Util qw( looks_like_number refaddr reftype weaken ); 59use Socket qw( SOL_SOCKET SO_RCVBUF ); 60use Time::HiRes qw( sleep time ); 61 62use MCE::Util qw( $LF ); 63use MCE::Signal (); 64use MCE::Mutex (); 65 66our ($MCE, $RLA, $_que_template, $_que_read_size); 67our (%_valid_fields_new); 68 69my ($TOP_HDLR, $_is_MSWin32, $_is_winenv, $_prev_mce); 70my (%_valid_fields_task, %_params_allowed_args); 71 72BEGIN { 73 ## Configure pack/unpack template for writing to and from the queue. 74 ## Each entry contains 2 positive numbers: chunk_id & msg_id. 75 ## Check for >= 64-bit, otherwize fall back to machine's word length. 76 77 $_que_template = ( ( log(~0+1) / log(2) ) >= 64 ) ? 'Q2' : 'I2'; 78 $_que_read_size = length pack($_que_template, 0, 0); 79 80 ## Attributes used internally. 81 ## _abort_msg _caller _chn _com_lock _dat_lock _mgr_live _rla_data _seed 82 ## _chunk_id _pids _run_mode _single_dim _thrs _tids _task_wid _wid _wuf 83 ## _exiting _exit_pid _last_sref _total_exited _total_running _total_workers 84 ## _send_cnt _sess_dir _spawned _state _status _task _task_id _wrk_status 85 ## _init_pid _init_total_workers _pids_t _pids_w _pids_c _relayed 86 ## 87 ## _bsb_r_sock _bsb_w_sock _com_r_sock _com_w_sock _dat_r_sock _dat_w_sock 88 ## _que_r_sock _que_w_sock _rla_r_sock _rla_w_sock _data_channels 89 ## _lock_chn _mutex_n 90 91 %_valid_fields_new = map { $_ => 1 } qw( 92 max_workers tmp_dir use_threads user_tasks task_end task_name freeze thaw 93 chunk_size input_data sequence job_delay spawn_delay submit_delay RS 94 flush_file flush_stderr flush_stdout stderr_file stdout_file use_slurpio 95 interval user_args user_begin user_end user_func user_error user_output 96 bounds_only gather init_relay on_post_exit on_post_run parallel_io 97 loop_timeout max_retries progress posix_exit 98 ); 99 %_params_allowed_args = map { $_ => 1 } qw( 100 chunk_size input_data sequence job_delay spawn_delay submit_delay RS 101 flush_file flush_stderr flush_stdout stderr_file stdout_file use_slurpio 102 interval user_args user_begin user_end user_func user_error user_output 103 bounds_only gather init_relay on_post_exit on_post_run parallel_io 104 loop_timeout max_retries progress 105 ); 106 %_valid_fields_task = map { $_ => 1 } qw( 107 max_workers chunk_size input_data interval sequence task_end task_name 108 bounds_only gather init_relay user_args user_begin user_end user_func 109 RS parallel_io use_slurpio use_threads 110 ); 111 112 $_is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0; 113 $_is_winenv = ( $^O =~ /mswin|mingw|msys|cygwin/i ) ? 1 : 0; 114 115 ## Create accessor functions. 116 no strict 'refs'; no warnings 'redefine'; 117 118 for my $_p (qw( chunk_size max_retries max_workers task_name user_args )) { 119 *{ $_p } = sub () { 120 my $self = shift; $self = $MCE unless ref($self); 121 return $self->{$_p}; 122 }; 123 } 124 for my $_p (qw( chunk_id task_id task_wid wid )) { 125 *{ $_p } = sub () { 126 my $self = shift; $self = $MCE unless ref($self); 127 return $self->{"_${_p}"}; 128 }; 129 } 130 for my $_p (qw( freeze thaw )) { 131 *{ $_p } = sub () { 132 my $self = shift; $self = $MCE unless ref($self); 133 return $self->{$_p}(@_); 134 }; 135 } 136 137 $RLA = {}; 138 139 return; 140} 141 142############################################################################### 143## ---------------------------------------------------------------------------- 144## Import routine. 145## 146############################################################################### 147 148use constant { SELF => 0, CHUNK => 1, CID => 2 }; 149 150our $_MCE_LOCK : shared = 1; 151our $_WIN_LOCK : shared = 1; 152 153my ($_def, $_imported) = ({}); 154 155sub import { 156 my ($_class, $_pkg) = (shift, caller); 157 my $_p = $_def->{$_pkg} = {}; 158 159 ## Process module arguments. 160 while ( my $_argument = shift ) { 161 my $_arg = lc $_argument; 162 163 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' ); 164 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' ); 165 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' ); 166 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' ); 167 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' ); 168 169 if ( $_arg eq 'export_const' || $_arg eq 'const' ) { 170 if ( shift eq '1' ) { 171 no strict 'refs'; no warnings 'redefine'; 172 *{ $_pkg.'::SELF' } = \&SELF; 173 *{ $_pkg.'::CHUNK' } = \&CHUNK; 174 *{ $_pkg.'::CID' } = \&CID; 175 } 176 next; 177 } 178 179 ## Sereal, if available, is used automatically by MCE 1.800 onwards. 180 if ( $_arg eq 'sereal' ) { 181 if ( shift eq '0' ) { 182 require Storable; 183 $_p->{FREEZE} = \&Storable::freeze; 184 $_p->{THAW} = \&Storable::thaw; 185 } 186 next; 187 } 188 189 _croak("Error: ($_argument) invalid module option"); 190 } 191 192 return if $_imported++; 193 194 ## Instantiate a module-level instance. 195 $MCE = MCE->new( _module_instance => 1, max_workers => 0 ); 196 197 return; 198} 199 200############################################################################### 201## ---------------------------------------------------------------------------- 202## Define constants & variables. 203## 204############################################################################### 205 206use constant { 207 208 # Max data channels. This cannot be greater than 8 on MSWin32. 209 DATA_CHANNELS => ($^O eq 'MSWin32') ? 8 : 10, 210 211 # Max GC size. Undef variable when exceeding size. 212 MAX_GC_SIZE => 1024 * 1024 * 64, 213 214 MAX_RECS_SIZE => 8192, # Reads number of records if N <= value 215 # Reads number of bytes if N > value 216 217 OUTPUT_W_ABT => 'W~ABT', # Worker has aborted 218 OUTPUT_W_DNE => 'W~DNE', # Worker has completed 219 OUTPUT_W_RLA => 'W~RLA', # Worker has relayed 220 OUTPUT_W_EXT => 'W~EXT', # Worker has exited 221 OUTPUT_A_REF => 'A~REF', # Input << Array ref 222 OUTPUT_G_REF => 'G~REF', # Input << Glob ref 223 OUTPUT_H_REF => 'H~REF', # Input << Hash ref 224 OUTPUT_I_REF => 'I~REF', # Input << Iter ref 225 OUTPUT_A_CBK => 'A~CBK', # Callback w/ multiple args 226 OUTPUT_N_CBK => 'N~CBK', # Callback w/ no args 227 OUTPUT_A_GTR => 'A~GTR', # Gather data 228 OUTPUT_O_SND => 'O~SND', # Send >> STDOUT 229 OUTPUT_E_SND => 'E~SND', # Send >> STDERR 230 OUTPUT_F_SND => 'F~SND', # Send >> File 231 OUTPUT_D_SND => 'D~SND', # Send >> File descriptor 232 OUTPUT_B_SYN => 'B~SYN', # Barrier sync - begin 233 OUTPUT_E_SYN => 'E~SYN', # Barrier sync - end 234 OUTPUT_S_IPC => 'S~IPC', # Change to win32 IPC 235 OUTPUT_C_NFY => 'C~NFY', # Chunk ID notification 236 OUTPUT_P_NFY => 'P~NFY', # Progress notification 237 OUTPUT_R_NFY => 'R~NFY', # Relay notification 238 OUTPUT_S_DIR => 'S~DIR', # Make/get sess_dir 239 OUTPUT_T_DIR => 'T~DIR', # Make/get tmp_dir 240 OUTPUT_I_DLY => 'I~DLY', # Interval delay 241 242 READ_FILE => 0, # Worker reads file handle 243 READ_MEMORY => 1, # Worker reads memory handle 244 245 REQUEST_ARRAY => 0, # Worker requests next array chunk 246 REQUEST_GLOB => 1, # Worker requests next glob chunk 247 REQUEST_HASH => 2, # Worker requests next hash chunk 248 249 SENDTO_FILEV1 => 0, # Worker sends to 'file', $a, '/path' 250 SENDTO_FILEV2 => 1, # Worker sends to 'file:/path', $a 251 SENDTO_STDOUT => 2, # Worker sends to STDOUT 252 SENDTO_STDERR => 3, # Worker sends to STDERR 253 SENDTO_FD => 4, # Worker sends to file descriptor 254 255 WANTS_UNDEF => 0, # Callee wants nothing 256 WANTS_ARRAY => 1, # Callee wants list 257 WANTS_SCALAR => 2, # Callee wants scalar 258}; 259 260my $_mce_count = 0; 261 262sub CLONE { 263 $_tid = threads->tid() if $INC{'threads.pm'}; 264} 265 266sub DESTROY { 267 CORE::kill('KILL', $$) 268 if ( $_is_MSWin32 && $MCE::Signal::KILLED ); 269 270 $_[0]->shutdown(1) 271 if ( $_[0] && $_[0]->{_spawned} && $_[0]->{_init_pid} eq "$$.$_tid" && 272 !$MCE::Signal::KILLED ); 273 274 return; 275} 276 277END { 278 return unless ( defined $MCE ); 279 280 my $_pid = $MCE->{_is_thread} ? $$ .'.'. threads->tid() : $$; 281 $MCE->exit if ( exists $MCE->{_wuf} && $MCE->{_pid} eq $_pid ); 282 283 _end(); 284} 285 286sub _end { 287 MCE::Flow->finish ( 'MCE' ) if $INC{'MCE/Flow.pm'}; 288 MCE::Grep->finish ( 'MCE' ) if $INC{'MCE/Grep.pm'}; 289 MCE::Loop->finish ( 'MCE' ) if $INC{'MCE/Loop.pm'}; 290 MCE::Map->finish ( 'MCE' ) if $INC{'MCE/Map.pm'}; 291 MCE::Step->finish ( 'MCE' ) if $INC{'MCE/Step.pm'}; 292 MCE::Stream->finish ( 'MCE' ) if $INC{'MCE/Stream.pm'}; 293 294 $MCE = $TOP_HDLR = undef; 295} 296 297############################################################################### 298## ---------------------------------------------------------------------------- 299## Plugin interface for external modules plugging into MCE, e.g. MCE::Queue. 300## 301############################################################################### 302 303my (%_plugin_function, @_plugin_loop_begin, @_plugin_loop_end); 304my (%_plugin_list, @_plugin_worker_init); 305 306sub _attach_plugin { 307 my $_ext_module = caller; 308 309 unless (exists $_plugin_list{$_ext_module}) { 310 $_plugin_list{$_ext_module} = undef; 311 312 my $_ext_output_function = $_[0]; 313 my $_ext_output_loop_begin = $_[1]; 314 my $_ext_output_loop_end = $_[2]; 315 my $_ext_worker_init = $_[3]; 316 317 if (ref $_ext_output_function eq 'HASH') { 318 for my $_p (keys %{ $_ext_output_function }) { 319 $_plugin_function{$_p} = $_ext_output_function->{$_p} 320 unless (exists $_plugin_function{$_p}); 321 } 322 } 323 324 push @_plugin_loop_begin, $_ext_output_loop_begin 325 if (ref $_ext_output_loop_begin eq 'CODE'); 326 push @_plugin_loop_end, $_ext_output_loop_end 327 if (ref $_ext_output_loop_end eq 'CODE'); 328 push @_plugin_worker_init, $_ext_worker_init 329 if (ref $_ext_worker_init eq 'CODE'); 330 } 331 332 @_ = (); 333 334 return; 335} 336 337## Functions for saving and restoring $MCE. 338## Called by MCE::{ Flow, Grep, Loop, Map, Step, and Stream }. 339 340sub _save_state { 341 $_prev_mce = $MCE; $MCE = $_[0]; 342 return; 343} 344sub _restore_state { 345 $_prev_mce->{_wrk_status} = $MCE->{_wrk_status}; 346 $MCE = $_prev_mce; $_prev_mce = undef; 347 return; 348} 349 350############################################################################### 351## ---------------------------------------------------------------------------- 352## New instance instantiation. 353## 354############################################################################### 355 356sub _croak { 357 if (MCE->wid == 0 || ! $^S) { 358 $SIG{__DIE__} = \&MCE::Signal::_die_handler; 359 $SIG{__WARN__} = \&MCE::Signal::_warn_handler; 360 } 361 $\ = undef; goto &Carp::croak; 362} 363 364use MCE::Core::Validation (); 365use MCE::Core::Manager (); 366use MCE::Core::Worker (); 367 368sub new { 369 my ($class, %self) = @_; 370 my $_pkg = exists $self{pkg} ? delete $self{pkg} : caller; 371 372 @_ = (); 373 374 bless(\%self, ref($class) || $class); 375 376 $self{task_name} ||= 'MCE'; 377 $self{max_workers} ||= $_def->{$_pkg}{MAX_WORKERS} || 1; 378 $self{chunk_size} ||= $_def->{$_pkg}{CHUNK_SIZE} || 1; 379 $self{tmp_dir} ||= $_def->{$_pkg}{TMP_DIR} || $MCE::Signal::tmp_dir; 380 $self{freeze} ||= $_def->{$_pkg}{FREEZE} || $_freeze; 381 $self{thaw} ||= $_def->{$_pkg}{THAW} || $_thaw; 382 383 if (exists $self{_module_instance}) { 384 $self{_init_total_workers} = $self{max_workers}; 385 $self{_chunk_id} = $self{_task_wid} = $self{_wrk_status} = 0; 386 $self{_spawned} = $self{_task_id} = $self{_wid} = 0; 387 $self{_init_pid} = "$$.$_tid"; 388 389 return \%self; 390 } 391 392 _sendto_fhs_close(); 393 394 for my $_p (keys %self) { 395 _croak("MCE::new: ($_p) is not a valid constructor argument") 396 unless (exists $_valid_fields_new{$_p}); 397 } 398 399 $self{_caller} = $_pkg, $self{_init_pid} = "$$.$_tid"; 400 401 if (defined $self{use_threads}) { 402 if (!$_has_threads && $self{use_threads}) { 403 my $_msg = "\n"; 404 $_msg .= "## Please include threads support prior to loading MCE\n"; 405 $_msg .= "## when specifying use_threads => $self{use_threads}\n"; 406 $_msg .= "\n"; 407 408 _croak($_msg); 409 } 410 } 411 else { 412 $self{use_threads} = ($_has_threads) ? 1 : 0; 413 } 414 415 if (!exists $self{posix_exit}) { 416 $self{posix_exit} = 1 if ( 417 $^S || $_tid || $INC{'Mojo/IOLoop.pm'} || 418 $INC{'Coro.pm'} || $INC{'LWP/UserAgent.pm'} || $INC{'stfl.pm'} || 419 $INC{'Curses.pm'} || $INC{'CGI.pm'} || $INC{'FCGI.pm'} || 420 $INC{'Tk.pm'} || $INC{'Wx.pm'} || $INC{'Win32/GUI.pm'} || 421 $INC{'Gearman/Util.pm'} || $INC{'Gearman/XS.pm'} 422 ); 423 } 424 425 ## ------------------------------------------------------------------------- 426 ## Validation. 427 428 if (defined $self{tmp_dir}) { 429 _croak("MCE::new: ($self{tmp_dir}) is not a directory or does not exist") 430 unless (-d $self{tmp_dir}); 431 _croak("MCE::new: ($self{tmp_dir}) is not writeable") 432 unless (-w $self{tmp_dir}); 433 } 434 435 if (defined $self{user_tasks}) { 436 _croak('MCE::new: (user_tasks) is not an ARRAY reference') 437 unless (ref $self{user_tasks} eq 'ARRAY'); 438 439 $self{max_workers} = _parse_max_workers($self{max_workers}); 440 $self{init_relay} = $self{user_tasks}->[0]->{init_relay} 441 if ($self{user_tasks}->[0]->{init_relay}); 442 443 for my $_task (@{ $self{user_tasks} }) { 444 for my $_p (keys %{ $_task }) { 445 _croak("MCE::new: ($_p) is not a valid task constructor argument") 446 unless (exists $_valid_fields_task{$_p}); 447 } 448 $_task->{max_workers} = 0 unless scalar(keys %{ $_task }); 449 450 $_task->{max_workers} = $self{max_workers} 451 unless (defined $_task->{max_workers}); 452 $_task->{use_threads} = $self{use_threads} 453 unless (defined $_task->{use_threads}); 454 455 bless($_task, ref(\%self) || \%self); 456 } 457 } 458 459 _validate_args(\%self); 460 461 ## ------------------------------------------------------------------------- 462 ## Private options. Limit chunk_size. 463 464 my $_run_lock; 465 466 $self{_chunk_id} = 0; # Chunk ID 467 $self{_send_cnt} = 0; # Number of times data was sent via send 468 $self{_spawned} = 0; # Have workers been spawned 469 $self{_task_id} = 0; # Task ID, starts at 0 (array index) 470 $self{_task_wid} = 0; # Task Worker ID, starts at 1 per task 471 $self{_wid} = 0; # Worker ID, starts at 1 per MCE instance 472 $self{_wrk_status} = 0; # For saving exit status when worker exits 473 474 $self{_run_lock} = threads::shared::share($_run_lock) if $_is_MSWin32; 475 476 $self{_last_sref} = (ref $self{input_data} eq 'SCALAR') 477 ? refaddr($self{input_data}) : 0; 478 479 my $_data_channels = ("$$.$_tid" eq $_oid) 480 ? ( $INC{'MCE/Channel.pm'} ? 6 : DATA_CHANNELS ) 481 : 2; 482 483 my $_total_workers = 0; 484 485 if (defined $self{user_tasks}) { 486 $_total_workers += $_->{max_workers} for @{ $self{user_tasks} }; 487 } else { 488 $_total_workers = $self{max_workers}; 489 } 490 491 $self{_init_total_workers} = $_total_workers; 492 493 $self{_data_channels} = ($_total_workers < $_data_channels) 494 ? $_total_workers : $_data_channels; 495 496 $self{_lock_chn} = ($_total_workers > $_data_channels) ? 1 : 0; 497 $self{_lock_chn} = 1 if $INC{'MCE/Child.pm'} || $INC{'MCE/Hobo.pm'}; 498 499 $MCE = \%self if ($MCE->{_wid} == 0); 500 501 return \%self; 502} 503 504############################################################################### 505## ---------------------------------------------------------------------------- 506## Spawn method. 507## 508############################################################################### 509 510sub spawn { 511 my $self = shift; $self = $MCE unless ref($self); 512 513 local $_; @_ = (); 514 515 _croak('MCE::spawn: method is not allowed by the worker process') 516 if ($self->{_wid}); 517 518 ## Return if workers have already been spawned or if module instance. 519 return $self if ($self->{_spawned} || exists $self->{_module_instance}); 520 521 lock $_WIN_LOCK if $_is_MSWin32; # Obtain locks 522 lock $_MCE_LOCK if $_has_threads && $_is_winenv; 523 524 $MCE::_GMUTEX->lock() if ($_tid && $MCE::_GMUTEX); 525 sleep 0.015 if $_tid; 526 527 _sendto_fhs_close(); 528 529 if ($INC{'PDL.pm'}) { local $@; 530 eval 'use PDL::IO::Storable' unless $INC{'PDL/IO/Storable.pm'}; 531 eval 'PDL::no_clone_skip_warning()'; 532 } 533 if ( $INC{'LWP/UserAgent.pm'} && !$INC{'Net/HTTP.pm'} ) { 534 local $@; eval 'require Net::HTTP; require Net::HTTPS'; 535 } 536 537 ## Start the shared-manager process if not running. 538 MCE::Shared->start() if $INC{'MCE/Shared.pm'}; 539 540 ## Load input module. 541 if (defined $self->{sequence}) { 542 require MCE::Core::Input::Sequence 543 unless $INC{'MCE/Core/Input/Sequence.pm'}; 544 } 545 elsif (defined $self->{input_data}) { 546 my $_ref = ref $self->{input_data}; 547 if ($_ref =~ /^(?:ARRAY|HASH|GLOB|FileHandle|IO::)/) { 548 require MCE::Core::Input::Request 549 unless $INC{'MCE/Core/Input/Request.pm'}; 550 } 551 elsif ($_ref eq 'CODE') { 552 require MCE::Core::Input::Iterator 553 unless $INC{'MCE/Core/Input/Iterator.pm'}; 554 } 555 else { 556 require MCE::Core::Input::Handle 557 unless $INC{'MCE/Core/Input/Handle.pm'}; 558 } 559 } 560 561 my $_die_handler = $SIG{__DIE__}; 562 my $_warn_handler = $SIG{__WARN__}; 563 564 $SIG{__DIE__} = \&MCE::Signal::_die_handler; 565 $SIG{__WARN__} = \&MCE::Signal::_warn_handler; 566 567 if (!defined $TOP_HDLR || (!$TOP_HDLR->{_mgr_live} && !$TOP_HDLR->{_wid})) { 568 ## On Windows, must shutdown the last idle MCE session. 569 if ($_is_MSWin32 && defined $TOP_HDLR && $TOP_HDLR->{_spawned}) { 570 $TOP_HDLR->shutdown(1); 571 } 572 $TOP_HDLR = $self; 573 } 574 elsif (refaddr($self) != refaddr($TOP_HDLR)) { 575 ## Reduce the maximum number of channels for nested sessions. 576 $self->{_data_channels} = 4 if ($self->{_data_channels} > 4); 577 $self->{_lock_chn} = 1 if ($self->{_init_total_workers} > 4); 578 579 ## On Windows, instruct the manager process to enable win32 IPC. 580 if ($_is_MSWin32 && $ENV{'PERL_MCE_IPC'} ne 'win32') { 581 $ENV{'PERL_MCE_IPC'} = 'win32'; local $\ = undef; 582 my $_DAT_W_SOCK = $TOP_HDLR->{_dat_w_sock}->[0]; 583 print {$_DAT_W_SOCK} OUTPUT_S_IPC.$LF . '0'.$LF; 584 585 MCE::Util::_sock_ready($_DAT_W_SOCK, -1); 586 MCE::Util::_sysread($_DAT_W_SOCK, my($_buf), 1); 587 } 588 } 589 590 ## ------------------------------------------------------------------------- 591 592 my $_data_channels = $self->{_data_channels}; 593 my $_max_workers = _get_max_workers($self); 594 my $_use_threads = $self->{use_threads}; 595 596 ## Create locks for data channels. 597 $self->{'_mutex_0'} = MCE::Mutex->new( impl => 'Channel' ); 598 599 if ($self->{_lock_chn}) { 600 $self->{'_mutex_'.$_} = MCE::Mutex->new( impl => 'Channel' ) 601 for (1 .. $_data_channels); 602 } 603 604 ## Create sockets for IPC. sync, comm, data 605 MCE::Util::_sock_pair($self, qw(_bsb_r_sock _bsb_w_sock), undef, 1); 606 MCE::Util::_sock_pair($self, qw(_com_r_sock _com_w_sock), undef, 1); 607 608 MCE::Util::_sock_pair($self, qw(_dat_r_sock _dat_w_sock), 0); 609 MCE::Util::_sock_pair($self, qw(_dat_r_sock _dat_w_sock), $_, 1) 610 for (1 .. $_data_channels); 611 612 setsockopt($self->{_dat_r_sock}->[0], SOL_SOCKET, SO_RCVBUF, pack('i', 4096)) 613 if ($^O ne 'aix' && $^O ne 'linux'); 614 615 ($_is_MSWin32) # input 616 ? MCE::Util::_pipe_pair($self, qw(_que_r_sock _que_w_sock)) 617 : MCE::Util::_sock_pair($self, qw(_que_r_sock _que_w_sock), undef, 1); 618 619 if (defined $self->{init_relay}) { # relay 620 unless ($INC{'MCE/Relay.pm'}) { 621 require MCE::Relay; MCE::Relay->import(); 622 } 623 MCE::Util::_sock_pair($self, qw(_rla_r_sock _rla_w_sock), $_, 1) 624 for (0 .. $_max_workers - 1); 625 } 626 627 $self->{_seed} = int(rand() * 1e9); 628 629 ## ------------------------------------------------------------------------- 630 631 ## Spawn workers. 632 $self->{_pids} = [], $self->{_thrs} = [], $self->{_tids} = []; 633 $self->{_status} = [], $self->{_state} = [], $self->{_task} = []; 634 635 if ($self->{loop_timeout} && !$_is_MSWin32) { 636 $self->{_pids_t} = {}, $self->{_pids_w} = {}; 637 } 638 639 local $SIG{TTIN}, local $SIG{TTOU}, local $SIG{WINCH} unless $_is_MSWin32; 640 641 if (!defined $self->{user_tasks}) { 642 $self->{_total_workers} = $_max_workers; 643 644 if (defined $_use_threads && $_use_threads == 1) { 645 _dispatch_thread($self, $_) for (1 .. $_max_workers); 646 } else { 647 _dispatch_child($self, $_) for (1 .. $_max_workers); 648 } 649 650 $self->{_task}->[0] = { _total_workers => $_max_workers }; 651 652 for my $_i (1 .. $_max_workers) { 653 $self->{_state}->[$_i] = { 654 _task => undef, _task_id => undef, _task_wid => undef, 655 _params => undef, _chn => $_i % $_data_channels + 1 656 } 657 } 658 } 659 else { 660 my ($_task_id, $_wid); 661 662 $self->{_total_workers} = 0; 663 $self->{_total_workers} += $_->{max_workers} for @{ $self->{user_tasks} }; 664 665 # Must spawn processes first for extra stability on BSD/Darwin. 666 $_task_id = $_wid = 0; 667 668 for my $_task (@{ $self->{user_tasks} }) { 669 my $_tsk_use_threads = $_task->{use_threads}; 670 671 if (defined $_tsk_use_threads && $_tsk_use_threads == 1) { 672 $_wid += $_task->{max_workers}; 673 } else { 674 _dispatch_child($self, ++$_wid, $_task, $_task_id, $_) 675 for (1 .. $_task->{max_workers}); 676 } 677 678 $_task_id++; 679 } 680 681 # Then, spawn threads last. 682 $_task_id = $_wid = 0; 683 684 for my $_task (@{ $self->{user_tasks} }) { 685 my $_tsk_use_threads = $_task->{use_threads}; 686 687 if (defined $_tsk_use_threads && $_tsk_use_threads == 1) { 688 _dispatch_thread($self, ++$_wid, $_task, $_task_id, $_) 689 for (1 .. $_task->{max_workers}); 690 } else { 691 $_wid += $_task->{max_workers}; 692 } 693 694 $_task_id++; 695 } 696 697 # Save state. 698 $_task_id = $_wid = 0; 699 700 for my $_task (@{ $self->{user_tasks} }) { 701 $self->{_task}->[$_task_id] = { 702 _total_running => 0, _total_workers => $_task->{max_workers} 703 }; 704 for my $_i (1 .. $_task->{max_workers}) { 705 $_wid += 1; 706 $self->{_state}->[$_wid] = { 707 _task => $_task, _task_id => $_task_id, _task_wid => $_i, 708 _params => undef, _chn => $_wid % $_data_channels + 1 709 } 710 } 711 712 $_task_id++; 713 } 714 } 715 716 ## ------------------------------------------------------------------------- 717 718 $self->{_send_cnt} = 0, $self->{_spawned} = 1; 719 720 $SIG{__DIE__} = $_die_handler; 721 $SIG{__WARN__} = $_warn_handler; 722 723 $MCE = $self if ($MCE->{_wid} == 0); 724 725 $MCE::_GMUTEX->unlock() if ($_tid && $MCE::_GMUTEX); 726 727 return $self; 728} 729 730############################################################################### 731## ---------------------------------------------------------------------------- 732## Process method, relay stubs, and AUTOLOAD for methods not used often. 733## 734############################################################################### 735 736sub process { 737 my $self = shift; $self = $MCE unless ref($self); 738 739 _validate_runstate($self, 'MCE::process'); 740 741 my ($_params_ref, $_input_data); 742 743 if (ref $_[0] eq 'HASH' && ref $_[1] eq 'HASH') { 744 $_params_ref = $_[0], $_input_data = $_[1]; 745 } elsif (ref $_[0] eq 'HASH') { 746 $_params_ref = $_[0], $_input_data = $_[1]; 747 } else { 748 $_params_ref = $_[1], $_input_data = $_[0]; 749 } 750 751 @_ = (); 752 753 ## Set input data. 754 if (defined $_input_data) { 755 $_params_ref->{input_data} = $_input_data; 756 } 757 elsif ( !defined $_params_ref->{input_data} && 758 !defined $_params_ref->{sequence} ) { 759 _croak('MCE::process: (input_data or sequence) is not specified'); 760 } 761 762 ## Pass 0 to "not" auto-shutdown after processing. 763 $self->run(0, $_params_ref); 764 765 return $self; 766} 767 768sub relay (;&) { 769 _croak('MCE::relay: (init_relay) is not specified') 770 unless (defined $MCE->{init_relay}); 771} 772 773{ 774 no warnings 'once'; 775 *relay_unlock = \&relay; 776} 777 778sub AUTOLOAD { 779 # $AUTOLOAD = MCE::<method_name> 780 781 my $_fcn = substr($MCE::AUTOLOAD, 5); 782 my $self = shift; $self = $MCE unless ref($self); 783 784 # "for" sugar methods 785 786 if ($_fcn eq 'forchunk') { 787 require MCE::Candy unless $INC{'MCE/Candy.pm'}; 788 return MCE::Candy::forchunk($self, @_); 789 } 790 elsif ($_fcn eq 'foreach') { 791 require MCE::Candy unless $INC{'MCE/Candy.pm'}; 792 return MCE::Candy::foreach($self, @_); 793 } 794 elsif ($_fcn eq 'forseq') { 795 require MCE::Candy unless $INC{'MCE/Candy.pm'}; 796 return MCE::Candy::forseq($self, @_); 797 } 798 799 # relay stubs for MCE::Relay 800 801 if ($_fcn eq 'relay_lock' || $_fcn eq 'relay_recv') { 802 _croak('MCE::relay: (init_relay) is not specified') 803 unless (defined $MCE->{init_relay}); 804 } 805 elsif ($_fcn eq 'relay_final') { 806 return; 807 } 808 809 # worker immediately exits the chunking loop 810 811 if ($_fcn eq 'last') { 812 _croak('MCE::last: method is not allowed by the manager process') 813 unless ($self->{_wid}); 814 815 $self->{_last_jmp}() if (defined $self->{_last_jmp}); 816 817 return; 818 } 819 820 # worker starts the next iteration of the chunking loop 821 822 elsif ($_fcn eq 'next') { 823 _croak('MCE::next: method is not allowed by the manager process') 824 unless ($self->{_wid}); 825 826 $self->{_next_jmp}() if (defined $self->{_next_jmp}); 827 828 return; 829 } 830 831 # return the process ID, include thread ID for threads 832 833 elsif ($_fcn eq 'pid') { 834 if (defined $self->{_pid}) { 835 return $self->{_pid}; 836 } elsif ($_has_threads && $self->{use_threads}) { 837 return $$ .'.'. threads->tid(); 838 } 839 return $$; 840 } 841 842 # return the exit status 843 # _wrk_status holds the greatest exit status among workers exiting 844 845 elsif ($_fcn eq 'status') { 846 _croak('MCE::status: method is not allowed by the worker process') 847 if ($self->{_wid}); 848 849 return (defined $self->{_wrk_status}) ? $self->{_wrk_status} : 0; 850 } 851 852 _croak("Can't locate object method \"$_fcn\" via package \"MCE\""); 853} 854 855############################################################################### 856## ---------------------------------------------------------------------------- 857## Restart worker method. 858## 859############################################################################### 860 861sub restart_worker { 862 my $self = shift; $self = $MCE unless ref($self); 863 864 @_ = (); 865 866 _croak('MCE::restart_worker: method is not allowed by the worker process') 867 if ($self->{_wid}); 868 869 my $_wid = $self->{_exited_wid}; 870 871 my $_params = $self->{_state}->[$_wid]->{_params}; 872 my $_task_wid = $self->{_state}->[$_wid]->{_task_wid}; 873 my $_task_id = $self->{_state}->[$_wid]->{_task_id}; 874 my $_task = $self->{_state}->[$_wid]->{_task}; 875 my $_chn = $self->{_state}->[$_wid]->{_chn}; 876 877 $_params->{_chn} = $_chn; 878 879 my $_use_threads = (defined $_task_id) 880 ? $_task->{use_threads} : $self->{use_threads}; 881 882 $self->{_task}->[$_task_id]->{_total_running} += 1 if (defined $_task_id); 883 $self->{_task}->[$_task_id]->{_total_workers} += 1 if (defined $_task_id); 884 885 $self->{_total_running} += 1; 886 $self->{_total_workers} += 1; 887 888 if (defined $_use_threads && $_use_threads == 1) { 889 _dispatch_thread($self, $_wid, $_task, $_task_id, $_task_wid, $_params); 890 } else { 891 _dispatch_child($self, $_wid, $_task, $_task_id, $_task_wid, $_params); 892 } 893 894 delete $self->{_retry_cnt}; 895 896 if (defined $self->{spawn_delay} && $self->{spawn_delay} > 0.0) { 897 sleep $self->{spawn_delay}; 898 } elsif ($_tid || $_is_MSWin32) { 899 sleep 0.045; 900 } 901 902 return; 903} 904 905############################################################################### 906## ---------------------------------------------------------------------------- 907## Run method. 908## 909############################################################################### 910 911sub run { 912 my $self = shift; $self = $MCE unless ref($self); 913 914 _croak('MCE::run: method is not allowed by the worker process') 915 if ($self->{_wid}); 916 917 my ($_auto_shutdown, $_params_ref); 918 919 if (ref $_[0] eq 'HASH') { 920 $_auto_shutdown = (defined $_[1]) ? $_[1] : 1; 921 $_params_ref = $_[0]; 922 } else { 923 $_auto_shutdown = (defined $_[0]) ? $_[0] : 1; 924 $_params_ref = $_[1]; 925 } 926 927 @_ = (); 928 929 my $_has_user_tasks = (defined $self->{user_tasks}) ? 1 : 0; 930 my $_requires_shutdown = 0; 931 932 ## Unset params if workers have already been sent user_data via send. 933 ## Set user_func to NOOP if not specified. 934 935 $_params_ref = undef if ($self->{_send_cnt}); 936 937 if (!defined $self->{user_func} && !defined $_params_ref->{user_func}) { 938 $self->{user_func} = \&MCE::Signal::_NOOP; 939 } 940 941 ## Set user specified params if specified. 942 ## Shutdown workers if determined by _sync_params or if processing a 943 ## scalar reference. Workers need to be restarted in order to pick up 944 ## on the new code or scalar reference. 945 946 if (defined $_params_ref && ref $_params_ref eq 'HASH') { 947 $_requires_shutdown = _sync_params($self, $_params_ref); 948 _validate_args($self); 949 } 950 if ($_has_user_tasks) { 951 $self->{input_data} = $self->{user_tasks}->[0]->{input_data} 952 if ($self->{user_tasks}->[0]->{input_data}); 953 $self->{use_slurpio} = $self->{user_tasks}->[0]->{use_slurpio} 954 if ($self->{user_tasks}->[0]->{use_slurpio}); 955 $self->{parallel_io} = $self->{user_tasks}->[0]->{parallel_io} 956 if ($self->{user_tasks}->[0]->{parallel_io}); 957 $self->{RS} = $self->{user_tasks}->[0]->{RS} 958 if ($self->{user_tasks}->[0]->{RS}); 959 } 960 if (ref $self->{input_data} eq 'SCALAR') { 961 if (refaddr($self->{input_data}) != $self->{_last_sref}) { 962 $_requires_shutdown = 1; 963 } 964 $self->{_last_sref} = refaddr($self->{input_data}); 965 } 966 967 $self->shutdown() if ($_requires_shutdown); 968 969 ## ------------------------------------------------------------------------- 970 971 $self->{_wrk_status} = 0; 972 973 ## Spawn workers. 974 $self->spawn() unless ($self->{_spawned}); 975 return $self unless ($self->{_total_workers}); 976 977 local $SIG{__DIE__} = \&MCE::Signal::_die_handler; 978 local $SIG{__WARN__} = \&MCE::Signal::_warn_handler; 979 980 $MCE = $self if ($MCE->{_wid} == 0); 981 982 my ($_input_data, $_input_file, $_input_glob, $_seq); 983 my ($_abort_msg, $_first_msg, $_run_mode, $_single_dim); 984 my $_chunk_size = $self->{chunk_size}; 985 986 $_seq = ($_has_user_tasks && $self->{user_tasks}->[0]->{sequence}) 987 ? $self->{user_tasks}->[0]->{sequence} 988 : $self->{sequence}; 989 990 ## Determine run mode for workers. 991 if (defined $_seq) { 992 my ($_begin, $_end, $_step) = (ref $_seq eq 'ARRAY') 993 ? @{ $_seq } : ($_seq->{begin}, $_seq->{end}, $_seq->{step}); 994 995 $_chunk_size = $self->{user_tasks}->[0]->{chunk_size} 996 if ($_has_user_tasks && $self->{user_tasks}->[0]->{chunk_size}); 997 998 $_run_mode = 'sequence'; 999 $_abort_msg = int(($_end - $_begin) / $_step / $_chunk_size); # + 1; 1000 1001 # Previously + 1 above. Below, support for large numbers, 1e16 and beyond. 1002 # E.g. sequence => [ 1, 1e16 ], chunk_size => 1e11 1003 # 1004 # Perl: int((1e15 - 1) / 1 / 1e11) = 9999 1005 # Perl: int((1e16 - 1) / 1 / 1e11) = 100000 wrong, due to precision limit 1006 # Calc: int((1e16 - 1) / 1 / 1e11) = 99999 1007 1008 if ( $_step > 0 ) { 1009 $_abort_msg++ 1010 if ($_abort_msg * $_chunk_size * abs($_step) + $_begin <= $_end); 1011 } else { 1012 $_abort_msg++ 1013 if ($_abort_msg * $_chunk_size * abs($_step) + $_end <= $_begin); 1014 } 1015 1016 $_first_msg = 0; 1017 } 1018 elsif (defined $self->{input_data}) { 1019 my $_ref = ref $self->{input_data}; 1020 1021 if ($_ref eq '') { # File mode 1022 $_run_mode = 'file'; 1023 $_input_file = $self->{input_data}; 1024 $_input_data = $_input_glob = undef; 1025 $_abort_msg = (-s $_input_file) + 1; 1026 $_first_msg = 0; ## Begin at offset position 1027 1028 if ((-s $_input_file) == 0) { 1029 $self->shutdown() if ($_auto_shutdown == 1); 1030 return $self; 1031 } 1032 } 1033 elsif ($_ref eq 'ARRAY') { # Array mode 1034 $_run_mode = 'array'; 1035 $_input_data = $self->{input_data}; 1036 $_input_file = $_input_glob = undef; 1037 $_single_dim = 1 if (ref $_input_data->[0] eq ''); 1038 $_abort_msg = 0; ## Flag: Has Data: No 1039 $_first_msg = 1; ## Flag: Has Data: Yes 1040 1041 if (@{ $_input_data } == 0) { 1042 $self->shutdown() if ($_auto_shutdown == 1); 1043 return $self; 1044 } 1045 } 1046 elsif ($_ref eq 'HASH') { # Hash mode 1047 $_run_mode = 'hash'; 1048 $_input_data = $self->{input_data}; 1049 $_input_file = $_input_glob = undef; 1050 $_abort_msg = 0; ## Flag: Has Data: No 1051 $_first_msg = 1; ## Flag: Has Data: Yes 1052 1053 if (scalar( keys %{ $_input_data } ) == 0) { 1054 $self->shutdown() if ($_auto_shutdown == 1); 1055 return $self; 1056 } 1057 } 1058 elsif ($_ref =~ /^(?:GLOB|FileHandle|IO::)/) { # Glob mode 1059 $_run_mode = 'glob'; 1060 $_input_glob = $self->{input_data}; 1061 $_input_data = $_input_file = undef; 1062 $_abort_msg = 0; ## Flag: Has Data: No 1063 $_first_msg = 1; ## Flag: Has Data: Yes 1064 } 1065 elsif ($_ref eq 'CODE') { # Iterator mode 1066 $_run_mode = 'iterator'; 1067 $_input_data = $self->{input_data}; 1068 $_input_file = $_input_glob = undef; 1069 $_abort_msg = 0; ## Flag: Has Data: No 1070 $_first_msg = 1; ## Flag: Has Data: Yes 1071 } 1072 elsif ($_ref eq 'SCALAR') { # Memory mode 1073 $_run_mode = 'memory'; 1074 $_input_data = $_input_file = $_input_glob = undef; 1075 $_abort_msg = length(${ $self->{input_data} }) + 1; 1076 $_first_msg = 0; ## Begin at offset position 1077 1078 if (length(${ $self->{input_data} }) == 0) { 1079 return $self->shutdown() if ($_auto_shutdown == 1); 1080 } 1081 } 1082 else { 1083 _croak('MCE::run: (input_data) is not valid'); 1084 } 1085 } 1086 else { # Nodata mode 1087 $_abort_msg = undef, $_run_mode = 'nodata'; 1088 } 1089 1090 ## ------------------------------------------------------------------------- 1091 1092 my $_total_workers = $self->{_total_workers}; 1093 my $_send_cnt = $self->{_send_cnt}; 1094 1095 if ($_send_cnt) { 1096 $self->{_total_running} = $_send_cnt; 1097 $self->{_task}->[0]->{_total_running} = $_send_cnt; 1098 } 1099 else { 1100 $self->{_total_running} = $_total_workers; 1101 1102 my ($_frozen_nodata, $_wid, %_params_nodata, %_task0_wids); 1103 my $_COM_R_SOCK = $self->{_com_r_sock}; 1104 my $_submit_delay = $self->{submit_delay}; 1105 1106 my %_params = ( 1107 '_abort_msg' => $_abort_msg, '_chunk_size' => $_chunk_size, 1108 '_input_file' => $_input_file, '_run_mode' => $_run_mode, 1109 '_bounds_only' => $self->{bounds_only}, 1110 '_max_retries' => $self->{max_retries}, 1111 '_parallel_io' => $self->{parallel_io}, 1112 '_progress' => $self->{progress} ? 1 : 0, 1113 '_sequence' => $self->{sequence}, 1114 '_user_args' => $self->{user_args}, 1115 '_use_slurpio' => $self->{use_slurpio}, 1116 '_RS' => $self->{RS} 1117 ); 1118 1119 my $_frozen_params = $self->{freeze}(\%_params); 1120 $_frozen_params = length($_frozen_params).$LF . $_frozen_params; 1121 1122 if ($_has_user_tasks) { 1123 %_params_nodata = ( %_params, 1124 '_abort_msg' => undef, '_run_mode' => 'nodata' 1125 ); 1126 $_frozen_nodata = $self->{freeze}(\%_params_nodata); 1127 $_frozen_nodata = length($_frozen_nodata).$LF . $_frozen_nodata; 1128 1129 for my $_t (@{ $self->{_task} }) { 1130 $_t->{_total_running} = $_t->{_total_workers}; 1131 } 1132 for my $_i (1 .. @{ $self->{_state} } - 1) { 1133 $_task0_wids{$_i} = undef unless ($self->{_state}[$_i]{_task_id}); 1134 } 1135 } 1136 1137 local $\ = undef; local $/ = $LF; 1138 1139 ## Insert the first message into the queue if defined. 1140 if (defined $_first_msg) { 1141 syswrite($self->{_que_w_sock}, pack($_que_template, 0, $_first_msg)); 1142 } 1143 1144 ## Submit params data to workers. 1145 for my $_i (1 .. $_total_workers) { 1146 print({$_COM_R_SOCK} $_i.$LF), chomp($_wid = <$_COM_R_SOCK>); 1147 1148 if (!$_has_user_tasks || exists $_task0_wids{$_wid}) { 1149 print({$_COM_R_SOCK} $_frozen_params), <$_COM_R_SOCK>; 1150 $self->{_state}[$_wid]{_params} = \%_params; 1151 } else { 1152 print({$_COM_R_SOCK} $_frozen_nodata), <$_COM_R_SOCK>; 1153 $self->{_state}[$_wid]{_params} = \%_params_nodata; 1154 } 1155 1156 sleep $_submit_delay 1157 if defined($_submit_delay) && $_submit_delay > 0.0; 1158 } 1159 } 1160 1161 ## ------------------------------------------------------------------------- 1162 1163 $self->{_total_exited} = 0; 1164 1165 ## Call the output function. 1166 if ($self->{_total_running} > 0) { 1167 $self->{_mgr_live} = 1; 1168 $self->{_abort_msg} = $_abort_msg; 1169 $self->{_single_dim} = $_single_dim; 1170 1171 lock $self->{_run_lock} if $_is_MSWin32; 1172 1173 if (!$_send_cnt) { 1174 ## Notify workers to commence processing. 1175 if ($_is_MSWin32) { 1176 my $_buf = _sprintf("%${_total_workers}s", ""); 1177 syswrite($self->{_bsb_r_sock}, $_buf); 1178 } else { 1179 my $_BSB_R_SOCK = $self->{_bsb_r_sock}; 1180 for my $_i (1 .. $_total_workers) { 1181 syswrite($_BSB_R_SOCK, $LF); 1182 } 1183 } 1184 } 1185 1186 _output_loop( $self, $_input_data, $_input_glob, 1187 \%_plugin_function, \@_plugin_loop_begin, \@_plugin_loop_end 1188 ); 1189 1190 $self->{_mgr_live} = $self->{_abort_msg} = $self->{_single_dim} = undef; 1191 } 1192 1193 ## Remove the last message from the queue. 1194 if (!$_send_cnt && $_run_mode ne 'nodata') { 1195 MCE::Util::_sysread($self->{_que_r_sock}, my($_buf), $_que_read_size) 1196 if ( defined $self->{_que_r_sock} ); 1197 } 1198 1199 $self->{_send_cnt} = 0; 1200 1201 ## Shutdown workers. 1202 if ($_auto_shutdown || $self->{_total_exited}) { 1203 $self->shutdown(); 1204 } 1205 elsif ($^S || $ENV{'PERL_IPERL_RUNNING'}) { 1206 if ( 1207 !$INC{'Mojo/IOLoop.pm'} && !$INC{'Win32/GUI.pm'} && 1208 !$INC{'Gearman/XS.pm'} && !$INC{'Gearman/Util.pm'} && 1209 !$INC{'Tk.pm'} && !$INC{'Wx.pm'} 1210 ) { 1211 # running inside eval or IPerl, check stack trace 1212 my $_t = Carp::longmess(); $_t =~ s/\teval [^\n]+\n$//; 1213 1214 if ( $_t =~ /^(?:[^\n]+\n){1,7}\teval / || 1215 $_t =~ /\n\teval [^\n]+\n\t(?:eval|Try)/ || 1216 $_t =~ /\n\tMCE::_dispatch\(\) [^\n]+ thread \d+\n$/ || 1217 ( $_tid && !$self->{use_threads} ) ) 1218 { 1219 $self->shutdown(); 1220 } 1221 } 1222 } 1223 1224 return $self; 1225} 1226 1227############################################################################### 1228## ---------------------------------------------------------------------------- 1229## Send method. 1230## 1231############################################################################### 1232 1233sub send { 1234 my $self = shift; $self = $MCE unless ref($self); 1235 1236 _croak('MCE::send: method is not allowed by the worker process') 1237 if ($self->{_wid}); 1238 _croak('MCE::send: method is not allowed while running') 1239 if ($self->{_total_running}); 1240 1241 _croak('MCE::send: method cannot be used with input_data or sequence') 1242 if (defined $self->{input_data} || defined $self->{sequence}); 1243 _croak('MCE::send: method cannot be used with user_tasks') 1244 if (defined $self->{user_tasks}); 1245 1246 my $_data_ref; 1247 1248 if (ref $_[0] eq 'ARRAY' || ref $_[0] eq 'HASH' || ref $_[0] eq 'PDL') { 1249 $_data_ref = $_[0]; 1250 } else { 1251 _croak('MCE::send: ARRAY, HASH, or a PDL reference is not specified'); 1252 } 1253 1254 @_ = (); 1255 1256 $self->{_send_cnt} = 0 unless (defined $self->{_send_cnt}); 1257 1258 ## ------------------------------------------------------------------------- 1259 1260 ## Spawn workers. 1261 $self->spawn() unless ($self->{_spawned}); 1262 1263 _croak('MCE::send: Sending greater than # of workers is not allowed') 1264 if ($self->{_send_cnt} >= $self->{_task}->[0]->{_total_workers}); 1265 1266 local $SIG{__DIE__} = \&MCE::Signal::_die_handler; 1267 local $SIG{__WARN__} = \&MCE::Signal::_warn_handler; 1268 1269 ## Begin data submission. 1270 local $\ = undef; local $/ = $LF; 1271 1272 my $_COM_R_SOCK = $self->{_com_r_sock}; 1273 my $_submit_delay = $self->{submit_delay}; 1274 my $_frozen_data = $self->{freeze}($_data_ref); 1275 my $_len = length $_frozen_data; 1276 1277 ## Submit data to worker. 1278 print({$_COM_R_SOCK} '_data'.$LF), <$_COM_R_SOCK>; 1279 print({$_COM_R_SOCK} $_len.$LF, $_frozen_data), <$_COM_R_SOCK>; 1280 1281 $self->{_send_cnt} += 1; 1282 1283 sleep $_submit_delay 1284 if defined($_submit_delay) && $_submit_delay > 0.0; 1285 1286 return $self; 1287} 1288 1289############################################################################### 1290## ---------------------------------------------------------------------------- 1291## Shutdown method. 1292## 1293############################################################################### 1294 1295sub shutdown { 1296 my $self = shift; $self = $MCE unless ref($self); 1297 my $_no_lock = shift || 0; 1298 1299 @_ = (); 1300 1301 ## Return unless spawned or already shutdown. 1302 return unless $self->{_spawned}; 1303 1304 ## Return if signaled. 1305 if ($MCE::Signal::KILLED) { 1306 if (defined $self->{_sess_dir}) { 1307 my $_sess_dir = delete $self->{_sess_dir}; 1308 rmdir $_sess_dir if -d $_sess_dir; 1309 } 1310 return; 1311 } 1312 1313 _validate_runstate($self, 'MCE::shutdown'); 1314 1315 ## Complete processing before shutting down. 1316 $self->run(0) if ($self->{_send_cnt}); 1317 1318 local $SIG{__DIE__} = \&MCE::Signal::_die_handler; 1319 local $SIG{__WARN__} = \&MCE::Signal::_warn_handler; 1320 1321 my $_COM_R_SOCK = $self->{_com_r_sock}; 1322 my $_data_channels = $self->{_data_channels}; 1323 my $_total_workers = $self->{_total_workers}; 1324 my $_sess_dir = $self->{_sess_dir}; 1325 1326 if (defined $TOP_HDLR && refaddr($self) == refaddr($TOP_HDLR)) { 1327 $TOP_HDLR = undef; 1328 } 1329 1330 ## ------------------------------------------------------------------------- 1331 1332 lock $_MCE_LOCK if ($_has_threads && $_is_winenv && !$_no_lock); 1333 1334 ## Notify workers to exit loop. 1335 local ($!, $?, $_); local $\ = undef; local $/ = $LF; 1336 1337 for (1 .. $_total_workers) { 1338 print({$_COM_R_SOCK} '_exit'.$LF), <$_COM_R_SOCK>; 1339 } 1340 1341 ## Reap children and/or threads. 1342 if (@{ $self->{_pids} } > 0) { 1343 my $_list = $self->{_pids}; 1344 for my $i (0 .. @{ $_list }) { 1345 waitpid($_list->[$i], 0) if $_list->[$i]; 1346 } 1347 } 1348 if (@{ $self->{_thrs} } > 0) { 1349 my $_list = $self->{_thrs}; 1350 for my $i (0 .. @{ $_list }) { 1351 $_list->[$i]->join() if $_list->[$i]; 1352 } 1353 } 1354 1355 ## Close sockets. 1356 $_COM_R_SOCK = undef; 1357 1358 MCE::Util::_destroy_socks($self, qw( 1359 _bsb_w_sock _bsb_r_sock _com_w_sock _com_r_sock 1360 _dat_w_sock _dat_r_sock _rla_w_sock _rla_r_sock 1361 )); 1362 1363 ($_is_MSWin32) 1364 ? MCE::Util::_destroy_pipes($self, qw( _que_w_sock _que_r_sock )) 1365 : MCE::Util::_destroy_socks($self, qw( _que_w_sock _que_r_sock )); 1366 1367 ## ------------------------------------------------------------------------- 1368 1369 ## Destroy mutexes. 1370 for my $_i (0 .. $_data_channels) { delete $self->{'_mutex_'.$_i}; } 1371 1372 ## Remove session directory. 1373 rmdir $_sess_dir if (defined $_sess_dir && -d $_sess_dir); 1374 1375 ## Reset instance. 1376 undef @{$self->{_pids}}; undef @{$self->{_thrs}}; undef @{$self->{_tids}}; 1377 undef @{$self->{_state}}; undef @{$self->{_status}}; undef @{$self->{_task}}; 1378 1379 $self->{_chunk_id} = $self->{_send_cnt} = $self->{_spawned} = 0; 1380 $self->{_total_running} = $self->{_total_exited} = 0; 1381 $self->{_total_workers} = 0; 1382 $self->{_sess_dir} = undef; 1383 1384 if ($self->{loop_timeout}) { 1385 delete $self->{_pids_t}; 1386 delete $self->{_pids_w}; 1387 } 1388 1389 return; 1390} 1391 1392############################################################################### 1393## ---------------------------------------------------------------------------- 1394## Barrier sync and yield methods. 1395## 1396############################################################################### 1397 1398sub sync { 1399 my $self = shift; $self = $MCE unless ref($self); 1400 1401 return unless ($self->{_wid}); 1402 1403 ## Barrier synchronization is supported for task 0 at this time. 1404 ## Note: Workers are assigned task_id 0 when omitting user_tasks. 1405 1406 return if ($self->{_task_id} > 0); 1407 1408 my $_chn = $self->{_chn}; 1409 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0]; 1410 my $_BSB_R_SOCK = $self->{_bsb_r_sock}; 1411 my $_BSB_W_SOCK = $self->{_bsb_w_sock}; 1412 my $_buf; 1413 1414 local $\ = undef if (defined $\); 1415 1416 ## Notify the manager process (barrier begin). 1417 print {$_DAT_W_SOCK} OUTPUT_B_SYN.$LF . $_chn.$LF; 1418 1419 ## Wait until all workers from (task_id 0) have synced. 1420 MCE::Util::_sock_ready($_BSB_R_SOCK, -1) if $_is_MSWin32; 1421 MCE::Util::_sysread($_BSB_R_SOCK, $_buf, 1); 1422 1423 ## Notify the manager process (barrier end). 1424 print {$_DAT_W_SOCK} OUTPUT_E_SYN.$LF . $_chn.$LF; 1425 1426 ## Wait until all workers from (task_id 0) have un-synced. 1427 MCE::Util::_sock_ready($_BSB_W_SOCK, -1) if $_is_MSWin32; 1428 MCE::Util::_sysread($_BSB_W_SOCK, $_buf, 1); 1429 1430 return; 1431} 1432 1433sub yield { 1434 my $self = shift; $self = $MCE unless ref($self); 1435 1436 return unless ($self->{_wid}); 1437 1438 my $_chn = $self->{_chn}; 1439 my $_DAT_LOCK = $self->{_dat_lock}; 1440 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0]; 1441 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn]; 1442 my $_lock_chn = $self->{_lock_chn}; 1443 my $_delay; 1444 1445 local $\ = undef if (defined $\); 1446 local $/ = $LF if (!$/ || $/ ne $LF); 1447 1448 $_DAT_LOCK->lock() if $_lock_chn; 1449 print({$_DAT_W_SOCK} OUTPUT_I_DLY.$LF . $_chn.$LF), 1450 print({$_DAU_W_SOCK} $self->{_task_id}.$LF); 1451 chomp($_delay = <$_DAU_W_SOCK>); 1452 $_DAT_LOCK->unlock() if $_lock_chn; 1453 1454 MCE::Util::_sleep( $_delay ); 1455} 1456 1457############################################################################### 1458## ---------------------------------------------------------------------------- 1459## Miscellaneous methods: abort exit sess_dir tmp_dir. 1460## 1461############################################################################### 1462 1463## Abort current job. 1464 1465sub abort { 1466 my $self = shift; $self = $MCE unless ref($self); 1467 1468 my $_QUE_R_SOCK = $self->{_que_r_sock}; 1469 my $_QUE_W_SOCK = $self->{_que_w_sock}; 1470 my $_abort_msg = $self->{_abort_msg}; 1471 1472 if (defined $_abort_msg) { 1473 local $\ = undef; 1474 1475 if ($_abort_msg > 0) { 1476 MCE::Util::_sysread($_QUE_R_SOCK, my($_next), $_que_read_size); 1477 syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_abort_msg)); 1478 } 1479 1480 if ($self->{_wid} > 0) { 1481 my $_chn = $self->{_chn}; 1482 my $_DAT_LOCK = $self->{_dat_lock}; 1483 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0]; 1484 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn]; 1485 my $_lock_chn = $self->{_lock_chn}; 1486 1487 $_DAT_LOCK->lock() if $_lock_chn; 1488 print {$_DAT_W_SOCK} OUTPUT_W_ABT.$LF . $_chn.$LF; 1489 $_DAT_LOCK->unlock() if $_lock_chn; 1490 } 1491 } 1492 1493 return; 1494} 1495 1496## Worker exits from MCE. 1497 1498sub exit { 1499 my $self = shift; $self = $MCE unless ref($self); 1500 1501 my $_exit_status = (defined $_[0]) ? $_[0] : $?; 1502 my $_exit_msg = (defined $_[1]) ? $_[1] : ''; 1503 my $_exit_id = (defined $_[2]) ? $_[2] : $self->chunk_id; 1504 1505 @_ = (); 1506 1507 _croak('MCE::exit: method is not allowed by the manager process') 1508 unless ($self->{_wid}); 1509 1510 my $_chn = $self->{_chn}; 1511 my $_DAT_LOCK = $self->{_dat_lock}; 1512 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0]; 1513 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn]; 1514 my $_lock_chn = $self->{_lock_chn}; 1515 my $_task_id = $self->{_task_id}; 1516 1517 unless ( $self->{_exiting} ) { 1518 $self->{_exiting} = 1; 1519 1520 my $_pid = $self->{_is_thread} ? $$ .'.'. threads->tid() : $$; 1521 my $_max_retries = $self->{max_retries}; 1522 my $_chunk_id = $self->{_chunk_id}; 1523 1524 if ( defined $self->{init_relay} && !$self->{_relayed} && !$_task_id && 1525 exists $self->{_wuf} && $self->{_pid} eq $_pid ) { 1526 1527 $self->{_retry_cnt} = -1 unless defined( $self->{_retry_cnt} ); 1528 1529 if ( !$_max_retries || ++$self->{_retry_cnt} == $_max_retries ) { 1530 MCE::relay { warn "Error: chunk $_chunk_id failed\n" if $_chunk_id }; 1531 } 1532 } 1533 1534 ## Check for nested workers not yet joined. 1535 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'}; 1536 1537 MCE::Hobo->finish('MCE') 1538 if ( $INC{'MCE/Hobo.pm'} && MCE::Hobo->can('_clear') ); 1539 1540 local $\ = undef if (defined $\); 1541 my $_len = length $_exit_msg; 1542 1543 $_exit_id =~ s/[\r\n][\r\n]*/ /mg; 1544 $_DAT_LOCK->lock() if $_lock_chn; 1545 1546 if ($self->{_retry} && $self->{_retry}->[2]--) { 1547 $_exit_status = 0; my $_buf = $self->{freeze}($self->{_retry}); 1548 print({$_DAT_W_SOCK} OUTPUT_W_EXT.$LF . $_chn.$LF), 1549 print({$_DAU_W_SOCK} 1550 $_task_id.$LF . $self->{_wid}.$LF . $self->{_exit_pid}.$LF . 1551 $_exit_status.$LF . $_exit_id.$LF . $_len.$LF . $_exit_msg . 1552 length($_buf).$LF, $_buf 1553 ); 1554 } 1555 else { 1556 print({$_DAT_W_SOCK} OUTPUT_W_EXT.$LF . $_chn.$LF), 1557 print({$_DAU_W_SOCK} 1558 $_task_id.$LF . $self->{_wid}.$LF . $self->{_exit_pid}.$LF . 1559 $_exit_status.$LF . $_exit_id.$LF . $_len.$LF . $_exit_msg . 1560 '0'.$LF 1561 ); 1562 } 1563 1564 $_DAT_LOCK->unlock() if $_lock_chn; 1565 } 1566 1567 _exit($self); 1568} 1569 1570## Return the session dir, made on demand. 1571 1572sub sess_dir { 1573 my $self = shift; $self = $MCE unless ref($self); 1574 return $self->{_sess_dir} if defined $self->{_sess_dir}; 1575 1576 if ($self->{_wid} == 0) { 1577 $self->{_sess_dir} = $self->{_spawned} 1578 ? _make_sessdir($self) : undef; 1579 } 1580 else { 1581 my $_chn = $self->{_chn}; 1582 my $_DAT_LOCK = $self->{_dat_lock}; 1583 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0]; 1584 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn]; 1585 my $_lock_chn = $self->{_lock_chn}; 1586 my $_sess_dir; 1587 1588 local $\ = undef if (defined $\); 1589 local $/ = $LF if (!$/ || $/ ne $LF); 1590 1591 $_DAT_LOCK->lock() if $_lock_chn; 1592 print({$_DAT_W_SOCK} OUTPUT_S_DIR.$LF . $_chn.$LF); 1593 chomp($_sess_dir = <$_DAU_W_SOCK>); 1594 $_DAT_LOCK->unlock() if $_lock_chn; 1595 1596 $self->{_sess_dir} = $_sess_dir; 1597 } 1598} 1599 1600## Return the temp dir, made on demand. 1601 1602sub tmp_dir { 1603 my $self = shift; $self = $MCE unless ref($self); 1604 return $self->{tmp_dir} if defined $self->{tmp_dir}; 1605 1606 if ($self->{_wid} == 0) { 1607 $self->{tmp_dir} = MCE::Signal::_make_tmpdir(); 1608 } 1609 else { 1610 my $_chn = $self->{_chn}; 1611 my $_DAT_LOCK = $self->{_dat_lock}; 1612 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0]; 1613 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn]; 1614 my $_lock_chn = $self->{_lock_chn}; 1615 my $_tmp_dir; 1616 1617 local $\ = undef if (defined $\); 1618 local $/ = $LF if (!$/ || $/ ne $LF); 1619 1620 $_DAT_LOCK->lock() if $_lock_chn; 1621 print({$_DAT_W_SOCK} OUTPUT_T_DIR.$LF . $_chn.$LF); 1622 chomp($_tmp_dir = <$_DAU_W_SOCK>); 1623 $_DAT_LOCK->unlock() if $_lock_chn; 1624 1625 $self->{tmp_dir} = $_tmp_dir; 1626 } 1627} 1628 1629############################################################################### 1630## ---------------------------------------------------------------------------- 1631## Methods for serializing data from workers to the main process. 1632## 1633############################################################################### 1634 1635## Do method. Additional arguments are optional. 1636 1637sub do { 1638 my $self = shift; $self = $MCE unless ref($self); 1639 my $_pkg = caller() eq 'MCE' ? caller(1) : caller(); 1640 1641 _croak('MCE::do: (code ref) is not supported') 1642 if (ref $_[0] eq 'CODE'); 1643 _croak('MCE::do: (callback) is not specified') 1644 unless (defined ( my $_func = shift )); 1645 1646 $_func = $_pkg.'::'.$_func if (index($_func, ':') < 0); 1647 1648 if ($self->{_wid}) { 1649 return _do_callback($self, $_func, [ @_ ]); 1650 } 1651 else { 1652 no strict 'refs'; 1653 return $_func->(@_); 1654 } 1655} 1656 1657## Gather method. 1658 1659sub gather { 1660 my $self = shift; $self = $MCE unless ref($self); 1661 1662 _croak('MCE::gather: method is not allowed by the manager process') 1663 unless ($self->{_wid}); 1664 1665 return _do_gather($self, [ @_ ]); 1666} 1667 1668## Sendto method. 1669 1670{ 1671 my %_sendto_lkup = ( 1672 'file' => SENDTO_FILEV1, 'stderr' => SENDTO_STDERR, 1673 'file:' => SENDTO_FILEV2, 'stdout' => SENDTO_STDOUT, 1674 'fd:' => SENDTO_FD, 1675 ); 1676 1677 my $_v2_regx = qr/^([^:]+:)(.+)/; 1678 1679 sub sendto { 1680 1681 my $self = shift; $self = $MCE unless ref($self); 1682 my $_to = shift; 1683 1684 _croak('MCE::sendto: method is not allowed by the manager process') 1685 unless ($self->{_wid}); 1686 1687 return unless (defined $_[0]); 1688 1689 my $_dest = exists $_sendto_lkup{ lc($_to) } 1690 ? $_sendto_lkup{ lc($_to) } : undef; 1691 my $_value; 1692 1693 if (!defined $_dest) { 1694 my $_fd; 1695 1696 if (ref($_to) && ( defined ($_fd = fileno($_to)) || 1697 defined ($_fd = eval { $_to->fileno }) )) { 1698 1699 if (my $_ob = tied *{ $_to }) { 1700 if (ref $_ob eq 'IO::TieCombine::Handle') { 1701 $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout'); 1702 $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr'); 1703 } 1704 } 1705 1706 my $_data_ref = (scalar @_ == 1) ? \(''.$_[0]) : \join('', @_); 1707 return _do_send_glob($self, $_to, $_fd, $_data_ref); 1708 } 1709 elsif (reftype($_to) eq 'GLOB') { 1710 return _croak('Cannot write to filehandle'); 1711 } 1712 1713 if (defined $_to && $_to =~ /$_v2_regx/o) { 1714 $_dest = exists $_sendto_lkup{ lc($1) } 1715 ? $_sendto_lkup{ lc($1) } : undef; 1716 $_value = $2; 1717 } 1718 1719 if (!defined $_dest || ( !defined $_value && ( 1720 $_dest == SENDTO_FILEV2 || $_dest == SENDTO_FD 1721 ))) { 1722 my $_msg = "\n"; 1723 $_msg .= "MCE::sendto: improper use of method\n"; 1724 $_msg .= "\n"; 1725 $_msg .= "## usage:\n"; 1726 $_msg .= "## ->sendto(\"stderr\", ...);\n"; 1727 $_msg .= "## ->sendto(\"stdout\", ...);\n"; 1728 $_msg .= "## ->sendto(\"file:/path/to/file\", ...);\n"; 1729 $_msg .= "## ->sendto(\"fd:2\", ...);\n"; 1730 $_msg .= "\n"; 1731 1732 _croak($_msg); 1733 } 1734 } 1735 1736 if ($_dest == SENDTO_FILEV1) { # sendto 'file', $a, $path 1737 return if (!defined $_[1] || @_ > 2); # Please switch to using V2 1738 $_value = $_[1]; delete $_[1]; # sendto 'file:/path', $a 1739 $_dest = SENDTO_FILEV2; 1740 } 1741 1742 return _do_send($self, $_dest, $_value, @_); 1743 } 1744} 1745 1746############################################################################### 1747## ---------------------------------------------------------------------------- 1748## Functions for serializing print, printf and say statements. 1749## 1750############################################################################### 1751 1752sub print { 1753 my $self = shift; $self = $MCE unless ref($self); 1754 my ($_fd, $_glob, $_data); 1755 1756 if (ref($_[0]) && ( defined ($_fd = fileno($_[0])) || 1757 defined ($_fd = eval { $_[0]->fileno }) )) { 1758 1759 if (my $_ob = tied *{ $_[0] }) { 1760 if (ref $_ob eq 'IO::TieCombine::Handle') { 1761 $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout'); 1762 $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr'); 1763 } 1764 } 1765 1766 $_glob = shift; 1767 } 1768 elsif (reftype($_[0]) eq 'GLOB') { 1769 return _croak('Cannot write to filehandle'); 1770 } 1771 1772 $_data = join('', scalar @_ ? @_ : $_); 1773 1774 return _do_send_glob($self, $_glob, $_fd, \$_data) if $_fd; 1775 return _do_send($self, SENDTO_STDOUT, undef, \$_data) if $self->{_wid}; 1776 return _do_send_glob($self, \*STDOUT, 1, \$_data); 1777} 1778 1779sub printf { 1780 my $self = shift; $self = $MCE unless ref($self); 1781 my ($_fd, $_glob, $_fmt, $_data); 1782 1783 if (ref($_[0]) && ( defined ($_fd = fileno($_[0])) || 1784 defined ($_fd = eval { $_[0]->fileno }) )) { 1785 1786 if (my $_ob = tied *{ $_[0] }) { 1787 if (ref $_ob eq 'IO::TieCombine::Handle') { 1788 $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout'); 1789 $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr'); 1790 } 1791 } 1792 1793 $_glob = shift; 1794 } 1795 elsif (reftype($_[0]) eq 'GLOB') { 1796 return _croak('Cannot write to filehandle'); 1797 } 1798 1799 $_fmt = shift || '%s'; 1800 $_data = sprintf($_fmt, scalar @_ ? @_ : $_); 1801 1802 return _do_send_glob($self, $_glob, $_fd, \$_data) if $_fd; 1803 return _do_send($self, SENDTO_STDOUT, undef, \$_data) if $self->{_wid}; 1804 return _do_send_glob($self, \*STDOUT, 1, \$_data); 1805} 1806 1807sub say { 1808 my $self = shift; $self = $MCE unless ref($self); 1809 my ($_fd, $_glob, $_data); 1810 1811 if (ref($_[0]) && ( defined ($_fd = fileno($_[0])) || 1812 defined ($_fd = eval { $_[0]->fileno }) )) { 1813 1814 if (my $_ob = tied *{ $_[0] }) { 1815 if (ref $_ob eq 'IO::TieCombine::Handle') { 1816 $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout'); 1817 $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr'); 1818 } 1819 } 1820 1821 $_glob = shift; 1822 } 1823 elsif (reftype($_[0]) eq 'GLOB') { 1824 return _croak('Cannot write to filehandle'); 1825 } 1826 1827 $_data = join('', scalar @_ ? @_ : $_) . "\n"; 1828 1829 return _do_send_glob($self, $_glob, $_fd, \$_data) if $_fd; 1830 return _do_send($self, SENDTO_STDOUT, undef, \$_data) if $self->{_wid}; 1831 return _do_send_glob($self, \*STDOUT, 1, \$_data); 1832} 1833 1834############################################################################### 1835## ---------------------------------------------------------------------------- 1836## Private methods. 1837## 1838############################################################################### 1839 1840sub _exit { 1841 my $self = shift; 1842 1843 delete $self->{_wuf}; _end(); 1844 1845 ## Exit thread/child process. 1846 $SIG{__DIE__} = sub {} unless $_tid; 1847 $SIG{__WARN__} = sub {}; 1848 1849 threads->exit(0) if $self->{use_threads}; 1850 1851 if (! $_tid) { 1852 $SIG{HUP} = $SIG{INT} = $SIG{QUIT} = $SIG{TERM} = sub { 1853 $SIG{$_[0]} = $SIG{INT} = $SIG{TERM} = sub {}; 1854 1855 CORE::kill($_[0], getppid()) 1856 if (($_[0] eq 'INT' || $_[0] eq 'TERM') && $^O ne 'MSWin32'); 1857 1858 CORE::kill('KILL', $$); 1859 }; 1860 } 1861 1862 if ($self->{posix_exit} && !$_is_MSWin32) { 1863 eval { MCE::Mutex::Channel::_destroy() }; 1864 POSIX::_exit(0) if $INC{'POSIX.pm'}; 1865 CORE::kill('KILL', $$); 1866 } 1867 1868 CORE::exit(0); 1869} 1870 1871sub _get_max_workers { 1872 my $self = shift; $self = $MCE unless ref($self); 1873 1874 if (defined $self->{user_tasks}) { 1875 if (defined $self->{user_tasks}->[0]->{max_workers}) { 1876 return $self->{user_tasks}->[0]->{max_workers}; 1877 } 1878 } 1879 1880 return $self->{max_workers}; 1881} 1882 1883sub _make_sessdir { 1884 my $self = shift; $self = $MCE unless ref($self); 1885 1886 my $_sess_dir = $self->{_sess_dir}; 1887 1888 unless (defined $_sess_dir) { 1889 $self->{tmp_dir} = MCE::Signal::_make_tmpdir() 1890 unless defined $self->{tmp_dir}; 1891 1892 my $_mce_tid = $INC{'threads.pm'} ? threads->tid() : ''; 1893 $_mce_tid = '' unless defined $self->{_mce_tid}; 1894 1895 my $_mce_sid = $$ .'.'. $_mce_tid .'.'. (++$_mce_count); 1896 my $_tmp_dir = $self->{tmp_dir}; 1897 1898 _croak("MCE::sess_dir: (tmp_dir) is not defined") 1899 if (!defined $_tmp_dir || $_tmp_dir eq ''); 1900 _croak("MCE::sess_dir: ($_tmp_dir) is not a directory or does not exist") 1901 unless (-d $_tmp_dir); 1902 _croak("MCE::sess_dir: ($_tmp_dir) is not writeable") 1903 unless (-w $_tmp_dir); 1904 1905 my $_cnt = 0; $_sess_dir = "$_tmp_dir/$_mce_sid"; 1906 1907 $_sess_dir = "$_tmp_dir/$_mce_sid." . (++$_cnt) 1908 while ( !(mkdir $_sess_dir, 0770) ); 1909 } 1910 1911 return $_sess_dir; 1912} 1913 1914sub _sprintf { 1915 my ($_fmt, $_arg) = @_; 1916 # remove tainted'ness 1917 ($_fmt) = $_fmt =~ /(.*)/; 1918 1919 return sprintf("$_fmt", $_arg); 1920} 1921 1922sub _sync_buffer_to_array { 1923 my ($_buffer_ref, $_array_ref, $_chop_str) = @_; 1924 1925 local $_; my $_cnt = 0; 1926 1927 open my $_MEM_FH, '<', $_buffer_ref; 1928 binmode $_MEM_FH, ':raw'; 1929 1930 unless (length $_chop_str) { 1931 $_array_ref->[$_cnt++] = $_ while (<$_MEM_FH>); 1932 } 1933 else { 1934 $_array_ref->[$_cnt++] = <$_MEM_FH>; 1935 while (<$_MEM_FH>) { 1936 $_array_ref->[$_cnt ] = $_chop_str; 1937 $_array_ref->[$_cnt++] .= $_; 1938 } 1939 } 1940 1941 close $_MEM_FH; 1942 weaken $_MEM_FH; 1943 1944 return; 1945} 1946 1947sub _sync_params { 1948 my ($self, $_params_ref) = @_; 1949 my $_requires_shutdown = 0; 1950 1951 if (defined $_params_ref->{init_relay} && !defined $self->{init_relay}) { 1952 $_requires_shutdown = 1; 1953 } 1954 for my $_p (qw( user_begin user_func user_end )) { 1955 if (defined $_params_ref->{$_p}) { 1956 $self->{$_p} = delete $_params_ref->{$_p}; 1957 $_requires_shutdown = 1; 1958 } 1959 } 1960 for my $_p (keys %{ $_params_ref }) { 1961 _croak("MCE::_sync_params: ($_p) is not a valid params argument") 1962 unless (exists $_params_allowed_args{$_p}); 1963 1964 $self->{$_p} = $_params_ref->{$_p}; 1965 } 1966 1967 return ($self->{_spawned}) ? $_requires_shutdown : 0; 1968} 1969 1970############################################################################### 1971## ---------------------------------------------------------------------------- 1972## Dispatch methods. 1973## 1974############################################################################### 1975 1976sub _dispatch { 1977 my @_args = @_; my $_is_thread = shift @_args; 1978 my $self = $MCE = $_args[0]; 1979 1980 ## To avoid (Scalars leaked: N) messages; fixed in Perl 5.12.x 1981 @_ = (); 1982 1983 $ENV{'PERL_MCE_IPC'} = 'win32' if ( $_is_MSWin32 && ( 1984 defined($self->{max_retries}) || 1985 $INC{'MCE/Child.pm'} || 1986 $INC{'MCE/Hobo.pm'} 1987 )); 1988 1989 delete $self->{_relayed}; 1990 1991 $self->{_is_thread} = $_is_thread; 1992 $self->{_pid} = $_is_thread ? $$ .'.'. threads->tid() : $$; 1993 1994 ## Sets the seed of the base generator uniquely between workers. 1995 ## The new seed is computed using the current seed and $_wid value. 1996 ## One may set the seed at the application level for predictable 1997 ## results (non-thread workers only). Ditto for Math::Prime::Util, 1998 ## Math::Random, and Math::Random::MT::Auto. 1999 2000 { 2001 my ($_wid, $_seed) = ($_args[1], $self->{_seed}); 2002 srand(abs($_seed - ($_wid * 100000)) % 2147483560); 2003 2004 if (!$self->{use_threads}) { 2005 Math::Prime::Util::srand(abs($_seed - ($_wid * 100000)) % 2147483560) 2006 if ( $INC{'Math/Prime/Util.pm'} ); 2007 2008 MCE::Hobo->_clear() 2009 if ( $INC{'MCE/Hobo.pm'} && MCE::Hobo->can('_clear') ); 2010 2011 MCE::Child->_clear() if $INC{'MCE/Child.pm'}; 2012 } 2013 } 2014 2015 if (!$self->{use_threads} && $INC{'Math/Random.pm'}) { 2016 my ($_wid, $_cur_seed) = ($_args[1], Math::Random::random_get_seed()); 2017 2018 my $_new_seed = ($_cur_seed < 1073741781) 2019 ? $_cur_seed + (($_wid * 100000) % 1073741780) 2020 : $_cur_seed - (($_wid * 100000) % 1073741780); 2021 2022 Math::Random::random_set_seed($_new_seed, $_new_seed); 2023 } 2024 2025 if (!$self->{use_threads} && $INC{'Math/Random/MT/Auto.pm'}) { 2026 my ($_wid, $_cur_seed) = ( 2027 $_args[1], Math::Random::MT::Auto::get_seed()->[0] 2028 ); 2029 my $_new_seed = ($_cur_seed < 1073741781) 2030 ? $_cur_seed + (($_wid * 100000) % 1073741780) 2031 : $_cur_seed - (($_wid * 100000) % 1073741780); 2032 2033 Math::Random::MT::Auto::set_seed($_new_seed); 2034 } 2035 2036 ## Run. 2037 2038 _worker_main(@_args, \@_plugin_worker_init); 2039 2040 _exit($self); 2041} 2042 2043sub _dispatch_thread { 2044 my ($self, $_wid, $_task, $_task_id, $_task_wid, $_params) = @_; 2045 2046 @_ = (); local $_; 2047 2048 my $_thr = threads->create( \&_dispatch, 2049 1, $self, $_wid, $_task, $_task_id, $_task_wid, $_params 2050 ); 2051 2052 _croak("MCE::_dispatch_thread: Failed to spawn worker $_wid: $!") 2053 if (!defined $_thr); 2054 2055 ## Store into an available slot (restart), otherwise append to arrays. 2056 if (defined $_params) { for my $_i (0 .. @{ $self->{_tids} } - 1) { 2057 unless (defined $self->{_tids}->[$_i]) { 2058 $self->{_thrs}->[$_i] = $_thr; 2059 $self->{_tids}->[$_i] = $_thr->tid(); 2060 return; 2061 } 2062 }} 2063 2064 push @{ $self->{_thrs} }, $_thr; 2065 push @{ $self->{_tids} }, $_thr->tid(); 2066 2067 sleep $self->{spawn_delay} 2068 if defined($self->{spawn_delay}) && $self->{spawn_delay} > 0.0; 2069 2070 return; 2071} 2072 2073sub _dispatch_child { 2074 my ($self, $_wid, $_task, $_task_id, $_task_wid, $_params) = @_; 2075 2076 @_ = (); local $_; 2077 my $_pid = fork(); 2078 2079 _croak("MCE::_dispatch_child: Failed to spawn worker $_wid: $!") 2080 if (!defined $_pid); 2081 2082 _dispatch(0, $self, $_wid, $_task, $_task_id, $_task_wid, $_params) 2083 if ($_pid == 0); 2084 2085 ## Store into an available slot (restart), otherwise append to array. 2086 if (defined $_params) { for my $_i (0 .. @{ $self->{_pids} } - 1) { 2087 unless (defined $self->{_pids}->[$_i]) { 2088 $self->{_pids}->[$_i] = $_pid; 2089 return; 2090 } 2091 }} 2092 2093 push @{ $self->{_pids} }, $_pid; 2094 2095 if ($self->{loop_timeout} && !$_is_MSWin32) { 2096 $self->{_pids_t}{$_pid} = $_task_id; 2097 $self->{_pids_w}{$_pid} = $_wid; 2098 } 2099 2100 sleep $self->{spawn_delay} 2101 if defined($self->{spawn_delay}) && $self->{spawn_delay} > 0.0; 2102 2103 return; 2104} 2105 21061; 2107 2108