1###############################################################################
2## ----------------------------------------------------------------------------
3## MCE - Many-Core Engine for Perl providing parallel processing capabilities.
4##
5###############################################################################
6
7package MCE;
8
9use strict;
10use warnings;
11
12no warnings qw( threads recursion uninitialized );
13
14our $VERSION = '1.876';
15
16## no critic (BuiltinFunctions::ProhibitStringyEval)
17## no critic (Subroutines::ProhibitSubroutinePrototypes)
18## no critic (TestingAndDebugging::ProhibitNoStrict)
19
20use Carp ();
21
22my ($_has_threads, $_freeze, $_thaw, $_tid, $_oid);
23
BEGIN {
   ## Shield the caller's $@ from the string evals performed below.
   local $@;

   ## Threads support. On MSWin32, load threads and threads::shared when
   ## threads has not been loaded. When threads is already loaded, make
   ## sure threads::shared is loaded as well.
   if ( ! $INC{'threads.pm'} ) {
      eval 'use threads; use threads::shared;' if ( $^O eq 'MSWin32' );
   }
   elsif ( ! $INC{'threads/shared.pm'} ) {
      eval 'use threads::shared;';
   }

   ## Record whether threads is loaded, the current thread ID (0 without
   ## threads), and the "pid.tid" identity of the loading interpreter.
   $_has_threads = ( $INC{'threads.pm'} ) ? 1 : 0;
   $_tid = ( $_has_threads ) ? threads->tid() : 0;
   $_oid = "$$.$_tid";

   ## Serialization. Try Sereal 3.015+ unless PDL is loaded or Perl is
   ## older than 5.8.8. Sereal is selected only when the encoder and the
   ## decoder report the same integer version.
   unless ( $INC{'PDL.pm'} || $] lt '5.008008' ) {
      eval 'use Sereal::Encoder 3.015; use Sereal::Decoder 3.015;';

      unless ( $@ ) {
         my ($_enc_major, $_dec_major) = map { int( $_->VERSION() ) }
            qw( Sereal::Encoder Sereal::Decoder );

         if ( $_enc_major == $_dec_major ) {
            ($_freeze, $_thaw) = (
               \&Sereal::Encoder::encode_sereal,
               \&Sereal::Decoder::decode_sereal,
            );
         }
      }
   }

   ## Fall back to Storable when Sereal was not selected above.
   unless ( defined $_freeze ) {
      require Storable;
      ($_freeze, $_thaw) = ( \&Storable::freeze, \&Storable::thaw );
   }
}
56
57use IO::Handle ();
58use Scalar::Util qw( looks_like_number refaddr reftype weaken );
59use Socket qw( SOL_SOCKET SO_RCVBUF );
60use Time::HiRes qw( sleep time );
61
62use MCE::Util qw( $LF );
63use MCE::Signal ();
64use MCE::Mutex ();
65
66our ($MCE, $RLA, $_que_template, $_que_read_size);
67our (%_valid_fields_new);
68
69my  ($TOP_HDLR, $_is_MSWin32, $_is_winenv, $_prev_mce);
70my  (%_valid_fields_task, %_params_allowed_args);
71
BEGIN {
   ## Configure the pack/unpack template for writing to and from the queue.
   ## Each entry contains 2 positive numbers: chunk_id & msg_id.
   ## Check for >= 64-bit, otherwise fall back to machine's word length.

   $_que_template  = ( ( log(~0+1) / log(2) ) >= 64 ) ? 'Q2' : 'I2';
   $_que_read_size = length( pack($_que_template, 0, 0) );

   ## Attributes used internally.
   ## _abort_msg _caller _chn _com_lock _dat_lock _mgr_live _rla_data _seed
   ## _chunk_id _pids _run_mode _single_dim _thrs _tids _task_wid _wid _wuf
   ## _exiting _exit_pid _last_sref _total_exited _total_running _total_workers
   ## _send_cnt _sess_dir _spawned _state _status _task _task_id _wrk_status
   ## _init_pid _init_total_workers _pids_t _pids_w _pids_c _relayed
   ##
   ## _bsb_r_sock _bsb_w_sock _com_r_sock _com_w_sock _dat_r_sock _dat_w_sock
   ## _que_r_sock _que_w_sock _rla_r_sock _rla_w_sock _data_channels
   ## _lock_chn   _mutex_n

   ## Turns a list of names into (name => 1) pairs for the lookup tables.
   my $_as_set = sub { return map { $_ => 1 } @_ };

   ## Arguments accepted by the constructor.
   %_valid_fields_new = $_as_set->( qw(
      max_workers tmp_dir use_threads user_tasks task_end task_name freeze thaw
      chunk_size input_data sequence job_delay spawn_delay submit_delay RS
      flush_file flush_stderr flush_stdout stderr_file stdout_file use_slurpio
      interval user_args user_begin user_end user_func user_error user_output
      bounds_only gather init_relay on_post_exit on_post_run parallel_io
      loop_timeout max_retries progress posix_exit
   ) );

   ## Names allowed as params arguments.
   %_params_allowed_args = $_as_set->( qw(
      chunk_size input_data sequence job_delay spawn_delay submit_delay RS
      flush_file flush_stderr flush_stdout stderr_file stdout_file use_slurpio
      interval user_args user_begin user_end user_func user_error user_output
      bounds_only gather init_relay on_post_exit on_post_run parallel_io
      loop_timeout max_retries progress
   ) );

   ## Arguments accepted inside a user_tasks entry.
   %_valid_fields_task = $_as_set->( qw(
      max_workers chunk_size input_data interval sequence task_end task_name
      bounds_only gather init_relay user_args user_begin user_end user_func
      RS parallel_io use_slurpio use_threads
   ) );

   $_is_winenv  = ( $^O =~ /mswin|mingw|msys|cygwin/i ) ? 1 : 0;
   $_is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;

   ## Create accessor functions. Each one falls back to the current $MCE
   ## instance when the invocant is not a reference.
   no strict 'refs'; no warnings 'redefine';

   ## Accessor name => hash key. Public options use the name as is;
   ## private attributes are stored with a leading underscore.
   my %_accessor_key = (
      ( map { $_ => $_    }
           qw( chunk_size max_retries max_workers task_name user_args ) ),
      ( map { $_ => "_$_" }
           qw( chunk_id task_id task_wid wid ) ),
   );

   for my $_name (keys %_accessor_key) {
      my $_key = $_accessor_key{$_name};

      *{ $_name } = sub () {
         my $self = ref($_[0]) ? $_[0] : $MCE;
         return $self->{$_key};
      };
   }

   ## freeze and thaw invoke the stored code reference with the
   ## remaining arguments.
   for my $_name (qw( freeze thaw )) {
      *{ $_name } = sub () {
         my $self = shift;
         ref($self) or $self = $MCE;
         return $self->{$_name}->(@_);
      };
   }

   $RLA = {};

   return;
}
141
142###############################################################################
143## ----------------------------------------------------------------------------
144## Import routine.
145##
146###############################################################################
147
148use constant { SELF => 0, CHUNK => 1, CID => 2 };
149
150our $_MCE_LOCK : shared = 1;
151our $_WIN_LOCK : shared = 1;
152
153my ($_def, $_imported) = ({});
154
## Processes the module options given on the "use MCE ..." line and stores
## them as defaults for the calling package. The first call also creates
## the module-level $MCE instance. Croaks on an unrecognized option.

sub import {
   my $_class = shift;
   my $_pkg   = caller;

   ## Every import starts with a fresh set of defaults for the caller.
   my $_p = $_def->{$_pkg} = {};

   ## Options whose value is stored as is, keyed by the upper-cased name.
   my %_store_as = (
      max_workers => 'MAX_WORKERS',
      chunk_size  => 'CHUNK_SIZE',
      tmp_dir     => 'TMP_DIR',
      freeze      => 'FREEZE',
      thaw        => 'THAW',
   );

   ## Process module arguments; option names are case-insensitive.
   ARG: while ( my $_argument = shift ) {
      my $_arg = lc $_argument;

      if ( exists $_store_as{$_arg} ) {
         $_p->{ $_store_as{$_arg} } = shift;
         next ARG;
      }

      ## Export the SELF, CHUNK, and CID constants when the value is '1'.
      if ( $_arg eq 'export_const' || $_arg eq 'const' ) {
         my $_enabled = shift;

         if ( $_enabled eq '1' ) {
            no strict 'refs'; no warnings 'redefine';
            *{ "${_pkg}::SELF"  } = \&SELF;
            *{ "${_pkg}::CHUNK" } = \&CHUNK;
            *{ "${_pkg}::CID"   } = \&CID;
         }

         next ARG;
      }

      ## Sereal, if available, is used automatically by MCE 1.800 onwards.
      ## Passing '0' selects Storable for the calling package instead.
      if ( $_arg eq 'sereal' ) {
         my $_enabled = shift;

         if ( $_enabled eq '0' ) {
            require Storable;
            @{ $_p }{ qw( FREEZE THAW ) } =
               ( \&Storable::freeze, \&Storable::thaw );
         }

         next ARG;
      }

      _croak("Error: ($_argument) invalid module option");
   }

   ## Only the first import instantiates the module-level instance.
   return if $_imported++;

   $MCE = MCE->new( _module_instance => 1, max_workers => 0 );

   return;
}
199
200###############################################################################
201## ----------------------------------------------------------------------------
202## Define constants & variables.
203##
204###############################################################################
205
## Compile-time constants: channel and size limits, the OUTPUT_* message
## tags (five-character strings of the form 'X~YYY'), and small integer
## codes for read mode, request type, sendto target, and calling context.

use constant {

   # Max data channels. This cannot be greater than 8 on MSWin32.
   DATA_CHANNELS  => ($^O eq 'MSWin32') ? 8 : 10,

   # Max GC size. Undef variable when exceeding size.
   MAX_GC_SIZE    => 1024 * 1024 * 64,

   MAX_RECS_SIZE  => 8192,     # Reads number of records if N <= value
                               # Reads number of bytes if N > value

   OUTPUT_W_ABT   => 'W~ABT',  # Worker has aborted
   OUTPUT_W_DNE   => 'W~DNE',  # Worker has completed
   OUTPUT_W_RLA   => 'W~RLA',  # Worker has relayed
   OUTPUT_W_EXT   => 'W~EXT',  # Worker has exited
   OUTPUT_A_REF   => 'A~REF',  # Input << Array ref
   OUTPUT_G_REF   => 'G~REF',  # Input << Glob ref
   OUTPUT_H_REF   => 'H~REF',  # Input << Hash ref
   OUTPUT_I_REF   => 'I~REF',  # Input << Iter ref
   OUTPUT_A_CBK   => 'A~CBK',  # Callback w/ multiple args
   OUTPUT_N_CBK   => 'N~CBK',  # Callback w/ no args
   OUTPUT_A_GTR   => 'A~GTR',  # Gather data
   OUTPUT_O_SND   => 'O~SND',  # Send >> STDOUT
   OUTPUT_E_SND   => 'E~SND',  # Send >> STDERR
   OUTPUT_F_SND   => 'F~SND',  # Send >> File
   OUTPUT_D_SND   => 'D~SND',  # Send >> File descriptor
   OUTPUT_B_SYN   => 'B~SYN',  # Barrier sync - begin
   OUTPUT_E_SYN   => 'E~SYN',  # Barrier sync - end
   OUTPUT_S_IPC   => 'S~IPC',  # Change to win32 IPC
   OUTPUT_C_NFY   => 'C~NFY',  # Chunk ID notification
   OUTPUT_P_NFY   => 'P~NFY',  # Progress notification
   OUTPUT_R_NFY   => 'R~NFY',  # Relay notification
   OUTPUT_S_DIR   => 'S~DIR',  # Make/get sess_dir
   OUTPUT_T_DIR   => 'T~DIR',  # Make/get tmp_dir
   OUTPUT_I_DLY   => 'I~DLY',  # Interval delay

   READ_FILE      => 0,        # Worker reads file handle
   READ_MEMORY    => 1,        # Worker reads memory handle

   REQUEST_ARRAY  => 0,        # Worker requests next array chunk
   REQUEST_GLOB   => 1,        # Worker requests next glob chunk
   REQUEST_HASH   => 2,        # Worker requests next hash chunk

   SENDTO_FILEV1  => 0,        # Worker sends to 'file', $a, '/path'
   SENDTO_FILEV2  => 1,        # Worker sends to 'file:/path', $a
   SENDTO_STDOUT  => 2,        # Worker sends to STDOUT
   SENDTO_STDERR  => 3,        # Worker sends to STDERR
   SENDTO_FD      => 4,        # Worker sends to file descriptor

   WANTS_UNDEF    => 0,        # Callee wants nothing
   WANTS_ARRAY    => 1,        # Callee wants list
   WANTS_SCALAR   => 2,        # Callee wants scalar
};
259
260my $_mce_count = 0;
261
## Refreshes the cached thread ID. Perl calls CLONE in the context of each
## newly created thread (see perlmod).

sub CLONE {
   if ( $INC{'threads.pm'} ) {
      $_tid = threads->tid();
   }
}
265
## Destructor. Shuts down the workers of an instance that has spawned,
## but only from the process/thread that initialized it and only when
## MCE::Signal has not flagged the process as killed.

sub DESTROY {
   my ($self) = @_;

   ## On MSWin32, a process flagged as killed terminates itself here.
   if ( $_is_MSWin32 && $MCE::Signal::KILLED ) {
      CORE::kill('KILL', $$);
   }

   ## "pid.tid" must match the value recorded at construction time.
   my $_owned_here = (
      $self && $self->{_spawned} && $self->{_init_pid} eq "$$.$_tid"
   );

   $self->shutdown(1) if ( $_owned_here && !$MCE::Signal::KILLED );

   return;
}
276
## At interpreter exit: a worker (it has _wuf and its _pid matches the
## current process/thread) calls exit on the instance; afterwards the
## model modules are finished and the global state is released via _end.

END {
   if ( defined $MCE ) {
      ## Build the current ID the same way the _pid attribute is compared:
      ## "pid.tid" when the instance is flagged as a thread, else the pid.
      my $_pid = $$;
      $_pid .= '.' . threads->tid() if ( $MCE->{_is_thread} );

      if ( exists $MCE->{_wuf} && $MCE->{_pid} eq $_pid ) {
         $MCE->exit;
      }

      _end();
   }
}
285
## Calls finish('MCE') on every loaded MCE model module, then clears the
## current instance and the top-level handler.

sub _end {
   for my $_model (qw( Flow Grep Loop Map Step Stream )) {
      "MCE::$_model"->finish( 'MCE' ) if $INC{"MCE/$_model.pm"};
   }

   $MCE = $TOP_HDLR = undef;
}
296
297###############################################################################
298## ----------------------------------------------------------------------------
299## Plugin interface for external modules plugging into MCE, e.g. MCE::Queue.
300##
301###############################################################################
302
303my (%_plugin_function, @_plugin_loop_begin, @_plugin_loop_end);
304my (%_plugin_list, @_plugin_worker_init);
305
## Registers the plugin hooks of the calling module. Arguments, in order:
##   a hash ref of output functions (an already registered key is kept),
##   a code ref run at output-loop begin,
##   a code ref run at output-loop end,
##   a code ref run at worker init.
## A calling module is registered once; later calls from it are ignored.

sub _attach_plugin {
   my ($_functions, $_loop_begin, $_loop_end, $_worker_init) = @_;
   my $_ext_module = caller;

   @_ = ();

   return if (exists $_plugin_list{$_ext_module});

   $_plugin_list{$_ext_module} = undef;

   if (ref $_functions eq 'HASH') {
      for my $_name (keys %{ $_functions }) {
         next if (exists $_plugin_function{$_name});
         $_plugin_function{$_name} = $_functions->{$_name};
      }
   }

   ## Hooks that are not code references are skipped.
   if (ref $_loop_begin eq 'CODE') {
      push @_plugin_loop_begin, $_loop_begin;
   }
   if (ref $_loop_end eq 'CODE') {
      push @_plugin_loop_end, $_loop_end;
   }
   if (ref $_worker_init eq 'CODE') {
      push @_plugin_worker_init, $_worker_init;
   }

   return;
}
336
337## Functions for saving and restoring $MCE.
338## Called by MCE::{ Flow, Grep, Loop, Map, Step, and Stream }.
339
## Remembers the current $MCE and makes the given instance current.
sub _save_state {
   ($_prev_mce, $MCE) = ($MCE, $_[0]);

   return;
}
## Copies the worker status to the saved instance, makes the saved
## instance current again, and forgets the saved reference.
sub _restore_state {
   $_prev_mce->{_wrk_status} = $MCE->{_wrk_status};

   ($MCE, $_prev_mce) = ($_prev_mce, undef);

   return;
}
349
350###############################################################################
351## ----------------------------------------------------------------------------
352## New instance instantiation.
353##
354###############################################################################
355
## Croaks with the given arguments. When called with worker ID 0 or
## outside of an eval, the MCE::Signal die/warn handlers are installed
## first. The output record separator is cleared before croaking.

sub _croak {
   if (MCE->wid == 0 || ! $^S) {
      @SIG{ qw( __DIE__ __WARN__ ) } = (
         \&MCE::Signal::_die_handler,
         \&MCE::Signal::_warn_handler,
      );
   }

   $\ = undef;

   ## Jump into Carp::croak with @_ intact.
   goto &Carp::croak;
}
363
364use MCE::Core::Validation ();
365use MCE::Core::Manager ();
366use MCE::Core::Worker ();
367
## Constructs a new MCE instance from key/value constructor arguments.
##
## Defaults for task_name, max_workers, chunk_size, tmp_dir, freeze, and
## thaw come from the options stored by import for the calling package (or
## the package given via the "pkg" argument), then from built-in values.
##
## Croaks on an unknown constructor or task argument, on a tmp_dir that is
## not a writeable directory, on user_tasks not being an ARRAY reference,
## and when use_threads is requested without threads support.
##
## Returns the blessed hash reference.

sub new {
   my ($class, %self) = @_;
   my $_pkg = exists $self{pkg} ? delete $self{pkg} : caller;

   @_ = ();

   bless(\%self, ref($class) || $class);

   ## Apply defaults; a false value (0, '', undef) counts as not specified.
   $self{task_name}   ||= 'MCE';
   $self{max_workers} ||= $_def->{$_pkg}{MAX_WORKERS} || 1;
   $self{chunk_size}  ||= $_def->{$_pkg}{CHUNK_SIZE}  || 1;
   $self{tmp_dir}     ||= $_def->{$_pkg}{TMP_DIR}     || $MCE::Signal::tmp_dir;
   $self{freeze}      ||= $_def->{$_pkg}{FREEZE}      || $_freeze;
   $self{thaw}        ||= $_def->{$_pkg}{THAW}        || $_thaw;

   ## The module-level instance (see import) skips validation and receives
   ## only a minimal set of private attributes.
   if (exists $self{_module_instance}) {
      $self{_init_total_workers} = $self{max_workers};
      $self{_chunk_id} = $self{_task_wid} = $self{_wrk_status} = 0;
      $self{_spawned}  = $self{_task_id}  = $self{_wid} = 0;
      $self{_init_pid} = "$$.$_tid";

      return \%self;
   }

   ## NOTE(review): _sendto_fhs_close is defined elsewhere; presumably it
   ## closes file handles opened by sendto -- confirm.
   _sendto_fhs_close();

   for my $_p (keys %self) {
      _croak("MCE::new: ($_p) is not a valid constructor argument")
         unless (exists $_valid_fields_new{$_p});
   }

   ## Record the calling package and the "pid.tid" of the creator.
   $self{_caller} = $_pkg, $self{_init_pid} = "$$.$_tid";

   ## use_threads: honor an explicit setting (croak when threads support is
   ## missing), otherwise default to 1 when threads is loaded.
   if (defined $self{use_threads}) {
      if (!$_has_threads && $self{use_threads}) {
         my $_msg  = "\n";
            $_msg .= "## Please include threads support prior to loading MCE\n";
            $_msg .= "## when specifying use_threads => $self{use_threads}\n";
            $_msg .= "\n";

         _croak($_msg);
      }
   }
   else {
      $self{use_threads} = ($_has_threads) ? 1 : 0;
   }

   ## posix_exit defaults to 1 when constructed inside an eval ($^S), from
   ## a thread with a non-zero ID, or when one of these modules is loaded.
   if (!exists $self{posix_exit}) {
      $self{posix_exit} = 1 if (
         $^S || $_tid || $INC{'Mojo/IOLoop.pm'} ||
         $INC{'Coro.pm'} || $INC{'LWP/UserAgent.pm'} || $INC{'stfl.pm'} ||
         $INC{'Curses.pm'} || $INC{'CGI.pm'} || $INC{'FCGI.pm'} ||
         $INC{'Tk.pm'} || $INC{'Wx.pm'} || $INC{'Win32/GUI.pm'} ||
         $INC{'Gearman/Util.pm'} || $INC{'Gearman/XS.pm'}
      );
   }

   ## -------------------------------------------------------------------------
   ## Validation.

   if (defined $self{tmp_dir}) {
      _croak("MCE::new: ($self{tmp_dir}) is not a directory or does not exist")
         unless (-d $self{tmp_dir});
      _croak("MCE::new: ($self{tmp_dir}) is not writeable")
         unless (-w $self{tmp_dir});
   }

   if (defined $self{user_tasks}) {
      _croak('MCE::new: (user_tasks) is not an ARRAY reference')
         unless (ref $self{user_tasks} eq 'ARRAY');

      ## A true init_relay on the first task is promoted to the instance.
      $self{max_workers} = _parse_max_workers($self{max_workers});
      $self{init_relay}  = $self{user_tasks}->[0]->{init_relay}
         if ($self{user_tasks}->[0]->{init_relay});

      for my $_task (@{ $self{user_tasks} }) {
         for my $_p (keys %{ $_task }) {
            _croak("MCE::new: ($_p) is not a valid task constructor argument")
               unless (exists $_valid_fields_task{$_p});
         }
         ## An empty task hash gets zero workers.
         $_task->{max_workers} = 0 unless scalar(keys %{ $_task });

         ## Tasks inherit max_workers and use_threads from the instance.
         $_task->{max_workers} = $self{max_workers}
            unless (defined $_task->{max_workers});
         $_task->{use_threads} = $self{use_threads}
            unless (defined $_task->{use_threads});

         ## %self is already blessed above, so ref(\%self) yields the class
         ## name and each task is blessed into the same class.
         bless($_task, ref(\%self) || \%self);
      }
   }

   _validate_args(\%self);

   ## -------------------------------------------------------------------------
   ## Private options. Limit chunk_size.

   my $_run_lock;

   $self{_chunk_id}   = 0;  # Chunk ID
   $self{_send_cnt}   = 0;  # Number of times data was sent via send
   $self{_spawned}    = 0;  # Have workers been spawned
   $self{_task_id}    = 0;  # Task ID, starts at 0 (array index)
   $self{_task_wid}   = 0;  # Task Worker ID, starts at 1 per task
   $self{_wid}        = 0;  # Worker ID, starts at 1 per MCE instance
   $self{_wrk_status} = 0;  # For saving exit status when worker exits

   ## On MSWin32 only, keep a shared variable in _run_lock.
   $self{_run_lock}   = threads::shared::share($_run_lock) if $_is_MSWin32;

   ## Address of the input scalar when input_data is a SCALAR reference,
   ## 0 otherwise.
   $self{_last_sref}  = (ref $self{input_data} eq 'SCALAR')
      ? refaddr($self{input_data}) : 0;

   ## The process/thread that loaded this module ("$$.$_tid" eq $_oid) may
   ## use up to DATA_CHANNELS channels (6 when MCE::Channel is loaded).
   ## Any other process/thread is limited to 2.
   my $_data_channels = ("$$.$_tid" eq $_oid)
      ? ( $INC{'MCE/Channel.pm'} ? 6 : DATA_CHANNELS )
      : 2;

   my $_total_workers = 0;

   if (defined $self{user_tasks}) {
      $_total_workers += $_->{max_workers} for @{ $self{user_tasks} };
   } else {
      $_total_workers = $self{max_workers};
   }

   $self{_init_total_workers} = $_total_workers;

   ## Never allocate more channels than there are workers.
   $self{_data_channels} = ($_total_workers < $_data_channels)
      ? $_total_workers : $_data_channels;

   ## Channel locking is enabled when workers outnumber the channels, or
   ## when MCE::Child or MCE::Hobo is loaded.
   $self{_lock_chn} = ($_total_workers > $_data_channels) ? 1 : 0;
   $self{_lock_chn} = 1 if $INC{'MCE/Child.pm'} || $INC{'MCE/Hobo.pm'};

   ## Make this the current instance unless the current one has a non-zero
   ## worker ID.
   $MCE = \%self if ($MCE->{_wid} == 0);

   return \%self;
}
503
504###############################################################################
505## ----------------------------------------------------------------------------
506## Spawn method.
507##
508###############################################################################
509
## Spawns the workers for this instance (or for the current $MCE when not
## called on a reference): loads the input module, creates the channel
## mutexes and the IPC socket pairs, then dispatches child processes and/or
## threads and records per-worker state.
##
## Croaks when called by a worker (non-zero _wid). Returns $self; returns
## early when workers are already spawned or for the module-level instance.

sub spawn {
   my $self = shift; $self = $MCE unless ref($self);

   local $_; @_ = ();

   _croak('MCE::spawn: method is not allowed by the worker process')
      if ($self->{_wid});

   ## Return if workers have already been spawned or if module instance.
   return $self if ($self->{_spawned} || exists $self->{_module_instance});

   ## The statement modifier adds no scope, so these locks are held until
   ## this sub returns.
   lock $_WIN_LOCK if $_is_MSWin32;    # Obtain locks
   lock $_MCE_LOCK if $_has_threads && $_is_winenv;

   ## From a thread with a non-zero ID: take the global mutex, if one
   ## exists, and pause briefly.
   $MCE::_GMUTEX->lock() if ($_tid && $MCE::_GMUTEX);
   sleep 0.015 if $_tid;

   _sendto_fhs_close();

   ## With PDL loaded, load PDL::IO::Storable if needed. Failures of these
   ## evals are ignored.
   if ($INC{'PDL.pm'}) { local $@;
      eval 'use PDL::IO::Storable' unless $INC{'PDL/IO/Storable.pm'};
      eval 'PDL::no_clone_skip_warning()';
   }
   ## With LWP::UserAgent loaded, try loading Net::HTTP and Net::HTTPS
   ## before spawning. Failure is ignored.
   if ( $INC{'LWP/UserAgent.pm'} && !$INC{'Net/HTTP.pm'} ) {
      local $@; eval 'require Net::HTTP; require Net::HTTPS';
   }

   ## Start the shared-manager process if not running.
   MCE::Shared->start() if $INC{'MCE/Shared.pm'};

   ## Load input module.
   if (defined $self->{sequence}) {
      require MCE::Core::Input::Sequence
         unless $INC{'MCE/Core/Input/Sequence.pm'};
   }
   elsif (defined $self->{input_data}) {
      my $_ref = ref $self->{input_data};
      if ($_ref =~ /^(?:ARRAY|HASH|GLOB|FileHandle|IO::)/) {
         require MCE::Core::Input::Request
            unless $INC{'MCE/Core/Input/Request.pm'};
      }
      elsif ($_ref eq 'CODE') {
         require MCE::Core::Input::Iterator
            unless $INC{'MCE/Core/Input/Iterator.pm'};
      }
      else {
         require MCE::Core::Input::Handle
            unless $INC{'MCE/Core/Input/Handle.pm'};
      }
   }

   ## Save the caller's die/warn handlers; they are restored before
   ## returning. The MCE::Signal handlers are active while spawning.
   my $_die_handler  = $SIG{__DIE__};
   my $_warn_handler = $SIG{__WARN__};

   $SIG{__DIE__}  = \&MCE::Signal::_die_handler;
   $SIG{__WARN__} = \&MCE::Signal::_warn_handler;

   ## Become the top-level handler when there is none, or when the existing
   ## one has no live manager and is not a worker. Otherwise, a different
   ## instance is treated as a nested session.
   if (!defined $TOP_HDLR || (!$TOP_HDLR->{_mgr_live} && !$TOP_HDLR->{_wid})) {
      ## On Windows, must shutdown the last idle MCE session.
      if ($_is_MSWin32 && defined $TOP_HDLR && $TOP_HDLR->{_spawned}) {
         $TOP_HDLR->shutdown(1);
      }
      $TOP_HDLR = $self;
   }
   elsif (refaddr($self) != refaddr($TOP_HDLR)) {
      ## Reduce the maximum number of channels for nested sessions.
      $self->{_data_channels} = 4 if ($self->{_data_channels} > 4);
      $self->{_lock_chn} = 1 if ($self->{_init_total_workers} > 4);

      ## On Windows, instruct the manager process to enable win32 IPC.
      ## The request goes out on the top-level handler's first data channel
      ## and a 1-byte reply is read back.
      if ($_is_MSWin32 && $ENV{'PERL_MCE_IPC'} ne 'win32') {
         $ENV{'PERL_MCE_IPC'} = 'win32'; local $\ = undef;
         my $_DAT_W_SOCK = $TOP_HDLR->{_dat_w_sock}->[0];
         print {$_DAT_W_SOCK} OUTPUT_S_IPC.$LF . '0'.$LF;

         MCE::Util::_sock_ready($_DAT_W_SOCK, -1);
         MCE::Util::_sysread($_DAT_W_SOCK, my($_buf), 1);
      }
   }

   ## -------------------------------------------------------------------------

   my $_data_channels = $self->{_data_channels};
   my $_max_workers   = _get_max_workers($self);
   my $_use_threads   = $self->{use_threads};

   ## Create locks for data channels.
   ## _mutex_0 is always created; one mutex per data channel is added only
   ## when channel locking is enabled.
   $self->{'_mutex_0'} = MCE::Mutex->new( impl => 'Channel' );

   if ($self->{_lock_chn}) {
      $self->{'_mutex_'.$_} = MCE::Mutex->new( impl => 'Channel' )
         for (1 .. $_data_channels);
   }

   ## Create sockets for IPC.                             sync, comm, data
   MCE::Util::_sock_pair($self, qw(_bsb_r_sock _bsb_w_sock), undef, 1);
   MCE::Util::_sock_pair($self, qw(_com_r_sock _com_w_sock), undef, 1);

   MCE::Util::_sock_pair($self, qw(_dat_r_sock _dat_w_sock), 0);
   MCE::Util::_sock_pair($self, qw(_dat_r_sock _dat_w_sock), $_, 1)
      for (1 .. $_data_channels);

   ## Set the receive buffer of data channel 0 to 4096 bytes, except on
   ## AIX and Linux.
   setsockopt($self->{_dat_r_sock}->[0], SOL_SOCKET, SO_RCVBUF, pack('i', 4096))
      if ($^O ne 'aix' && $^O ne 'linux');

   ## The input queue uses a pipe pair on MSWin32 and a socket pair
   ## elsewhere.
   ($_is_MSWin32)                                                   # input
      ? MCE::Util::_pipe_pair($self, qw(_que_r_sock _que_w_sock))
      : MCE::Util::_sock_pair($self, qw(_que_r_sock _que_w_sock), undef, 1);

   ## With init_relay defined, load MCE::Relay and create one relay socket
   ## pair per worker (indices 0 .. max_workers - 1).
   if (defined $self->{init_relay}) {                               # relay
      unless ($INC{'MCE/Relay.pm'}) {
         require MCE::Relay; MCE::Relay->import();
      }
      MCE::Util::_sock_pair($self, qw(_rla_r_sock _rla_w_sock), $_, 1)
         for (0 .. $_max_workers - 1);
   }

   ## Random integer seed in the range 0 .. 1e9 - 1.
   $self->{_seed} = int(rand() * 1e9);

   ## -------------------------------------------------------------------------

   ## Spawn workers.
   $self->{_pids}   = [], $self->{_thrs}  = [], $self->{_tids} = [];
   $self->{_status} = [], $self->{_state} = [], $self->{_task} = [];

   if ($self->{loop_timeout} && !$_is_MSWin32) {
      $self->{_pids_t} = {}, $self->{_pids_w} = {};
   }

   ## Except on MSWin32, localize the TTIN, TTOU, and WINCH handlers until
   ## this sub returns.
   local $SIG{TTIN}, local $SIG{TTOU}, local $SIG{WINCH} unless $_is_MSWin32;

   if (!defined $self->{user_tasks}) {
      ## Single task: all workers are either threads or child processes.
      $self->{_total_workers} = $_max_workers;

      if (defined $_use_threads && $_use_threads == 1) {
         _dispatch_thread($self, $_) for (1 .. $_max_workers);
      } else {
         _dispatch_child($self, $_) for (1 .. $_max_workers);
      }

      $self->{_task}->[0] = { _total_workers => $_max_workers };

      ## Channel numbers cycle through 1 .. $_data_channels by worker ID.
      for my $_i (1 .. $_max_workers) {
         $self->{_state}->[$_i] = {
            _task => undef, _task_id => undef, _task_wid => undef,
            _params => undef, _chn => $_i % $_data_channels + 1
         }
      }
   }
   else {
      ## Multiple tasks: worker IDs are numbered consecutively across the
      ## tasks. Each pass below restarts the numbering so that a worker
      ## gets the same ID regardless of the pass that spawns it.
      my ($_task_id, $_wid);

      $self->{_total_workers}  = 0;
      $self->{_total_workers} += $_->{max_workers} for @{ $self->{user_tasks} };

      # Must spawn processes first for extra stability on BSD/Darwin.
      $_task_id = $_wid = 0;

      for my $_task (@{ $self->{user_tasks} }) {
         my $_tsk_use_threads = $_task->{use_threads};

         if (defined $_tsk_use_threads && $_tsk_use_threads == 1) {
            $_wid += $_task->{max_workers};
         } else {
            _dispatch_child($self, ++$_wid, $_task, $_task_id, $_)
               for (1 .. $_task->{max_workers});
         }

         $_task_id++;
      }

      # Then, spawn threads last.
      $_task_id = $_wid = 0;

      for my $_task (@{ $self->{user_tasks} }) {
         my $_tsk_use_threads = $_task->{use_threads};

         if (defined $_tsk_use_threads && $_tsk_use_threads == 1) {
            _dispatch_thread($self, ++$_wid, $_task, $_task_id, $_)
               for (1 .. $_task->{max_workers});
         } else {
            $_wid += $_task->{max_workers};
         }

         $_task_id++;
      }

      # Save state.
      $_task_id = $_wid = 0;

      for my $_task (@{ $self->{user_tasks} }) {
         $self->{_task}->[$_task_id] = {
            _total_running => 0, _total_workers => $_task->{max_workers}
         };
         for my $_i (1 .. $_task->{max_workers}) {
            $_wid += 1;
            $self->{_state}->[$_wid] = {
               _task => $_task, _task_id => $_task_id, _task_wid => $_i,
               _params => undef, _chn => $_wid % $_data_channels + 1
            }
         }

         $_task_id++;
      }
   }

   ## -------------------------------------------------------------------------

   $self->{_send_cnt} = 0, $self->{_spawned} = 1;

   ## Restore the caller's die/warn handlers.
   $SIG{__DIE__}  = $_die_handler;
   $SIG{__WARN__} = $_warn_handler;

   ## Make this the current instance unless the current one has a non-zero
   ## worker ID.
   $MCE = $self if ($MCE->{_wid} == 0);

   $MCE::_GMUTEX->unlock() if ($_tid && $MCE::_GMUTEX);

   return $self;
}
729
730###############################################################################
731## ----------------------------------------------------------------------------
732## Process method, relay stubs, and AUTOLOAD for methods not used often.
733##
734###############################################################################
735
## Processes input data without shutting down the workers afterwards.
##
## Accepts (params_hash, input_data) or (input_data, params_hash). A hash
## reference in the first position is always taken as the params hash.
## Croaks when neither input_data nor sequence is specified.
## Returns $self.

sub process {
   my $self = shift;
   $self = $MCE unless ref($self);

   _validate_runstate($self, 'MCE::process');

   ## Pick the argument order based on the type of the first argument.
   my ($_params_ref, $_input_data) =
      (ref $_[0] eq 'HASH') ? @_[0, 1] : @_[1, 0];

   @_ = ();

   ## Set input data.
   if (defined $_input_data) {
      $_params_ref->{input_data} = $_input_data;
   }
   else {
      _croak('MCE::process: (input_data or sequence) is not specified')
         unless ( defined $_params_ref->{input_data} ||
                  defined $_params_ref->{sequence} );
   }

   ## Pass 0 to "not" auto-shutdown after processing.
   $self->run(0, $_params_ref);

   return $self;
}
767
## Relay stub: croaks when init_relay is not specified for the current
## instance and does nothing else here.
## NOTE(review): presumably superseded once MCE::Relay is loaded -- confirm.

sub relay (;&) {
   unless (defined $MCE->{init_relay}) {
      _croak('MCE::relay: (init_relay) is not specified');
   }
}

## relay_unlock is an alias for relay.
{
   no warnings 'once';

   *relay_unlock = \&relay;
}
777
## Handles methods that are not used often: the "for" sugar methods, the
## relay stubs, last, next, pid, and status. Croaks for any other name.

sub AUTOLOAD {
   # $MCE::AUTOLOAD holds "MCE::<method_name>"; drop the "MCE::" prefix.

   my $_fcn = substr($MCE::AUTOLOAD, 5);

   my $self = shift;
   $self = $MCE unless ref($self);

   # "for" sugar methods, provided by MCE::Candy (loaded on first use)

   if ($_fcn eq 'forchunk' || $_fcn eq 'foreach' || $_fcn eq 'forseq') {
      require MCE::Candy unless $INC{'MCE/Candy.pm'};

      my $_candy = \&{ "MCE::Candy::$_fcn" };
      return $_candy->($self, @_);
   }

   # relay stubs for MCE::Relay

   if ($_fcn eq 'relay_final') {
      return;
   }
   if ($_fcn eq 'relay_lock' || $_fcn eq 'relay_recv') {
      _croak('MCE::relay: (init_relay) is not specified')
         unless (defined $MCE->{init_relay});
   }

   # worker immediately exits the chunking loop

   if ($_fcn eq 'last') {
      $self->{_wid}
         or _croak('MCE::last: method is not allowed by the manager process');

      my $_jump = $self->{_last_jmp};
      $_jump->() if (defined $_jump);

      return;
   }

   # worker starts the next iteration of the chunking loop

   if ($_fcn eq 'next') {
      $self->{_wid}
         or _croak('MCE::next: method is not allowed by the manager process');

      my $_jump = $self->{_next_jmp};
      $_jump->() if (defined $_jump);

      return;
   }

   # return the process ID, include thread ID for threads

   if ($_fcn eq 'pid') {
      return $self->{_pid} if (defined $self->{_pid});

      return $$ .'.'. threads->tid()
         if ($_has_threads && $self->{use_threads});

      return $$;
   }

   # return the exit status
   # _wrk_status holds the greatest exit status among workers exiting

   if ($_fcn eq 'status') {
      _croak('MCE::status: method is not allowed by the worker process')
         if ($self->{_wid});

      my $_status = $self->{_wrk_status};
      return (defined $_status) ? $_status : 0;
   }

   _croak("Can't locate object method \"$_fcn\" via package \"MCE\"");
}
854
855###############################################################################
856## ----------------------------------------------------------------------------
857## Restart worker method.
858##
859###############################################################################
860
## Restarts the worker recorded in _exited_wid, reusing the task, task
## worker ID, params, and channel saved in its state entry. Croaks when
## called by a worker process.

sub restart_worker {
   my $self = shift;
   $self = $MCE unless ref($self);

   @_ = ();

   _croak('MCE::restart_worker: method is not allowed by the worker process')
      if ($self->{_wid});

   my $_wid   = $self->{_exited_wid};
   my $_state = $self->{_state}->[$_wid] ||= {};

   my ($_params, $_task_wid, $_task_id, $_task, $_chn) =
      @{ $_state }{ qw( _params _task_wid _task_id _task _chn ) };

   ## The restarted worker keeps the channel of the exited worker.
   $_params->{_chn} = $_chn;

   ## A worker belonging to a user task follows the task's use_threads
   ## setting; the per-task counters are bumped only in that case.
   my $_in_task     = defined $_task_id;
   my $_use_threads = $_in_task
      ? $_task->{use_threads} : $self->{use_threads};

   for my $_counter (qw( _total_running _total_workers )) {
      $self->{_task}->[$_task_id]->{$_counter} += 1 if ($_in_task);
   }
   for my $_counter (qw( _total_running _total_workers )) {
      $self->{$_counter} += 1;
   }

   my $_dispatch = (defined $_use_threads && $_use_threads == 1)
      ? \&_dispatch_thread : \&_dispatch_child;

   $_dispatch->($self, $_wid, $_task, $_task_id, $_task_wid, $_params);

   delete $self->{_retry_cnt};

   ## Pause after spawning: spawn_delay when positive, else a short delay
   ## when running in a thread with a non-zero ID or on MSWin32.
   my $_delay = $self->{spawn_delay};

   if (defined $_delay && $_delay > 0.0) {
      sleep $_delay;
   }
   elsif ($_tid || $_is_MSWin32) {
      sleep 0.045;
   }

   return;
}
904
905###############################################################################
906## ----------------------------------------------------------------------------
907## Run method.
908##
909###############################################################################
910
## Run one processing pass from the manager process.
##
## Arguments (either order, as decided by the type of the first one):
##    run( [ $auto_shutdown ], [ \%params ] )
##    run( \%params, [ $auto_shutdown ] )
##
## $auto_shutdown defaults to 1 when not defined. \%params, when given and
## nothing was queued through send, is applied via _sync_params.
##
## Returns $self on every exit path. Croaks (via _croak) when called by a
## worker process or when input_data is of an unsupported type.
sub run {
   my $self = shift; $self = $MCE unless ref($self);

   _croak('MCE::run: method is not allowed by the worker process')
      if ($self->{_wid});

   my ($_auto_shutdown, $_params_ref);

   if (ref $_[0] eq 'HASH') {
      $_auto_shutdown = (defined $_[1]) ? $_[1] : 1;
      $_params_ref    = $_[0];
   } else {
      $_auto_shutdown = (defined $_[0]) ? $_[0] : 1;
      $_params_ref    = $_[1];
   }

   @_ = ();

   my $_has_user_tasks = (defined $self->{user_tasks}) ? 1 : 0;
   my $_requires_shutdown = 0;

   ## Unset params if workers have already been sent user_data via send.
   ## Set user_func to NOOP if not specified.

   $_params_ref = undef if ($self->{_send_cnt});

   if (!defined $self->{user_func} && !defined $_params_ref->{user_func}) {
      $self->{user_func} = \&MCE::Signal::_NOOP;
   }

   ## Set user specified params if specified.
   ## Shutdown workers if determined by _sync_params or if processing a
   ## scalar reference. Workers need to be restarted in order to pick up
   ## on the new code or scalar reference.

   if (defined $_params_ref && ref $_params_ref eq 'HASH') {
      $_requires_shutdown = _sync_params($self, $_params_ref);
      _validate_args($self);
   }
   if ($_has_user_tasks) {
      ## Truthy settings on the first task override the instance settings.
      $self->{input_data} = $self->{user_tasks}->[0]->{input_data}
         if ($self->{user_tasks}->[0]->{input_data});
      $self->{use_slurpio} = $self->{user_tasks}->[0]->{use_slurpio}
         if ($self->{user_tasks}->[0]->{use_slurpio});
      $self->{parallel_io} = $self->{user_tasks}->[0]->{parallel_io}
         if ($self->{user_tasks}->[0]->{parallel_io});
      $self->{RS} = $self->{user_tasks}->[0]->{RS}
         if ($self->{user_tasks}->[0]->{RS});
   }
   if (ref $self->{input_data} eq 'SCALAR') {
      ## A different scalar reference than last time forces a restart.
      if (refaddr($self->{input_data}) != $self->{_last_sref}) {
         $_requires_shutdown = 1;
      }
      $self->{_last_sref} = refaddr($self->{input_data});
   }

   $self->shutdown() if ($_requires_shutdown);

   ## -------------------------------------------------------------------------

   $self->{_wrk_status} = 0;

   ## Spawn workers.
   $self->spawn() unless ($self->{_spawned});
   return $self   unless ($self->{_total_workers});

   local $SIG{__DIE__}  = \&MCE::Signal::_die_handler;
   local $SIG{__WARN__} = \&MCE::Signal::_warn_handler;

   $MCE = $self if ($MCE->{_wid} == 0);

   my ($_input_data, $_input_file, $_input_glob, $_seq);
   my ($_abort_msg, $_first_msg, $_run_mode, $_single_dim);
   my $_chunk_size = $self->{chunk_size};

   $_seq = ($_has_user_tasks && $self->{user_tasks}->[0]->{sequence})
      ? $self->{user_tasks}->[0]->{sequence}
      : $self->{sequence};

   ## Determine run mode for workers.
   if (defined $_seq) {
      my ($_begin, $_end, $_step) = (ref $_seq eq 'ARRAY')
         ? @{ $_seq } : ($_seq->{begin}, $_seq->{end}, $_seq->{step});

      $_chunk_size = $self->{user_tasks}->[0]->{chunk_size}
         if ($_has_user_tasks && $self->{user_tasks}->[0]->{chunk_size});

      $_run_mode  = 'sequence';
      $_abort_msg = int(($_end - $_begin) / $_step / $_chunk_size); # + 1;

      # Previously + 1 above. Below, support for large numbers, 1e16 and beyond.
      # E.g. sequence => [ 1, 1e16 ], chunk_size => 1e11
      #
      # Perl: int((1e15 - 1) / 1 / 1e11) =   9999
      # Perl: int((1e16 - 1) / 1 / 1e11) = 100000 wrong, due to precision limit
      # Calc: int((1e16 - 1) / 1 / 1e11) =  99999

      if ( $_step > 0 ) {
         $_abort_msg++
            if ($_abort_msg * $_chunk_size * abs($_step) + $_begin <= $_end);
      } else {
         $_abort_msg++
            if ($_abort_msg * $_chunk_size * abs($_step) + $_end <= $_begin);
      }

      $_first_msg = 0;
   }
   elsif (defined $self->{input_data}) {
      my $_ref = ref $self->{input_data};

      if ($_ref eq '') {                              # File mode
         $_run_mode   = 'file';
         $_input_file = $self->{input_data};
         $_input_data = $_input_glob = undef;
         $_abort_msg  = (-s $_input_file) + 1;
         $_first_msg  = 0; ## Begin at offset position

         if ((-s $_input_file) == 0) {
            $self->shutdown() if ($_auto_shutdown == 1);
            return $self;
         }
      }
      elsif ($_ref eq 'ARRAY') {                      # Array mode
         $_run_mode   = 'array';
         $_input_data = $self->{input_data};
         $_input_file = $_input_glob = undef;
         $_single_dim = 1 if (ref $_input_data->[0] eq '');
         $_abort_msg  = 0; ## Flag: Has Data: No
         $_first_msg  = 1; ## Flag: Has Data: Yes

         if (@{ $_input_data } == 0) {
            $self->shutdown() if ($_auto_shutdown == 1);
            return $self;
         }
      }
      elsif ($_ref eq 'HASH') {                       # Hash mode
         $_run_mode   = 'hash';
         $_input_data = $self->{input_data};
         $_input_file = $_input_glob = undef;
         $_abort_msg  = 0; ## Flag: Has Data: No
         $_first_msg  = 1; ## Flag: Has Data: Yes

         if (scalar( keys %{ $_input_data } ) == 0) {
            $self->shutdown() if ($_auto_shutdown == 1);
            return $self;
         }
      }
      elsif ($_ref =~ /^(?:GLOB|FileHandle|IO::)/) {  # Glob mode
         $_run_mode   = 'glob';
         $_input_glob = $self->{input_data};
         $_input_data = $_input_file = undef;
         $_abort_msg  = 0; ## Flag: Has Data: No
         $_first_msg  = 1; ## Flag: Has Data: Yes
      }
      elsif ($_ref eq 'CODE') {                       # Iterator mode
         $_run_mode   = 'iterator';
         $_input_data = $self->{input_data};
         $_input_file = $_input_glob = undef;
         $_abort_msg  = 0; ## Flag: Has Data: No
         $_first_msg  = 1; ## Flag: Has Data: Yes
      }
      elsif ($_ref eq 'SCALAR') {                     # Memory mode
         $_run_mode   = 'memory';
         $_input_data = $_input_file = $_input_glob = undef;
         $_abort_msg  = length(${ $self->{input_data} }) + 1;
         $_first_msg  = 0; ## Begin at offset position

         ## An empty scalar with auto-shutdown enabled ends the run here.
         ## Return $self, like every other exit path of this method, and
         ## not the empty return value of shutdown. Unlike the file, array
         ## and hash modes, an empty scalar falls through to the normal
         ## run when auto-shutdown is off.
         if (length(${ $self->{input_data} }) == 0 && $_auto_shutdown == 1) {
            $self->shutdown();
            return $self;
         }
      }
      else {
         _croak('MCE::run: (input_data) is not valid');
      }
   }
   else {                                             # Nodata mode
      $_abort_msg = undef, $_run_mode = 'nodata';
   }

   ## -------------------------------------------------------------------------

   my $_total_workers = $self->{_total_workers};
   my $_send_cnt      = $self->{_send_cnt};

   if ($_send_cnt) {
      ## Data was already handed out through send; only that many workers
      ## are counted as running.
      $self->{_total_running} = $_send_cnt;
      $self->{_task}->[0]->{_total_running} = $_send_cnt;
   }
   else {
      $self->{_total_running} = $_total_workers;

      my ($_frozen_nodata, $_wid, %_params_nodata, %_task0_wids);
      my  $_COM_R_SOCK   = $self->{_com_r_sock};
      my  $_submit_delay = $self->{submit_delay};

      my %_params = (
         '_abort_msg'   => $_abort_msg,  '_chunk_size' => $_chunk_size,
         '_input_file'  => $_input_file, '_run_mode'   => $_run_mode,
         '_bounds_only' => $self->{bounds_only},
         '_max_retries' => $self->{max_retries},
         '_parallel_io' => $self->{parallel_io},
         '_progress'    => $self->{progress} ? 1 : 0,
         '_sequence'    => $self->{sequence},
         '_user_args'   => $self->{user_args},
         '_use_slurpio' => $self->{use_slurpio},
         '_RS'          => $self->{RS}
      );

      ## Wire format: payload length, $LF, then the frozen payload.
      my $_frozen_params = $self->{freeze}(\%_params);
         $_frozen_params = length($_frozen_params).$LF . $_frozen_params;

      if ($_has_user_tasks) {
         ## Workers outside of task 0 receive the same params, but in
         ## 'nodata' run mode and without an abort message.
         %_params_nodata = ( %_params,
            '_abort_msg' => undef, '_run_mode' => 'nodata'
         );
         $_frozen_nodata = $self->{freeze}(\%_params_nodata);
         $_frozen_nodata = length($_frozen_nodata).$LF . $_frozen_nodata;

         for my $_t (@{ $self->{_task} }) {
            $_t->{_total_running} = $_t->{_total_workers};
         }
         for my $_i (1 .. @{ $self->{_state} } - 1) {
            $_task0_wids{$_i} = undef unless ($self->{_state}[$_i]{_task_id});
         }
      }

      local $\ = undef; local $/ = $LF;

      ## Insert the first message into the queue if defined.
      if (defined $_first_msg) {
         syswrite($self->{_que_w_sock}, pack($_que_template, 0, $_first_msg));
      }

      ## Submit params data to workers.
      for my $_i (1 .. $_total_workers) {
         ## The line read back is used as the worker id.
         print({$_COM_R_SOCK} $_i.$LF), chomp($_wid = <$_COM_R_SOCK>);

         if (!$_has_user_tasks || exists $_task0_wids{$_wid}) {
            print({$_COM_R_SOCK} $_frozen_params), <$_COM_R_SOCK>;
            $self->{_state}[$_wid]{_params} = \%_params;
         } else {
            print({$_COM_R_SOCK} $_frozen_nodata), <$_COM_R_SOCK>;
            $self->{_state}[$_wid]{_params} = \%_params_nodata;
         }

         sleep $_submit_delay
            if defined($_submit_delay) && $_submit_delay > 0.0;
      }
   }

   ## -------------------------------------------------------------------------

   $self->{_total_exited} = 0;

   ## Call the output function.
   if ($self->{_total_running} > 0) {
      $self->{_mgr_live}   = 1;
      $self->{_abort_msg}  = $_abort_msg;
      $self->{_single_dim} = $_single_dim;

      lock $self->{_run_lock} if $_is_MSWin32;

      if (!$_send_cnt) {
         ## Notify workers to commence processing.
         if ($_is_MSWin32) {
            my $_buf = _sprintf("%${_total_workers}s", "");
            syswrite($self->{_bsb_r_sock}, $_buf);
         } else {
            my $_BSB_R_SOCK = $self->{_bsb_r_sock};
            for my $_i (1 .. $_total_workers) {
               syswrite($_BSB_R_SOCK, $LF);
            }
         }
      }

      _output_loop( $self, $_input_data, $_input_glob,
         \%_plugin_function, \@_plugin_loop_begin, \@_plugin_loop_end
      );

      $self->{_mgr_live} = $self->{_abort_msg} = $self->{_single_dim} = undef;
   }

   ## Remove the last message from the queue.
   if (!$_send_cnt && $_run_mode ne 'nodata') {
      MCE::Util::_sysread($self->{_que_r_sock}, my($_buf), $_que_read_size)
         if ( defined $self->{_que_r_sock} );
   }

   $self->{_send_cnt} = 0;

   ## Shutdown workers.
   if ($_auto_shutdown || $self->{_total_exited}) {
      $self->shutdown();
   }
   elsif ($^S || $ENV{'PERL_IPERL_RUNNING'}) {
      ## The stack-trace check is skipped when any of these modules is
      ## loaded.
      if (
         !$INC{'Mojo/IOLoop.pm'} && !$INC{'Win32/GUI.pm'} &&
         !$INC{'Gearman/XS.pm'} && !$INC{'Gearman/Util.pm'} &&
         !$INC{'Tk.pm'} && !$INC{'Wx.pm'}
      ) {
         # running inside eval or IPerl, check stack trace
         my $_t = Carp::longmess(); $_t =~ s/\teval [^\n]+\n$//;

         if ( $_t =~ /^(?:[^\n]+\n){1,7}\teval / ||
              $_t =~ /\n\teval [^\n]+\n\t(?:eval|Try)/ ||
              $_t =~ /\n\tMCE::_dispatch\(\) [^\n]+ thread \d+\n$/ ||
              ( $_tid && !$self->{use_threads} ) )
         {
            $self->shutdown();
         }
      }
   }

   return $self;
}
1226
1227###############################################################################
1228## ----------------------------------------------------------------------------
1229## Send method.
1230##
1231###############################################################################
1232
## Hand one data reference (ARRAY, HASH or PDL) to a worker ahead of run.
## Manager process only; not usable while running, nor together with
## input_data, sequence or user_tasks. Workers are spawned on first use.
## Returns $self.
sub send {
   my $self = shift; $self = $MCE unless ref($self);

   if ($self->{_wid}) {
      _croak('MCE::send: method is not allowed by the worker process');
   }
   if ($self->{_total_running}) {
      _croak('MCE::send: method is not allowed while running');
   }
   if (defined $self->{input_data} || defined $self->{sequence}) {
      _croak('MCE::send: method cannot be used with input_data or sequence');
   }
   if (defined $self->{user_tasks}) {
      _croak('MCE::send: method cannot be used with user_tasks');
   }

   ## Accept the first argument only when it is one of the known types.
   my %_accepted = map { $_ => 1 } qw( ARRAY HASH PDL );
   my $_data_ref;

   if ($_accepted{ ref $_[0] }) {
      $_data_ref = $_[0];
   }
   else {
      _croak('MCE::send: ARRAY, HASH, or a PDL reference is not specified');
   }

   @_ = ();

   if (!defined $self->{_send_cnt}) {
      $self->{_send_cnt} = 0;
   }

   ## -------------------------------------------------------------------------

   ## Spawn workers.
   if (!$self->{_spawned}) {
      $self->spawn();
   }

   ## No more sends than there are workers in task 0.
   if ($self->{_send_cnt} >= $self->{_task}->[0]->{_total_workers}) {
      _croak('MCE::send: Sending greater than # of workers is not allowed');
   }

   local $SIG{__DIE__}  = \&MCE::Signal::_die_handler;
   local $SIG{__WARN__} = \&MCE::Signal::_warn_handler;

   ## Begin data submission.
   local $\ = undef; local $/ = $LF;

   my $_sock   = $self->{_com_r_sock};
   my $_delay  = $self->{submit_delay};
   my $_frozen = $self->{freeze}($_data_ref);
   my $_size   = length $_frozen;

   ## Submit data to worker. Each write is followed by reading one line
   ## back from the same socket.
   print {$_sock} '_data'.$LF;
   <$_sock>;

   print {$_sock} $_size.$LF, $_frozen;
   <$_sock>;

   $self->{_send_cnt} += 1;

   if (defined($_delay) && $_delay > 0.0) {
      sleep $_delay;
   }

   return $self;
}
1288
1289###############################################################################
1290## ----------------------------------------------------------------------------
1291## Shutdown method.
1292##
1293###############################################################################
1294
## Stop all workers and reset the instance to its unspawned state.
##
## Arguments:
##    $_no_lock - optional; when true, $_MCE_LOCK is not taken (the lock is
##                otherwise taken only with threads in a Windows environment)
##
## Does nothing unless workers were spawned. When MCE::Signal reports that
## the process was killed, only the session directory is removed. Any data
## queued through send is processed first via run(0). Returns nothing.
sub shutdown {
   my $self = shift; $self = $MCE unless ref($self);
   my $_no_lock = shift || 0;

   @_ = ();

   ## Return unless spawned or already shutdown.
   return unless $self->{_spawned};

   ## Return if signaled.
   if ($MCE::Signal::KILLED) {
      if (defined $self->{_sess_dir}) {
         my $_sess_dir = delete $self->{_sess_dir};
         rmdir $_sess_dir if -d $_sess_dir;
      }
      return;
   }

   _validate_runstate($self, 'MCE::shutdown');

   ## Complete processing before shutting down.
   $self->run(0) if ($self->{_send_cnt});

   local $SIG{__DIE__}  = \&MCE::Signal::_die_handler;
   local $SIG{__WARN__} = \&MCE::Signal::_warn_handler;

   my $_COM_R_SOCK     = $self->{_com_r_sock};
   my $_data_channels  = $self->{_data_channels};
   my $_total_workers  = $self->{_total_workers};
   my $_sess_dir       = $self->{_sess_dir};

   ## Drop the top-handler reference if it points at this instance.
   if (defined $TOP_HDLR && refaddr($self) == refaddr($TOP_HDLR)) {
      $TOP_HDLR = undef;
   }

   ## -------------------------------------------------------------------------

   lock $_MCE_LOCK if ($_has_threads && $_is_winenv && !$_no_lock);

   ## Notify workers to exit loop.
   local ($!, $?, $_); local $\ = undef; local $/ = $LF;

   for (1 .. $_total_workers) {
      print({$_COM_R_SOCK} '_exit'.$LF), <$_COM_R_SOCK>;
   }

   ## Reap children and/or threads. The range ends at the last valid index
   ## ($#list); empty slots are skipped.
   if (@{ $self->{_pids} } > 0) {
      my $_list = $self->{_pids};
      for my $i (0 .. $#{ $_list }) {
         waitpid($_list->[$i], 0) if $_list->[$i];
      }
   }
   if (@{ $self->{_thrs} } > 0) {
      my $_list = $self->{_thrs};
      for my $i (0 .. $#{ $_list }) {
         $_list->[$i]->join() if $_list->[$i];
      }
   }

   ## Close sockets.
   $_COM_R_SOCK = undef;

   MCE::Util::_destroy_socks($self, qw(
      _bsb_w_sock _bsb_r_sock _com_w_sock _com_r_sock
      _dat_w_sock _dat_r_sock _rla_w_sock _rla_r_sock
   ));

   ## The queue handles are destroyed as pipes on MSWin32, as sockets
   ## elsewhere.
   ($_is_MSWin32)
      ? MCE::Util::_destroy_pipes($self, qw( _que_w_sock _que_r_sock ))
      : MCE::Util::_destroy_socks($self, qw( _que_w_sock _que_r_sock ));

   ## -------------------------------------------------------------------------

   ## Destroy mutexes.
   for my $_i (0 .. $_data_channels) { delete $self->{'_mutex_'.$_i}; }

   ## Remove session directory.
   rmdir $_sess_dir if (defined $_sess_dir && -d $_sess_dir);

   ## Reset instance.
   undef @{$self->{_pids}};  undef @{$self->{_thrs}};   undef @{$self->{_tids}};
   undef @{$self->{_state}}; undef @{$self->{_status}}; undef @{$self->{_task}};

   $self->{_chunk_id} = $self->{_send_cnt} = $self->{_spawned} = 0;
   $self->{_total_running} = $self->{_total_exited} = 0;
   $self->{_total_workers} = 0;
   $self->{_sess_dir} = undef;

   if ($self->{loop_timeout}) {
      delete $self->{_pids_t};
      delete $self->{_pids_w};
   }

   return;
}
1391
1392###############################################################################
1393## ----------------------------------------------------------------------------
1394## Barrier sync and yield methods.
1395##
1396###############################################################################
1397
## Barrier synchronization among the workers of task 0. A no-op in the
## manager process and for workers of any other task. Returns nothing.
sub sync {
   my $self = shift; $self = $MCE unless ref($self);

   ## Barrier synchronization is supported for task 0 at this time.
   ## Note: Workers are assigned task_id 0 when omitting user_tasks.

   return if (!$self->{_wid} || $self->{_task_id} > 0);

   my $_chn        = $self->{_chn};
   my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
   my $_buf;

   local $\ = undef if (defined $\);

   ## Two phases, each made of a notification to the manager process and
   ## a one-byte read that blocks until task 0 workers have caught up:
   ## barrier begin (wait until all have synced), then barrier end (wait
   ## until all have un-synced).
   my @_phases = (
      [ OUTPUT_B_SYN, $self->{_bsb_r_sock} ],
      [ OUTPUT_E_SYN, $self->{_bsb_w_sock} ],
   );

   for my $_phase (@_phases) {
      my ($_tag, $_wait_sock) = @{ $_phase };

      print {$_DAT_W_SOCK} $_tag.$LF . $_chn.$LF;

      MCE::Util::_sock_ready($_wait_sock, -1) if $_is_MSWin32;
      MCE::Util::_sysread($_wait_sock, $_buf, 1);
   }

   return;
}
1432
## Worker side: request a delay from the manager process for this
## worker's task, then sleep for the amount received. A no-op in the
## manager process.
sub yield {
   my $self = shift; $self = $MCE unless ref($self);

   if (!$self->{_wid}) {
      return;
   }

   my $_chn      = $self->{_chn};
   my $_lock_chn = $self->{_lock_chn};
   my $_mutex    = $self->{_dat_lock};
   my $_ctl_sock = $self->{_dat_w_sock}->[0];
   my $_chn_sock = $self->{_dat_w_sock}->[$_chn];
   my $_delay;

   local $\ = undef if (defined $\);
   local $/ = $LF if (!$/ || $/ ne $LF);

   ## The request and its reply are exchanged while holding the channel
   ## lock, when channel locking is enabled.
   $_mutex->lock() if $_lock_chn;

   print {$_ctl_sock} OUTPUT_I_DLY.$LF . $_chn.$LF;
   print {$_chn_sock} $self->{_task_id}.$LF;
   chomp($_delay = <$_chn_sock>);

   $_mutex->unlock() if $_lock_chn;

   return MCE::Util::_sleep( $_delay );
}
1456
1457###############################################################################
1458## ----------------------------------------------------------------------------
1459## Miscellaneous methods: abort exit sess_dir tmp_dir.
1460##
1461###############################################################################
1462
1463## Abort current job.
1464
## Abort the current job. Does nothing when no abort message is on
## record. Returns nothing.
sub abort {
   my $self = shift; $self = $MCE unless ref($self);

   my $_que_reader = $self->{_que_r_sock};
   my $_que_writer = $self->{_que_w_sock};
   my $_abort_msg  = $self->{_abort_msg};

   return unless (defined $_abort_msg);

   local $\ = undef;

   ## For a positive abort message, take the next message off the queue
   ## and put the abort message in its place.
   if ($_abort_msg > 0) {
      MCE::Util::_sysread($_que_reader, my($_next), $_que_read_size);
      syswrite($_que_writer, pack($_que_template, 0, $_abort_msg));
   }

   ## A worker additionally notifies the manager process.
   if ($self->{_wid} > 0) {
      my $_chn      = $self->{_chn};
      my $_lock_chn = $self->{_lock_chn};
      my $_mutex    = $self->{_dat_lock};
      my $_ctl_sock = $self->{_dat_w_sock}->[0];

      $_mutex->lock() if $_lock_chn;
      print {$_ctl_sock} OUTPUT_W_ABT.$LF . $_chn.$LF;
      $_mutex->unlock() if $_lock_chn;
   }

   return;
}
1495
1496## Worker exits from MCE.
1497
## Worker exits from MCE, reporting to the manager process first.
##
## Arguments (all optional):
##    exit status - defaults to $?
##    exit msg    - defaults to the empty string
##    exit id     - defaults to the current chunk_id
##
## Not allowed by the manager process. The report is sent once only,
## guarded by the _exiting flag. Ends by calling _exit.
sub exit {
   my $self = shift; $self = $MCE unless ref($self);

   my ($_arg_status, $_arg_msg, $_arg_id) = @_[0 .. 2];

   my $_exit_status = (defined $_arg_status) ? $_arg_status : $?;
   my $_exit_msg    = (defined $_arg_msg)    ? $_arg_msg    : '';
   my $_exit_id     = (defined $_arg_id)     ? $_arg_id     : $self->chunk_id;

   @_ = ();

   _croak('MCE::exit: method is not allowed by the manager process')
      unless ($self->{_wid});

   my $_chn      = $self->{_chn};
   my $_lock_chn = $self->{_lock_chn};
   my $_mutex    = $self->{_dat_lock};
   my $_ctl_sock = $self->{_dat_w_sock}->[0];
   my $_chn_sock = $self->{_dat_w_sock}->[$_chn];
   my $_task_id  = $self->{_task_id};

   if ( !$self->{_exiting} ) {
      $self->{_exiting} = 1;

      my $_pid = $self->{_is_thread} ? $$ .'.'. threads->tid() : $$;
      my $_max_retries = $self->{max_retries};
      my $_chunk_id = $self->{_chunk_id};

      ## With init_relay configured, a task 0 worker that has not relayed
      ## yet reports the failed chunk through relay once retries are used
      ## up (or when no retries are configured).
      if ( defined $self->{init_relay} && !$self->{_relayed} && !$_task_id &&
           exists $self->{_wuf} && $self->{_pid} eq $_pid ) {

         $self->{_retry_cnt} = -1 unless defined( $self->{_retry_cnt} );

         if ( !$_max_retries || ++$self->{_retry_cnt} == $_max_retries ) {
            MCE::relay { warn "Error: chunk $_chunk_id failed\n" if $_chunk_id };
         }
      }

      ## Check for nested workers not yet joined.
      MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};

      MCE::Hobo->finish('MCE')
         if ( $INC{'MCE/Hobo.pm'} && MCE::Hobo->can('_clear') );

      local $\ = undef if (defined $\);
      my $_len = length $_exit_msg;

      ## Line breaks in the exit id are replaced by a single space.
      $_exit_id =~ s/[\r\n][\r\n]*/ /mg;
      $_mutex->lock() if $_lock_chn;

      ## While retries remain, the status is reported as 0 and the frozen
      ## retry record is appended to the report.
      my ($_has_retry, $_frozen_retry);

      if ($self->{_retry} && $self->{_retry}->[2]--) {
         $_has_retry    = 1;
         $_exit_status  = 0;
         $_frozen_retry = $self->{freeze}($self->{_retry});
      }

      print {$_ctl_sock} OUTPUT_W_EXT.$LF . $_chn.$LF;

      my $_report =
         $_task_id.$LF . $self->{_wid}.$LF . $self->{_exit_pid}.$LF .
         $_exit_status.$LF . $_exit_id.$LF . $_len.$LF . $_exit_msg;

      if ($_has_retry) {
         print {$_chn_sock} $_report . length($_frozen_retry).$LF, $_frozen_retry;
      }
      else {
         print {$_chn_sock} $_report . '0'.$LF;
      }

      $_mutex->unlock() if $_lock_chn;
   }

   _exit($self);
}
1569
1570## Return the session dir, made on demand.
1571
## Return the session dir, caching it in $self->{_sess_dir}. The manager
## process makes it via _make_sessdir (undef unless spawned); a worker
## asks the manager process for it.
sub sess_dir {
   my $self = shift; $self = $MCE unless ref($self);

   if (defined $self->{_sess_dir}) {
      return $self->{_sess_dir};
   }

   ## Manager process.
   if ($self->{_wid} == 0) {
      return $self->{_sess_dir} = $self->{_spawned}
         ? _make_sessdir($self) : undef;
   }

   ## Worker process: request, then read the reply from the channel.
   my $_chn      = $self->{_chn};
   my $_lock_chn = $self->{_lock_chn};
   my $_mutex    = $self->{_dat_lock};
   my $_ctl_sock = $self->{_dat_w_sock}->[0];
   my $_chn_sock = $self->{_dat_w_sock}->[$_chn];
   my $_reply;

   local $\ = undef if (defined $\);
   local $/ = $LF if (!$/ || $/ ne $LF);

   $_mutex->lock() if $_lock_chn;
   print {$_ctl_sock} OUTPUT_S_DIR.$LF . $_chn.$LF;
   chomp($_reply = <$_chn_sock>);
   $_mutex->unlock() if $_lock_chn;

   return $self->{_sess_dir} = $_reply;
}
1599
1600## Return the temp dir, made on demand.
1601
## Return the temp dir, caching it in $self->{tmp_dir}. The manager
## process makes it via MCE::Signal::_make_tmpdir; a worker asks the
## manager process for it.
sub tmp_dir {
   my $self = shift; $self = $MCE unless ref($self);

   if (defined $self->{tmp_dir}) {
      return $self->{tmp_dir};
   }

   ## Manager process.
   if ($self->{_wid} == 0) {
      return $self->{tmp_dir} = MCE::Signal::_make_tmpdir();
   }

   ## Worker process: request, then read the reply from the channel.
   my $_chn      = $self->{_chn};
   my $_lock_chn = $self->{_lock_chn};
   my $_mutex    = $self->{_dat_lock};
   my $_ctl_sock = $self->{_dat_w_sock}->[0];
   my $_chn_sock = $self->{_dat_w_sock}->[$_chn];
   my $_reply;

   local $\ = undef if (defined $\);
   local $/ = $LF if (!$/ || $/ ne $LF);

   $_mutex->lock() if $_lock_chn;
   print {$_ctl_sock} OUTPUT_T_DIR.$LF . $_chn.$LF;
   chomp($_reply = <$_chn_sock>);
   $_mutex->unlock() if $_lock_chn;

   return $self->{tmp_dir} = $_reply;
}
1628
1629###############################################################################
1630## ----------------------------------------------------------------------------
1631## Methods for serializing data from workers to the main process.
1632##
1633###############################################################################
1634
1635## Do method. Additional arguments are optional.
1636
## Call a named function with the remaining arguments. A worker routes
## the call through _do_callback; the manager process calls the function
## directly. A name without a colon is qualified with the package of the
## caller (one frame further up when the caller is MCE itself). Code
## references are rejected.
sub do {
   my $self = shift; $self = $MCE unless ref($self);

   my $_pkg = caller();
      $_pkg = caller(1) if ($_pkg eq 'MCE');

   if (ref $_[0] eq 'CODE') {
      _croak('MCE::do: (code ref) is not supported');
   }

   my $_func = shift;

   if (!defined $_func) {
      _croak('MCE::do: (callback) is not specified');
   }

   if (index($_func, ':') < 0) {
      $_func = $_pkg.'::'.$_func;
   }

   ## Worker process.
   return _do_callback($self, $_func, [ @_ ]) if ($self->{_wid});

   ## Manager process: symbolic call by name.
   no strict 'refs';
   return $_func->(@_);
}
1656
1657## Gather method.
1658
## Pass a copy of the argument list to _do_gather. Worker only.
sub gather {
   my ($self, @_items) = @_;
   $self = $MCE unless ref($self);

   if (!$self->{_wid}) {
      _croak('MCE::gather: method is not allowed by the manager process');
   }

   return _do_gather($self, \@_items);
}
1667
1668## Sendto method.
1669
{
   ## Destination keywords accepted by sendto. The keys ending in a colon
   ## are prefixes that must be followed by a value (a path or a file
   ## descriptor), e.g. "file:/path/to/file" or "fd:2".
   my %_sendto_lkup = (
      'file'  => SENDTO_FILEV1, 'stderr' => SENDTO_STDERR,
      'file:' => SENDTO_FILEV2, 'stdout' => SENDTO_STDOUT,
      'fd:'   => SENDTO_FD,
   );

   ## Splits "prefix:value" into the prefix (colon included) and the value.
   my $_v2_regx = qr/^([^:]+:)(.+)/;

   ## Send data from a worker to a destination handled by the manager
   ## process.
   ##
   ## Arguments:
   ##    $_to - "stdout", "stderr", "file:/path", "fd:N", the legacy "file"
   ##           keyword, or a handle from which a file descriptor can be
   ##           obtained
   ##    ...  - the data; nothing is sent when the first item is undefined
   ##
   ## Not allowed by the manager process. Croaks with a usage message when
   ## the destination is not recognized or lacks its value.
   sub sendto {

      my $self = shift; $self = $MCE unless ref($self);
      my $_to  = shift;

      _croak('MCE::sendto: method is not allowed by the manager process')
         unless ($self->{_wid});

      return unless (defined $_[0]);

      my $_dest = exists $_sendto_lkup{ lc($_to) }
                       ? $_sendto_lkup{ lc($_to) } : undef;
      my $_value;

      if (!defined $_dest) {
         my $_fd;

         ## Handle destination: resolve its file descriptor and send.
         if (ref($_to) && ( defined ($_fd = fileno($_to)) ||
                            defined ($_fd = eval { $_to->fileno }) )) {

            ## A handle tied to IO::TieCombine::Handle is mapped onto
            ## descriptor 1 or 2 according to its slot name.
            if (my $_ob = tied *{ $_to }) {
               if (ref $_ob eq 'IO::TieCombine::Handle') {
                  $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout');
                  $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr');
               }
            }

            my $_data_ref = (scalar @_ == 1) ? \(''.$_[0]) : \join('', @_);
            return _do_send_glob($self, $_to, $_fd, $_data_ref);
         }
         elsif (reftype($_to) eq 'GLOB') {
            return _croak('Cannot write to filehandle');
         }

         ## "prefix:value" form.
         if (defined $_to && $_to =~ /$_v2_regx/o) {
            $_dest  = exists $_sendto_lkup{ lc($1) }
                           ? $_sendto_lkup{ lc($1) } : undef;
            $_value = $2;
         }
      }

      ## Validate outside of the block above so that a bare "file:" or
      ## "fd:", which matches the lookup table directly and therefore has
      ## no value, is rejected as well.
      if (!defined $_dest || ( !defined $_value && (
            $_dest == SENDTO_FILEV2 || $_dest == SENDTO_FD
      ))) {
         my $_msg  = "\n";
            $_msg .= "MCE::sendto: improper use of method\n";
            $_msg .= "\n";
            $_msg .= "## usage:\n";
            $_msg .= "##    ->sendto(\"stderr\", ...);\n";
            $_msg .= "##    ->sendto(\"stdout\", ...);\n";
            $_msg .= "##    ->sendto(\"file:/path/to/file\", ...);\n";
            $_msg .= "##    ->sendto(\"fd:2\", ...);\n";
            $_msg .= "\n";

         _croak($_msg);
      }

      if ($_dest == SENDTO_FILEV1) {            # sendto 'file', $a, $path
         return if (!defined $_[1] || @_ > 2);  # Please switch to using V2

         ## Exactly two items remain here; take the path off the end of
         ## the list, leaving only the data.
         $_value = pop @_;                      # sendto 'file:/path', $a
         $_dest  = SENDTO_FILEV2;
      }

      return _do_send($self, $_dest, $_value, @_);
   }
}
1745
1746###############################################################################
1747## ----------------------------------------------------------------------------
1748## Functions for serializing print, printf and say statements.
1749##
1750###############################################################################
1751
## Serialized print. An optional leading handle selects the destination;
## without arguments to print, $_ is used. A worker without an explicit
## handle sends to the manager's STDOUT; the manager process writes to
## STDOUT itself.
sub print {
   my $self = shift; $self = $MCE unless ref($self);
   my ($_fd, $_glob);

   ## Try to obtain a file descriptor from a leading reference argument:
   ## first via fileno, then via its fileno method.
   if (ref $_[0]) {
      $_fd = fileno($_[0]);
      $_fd = eval { $_[0]->fileno } unless (defined $_fd);
   }

   if (defined $_fd) {
      ## A handle tied to IO::TieCombine::Handle is mapped onto
      ## descriptor 1 or 2 according to its slot name.
      my $_ob = tied *{ $_[0] };

      if ($_ob && ref $_ob eq 'IO::TieCombine::Handle') {
         my $_slot = lc($_ob->{slot_name});
         $_fd = 1 if ($_slot eq 'stdout');
         $_fd = 2 if ($_slot eq 'stderr');
      }

      $_glob = shift;
   }
   elsif (reftype($_[0]) eq 'GLOB') {
      return _croak('Cannot write to filehandle');
   }

   my $_data = join('', (@_ ? @_ : $_));

   if ($_fd) {
      return _do_send_glob($self, $_glob, $_fd, \$_data);
   }
   if ($self->{_wid}) {
      return _do_send($self, SENDTO_STDOUT, undef, \$_data);
   }

   return _do_send_glob($self, \*STDOUT, 1, \$_data);
}
1778
## Serialized printf. An optional leading handle selects the destination,
## followed by the format and its arguments. The format defaults to '%s'
## when none is given; without arguments for the format, $_ is used.
## A worker without an explicit handle sends to the manager's STDOUT; the
## manager process writes to STDOUT itself.
sub printf {
   my $self = shift; $self = $MCE unless ref($self);
   my ($_fd, $_glob, $_fmt, $_data);

   ## Try to obtain a file descriptor from a leading reference argument:
   ## first via fileno, then via its fileno method.
   if (ref($_[0]) && ( defined ($_fd = fileno($_[0])) ||
                       defined ($_fd = eval { $_[0]->fileno }) )) {

      ## A handle tied to IO::TieCombine::Handle is mapped onto
      ## descriptor 1 or 2 according to its slot name.
      if (my $_ob = tied *{ $_[0] }) {
         if (ref $_ob eq 'IO::TieCombine::Handle') {
            $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout');
            $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr');
         }
      }

      $_glob = shift;
   }
   elsif (reftype($_[0]) eq 'GLOB') {
      return _croak('Cannot write to filehandle');
   }

   ## Fall back to '%s' only when no format was supplied. Testing for
   ## definedness, not truth, keeps the formats '0' and '' intact.
   $_fmt  = shift;
   $_fmt  = '%s' unless (defined $_fmt);
   $_data = sprintf($_fmt, scalar @_ ? @_ : $_);

   return _do_send_glob($self, $_glob, $_fd, \$_data) if $_fd;
   return _do_send($self, SENDTO_STDOUT, undef, \$_data) if $self->{_wid};
   return _do_send_glob($self, \*STDOUT, 1, \$_data);
}
1806
## Say method. Same as print, with a trailing newline appended to the data.
## Callable on an instance or on the class (falls back to $MCE). An optional
## leading filehandle selects the destination; the remaining arguments, or
## $_ when none are given, are joined without a separator.
##
## Routing, in order:
##   1. a true file descriptor was resolved -> _do_send_glob with that handle
##   2. $self->{_wid} is true                -> _do_send using SENDTO_STDOUT
##   3. otherwise                            -> _do_send_glob using STDOUT
##
## Croaks when the first argument is a GLOB reference without a file number.

sub say {
   my $self = shift; $self = $MCE unless ref($self);
   my ($_fd, $_glob, $_data);

   if (ref $_[0]) {
      ## Try the builtin first, then a fileno method on the object.
      $_fd = fileno($_[0]);
      $_fd = eval { $_[0]->fileno } unless defined $_fd;

      if (defined $_fd) {
         my $_ob = tied *{ $_[0] };

         ## IO::TieCombine handles are mapped by slot name onto fd 1 or 2.
         if ($_ob && ref($_ob) eq 'IO::TieCombine::Handle') {
            my $_slot = lc($_ob->{slot_name});

            if    ($_slot eq 'stdout') { $_fd = 1; }
            elsif ($_slot eq 'stderr') { $_fd = 2; }
         }

         $_glob = shift;
      }
      elsif (reftype($_[0]) eq 'GLOB') {
         return _croak('Cannot write to filehandle');
      }
   }

   $_data = join('', (@_ ? @_ : $_)) . "\n";

   if ($_fd) {
      return _do_send_glob($self, $_glob, $_fd, \$_data);
   }
   if ($self->{_wid}) {
      return _do_send($self, SENDTO_STDOUT, undef, \$_data);
   }

   return _do_send_glob($self, \*STDOUT, 1, \$_data);
}
1833
1834###############################################################################
1835## ----------------------------------------------------------------------------
1836## Private methods.
1837##
1838###############################################################################
1839
## Terminate the current worker (thread or child process). Does not return.
##
## Steps: drop $self->{_wuf}, call _end(), silence the __DIE__ (only when the
## module was loaded outside a thread, i.e. $_tid is 0) and __WARN__ hooks,
## then leave via threads->exit, POSIX::_exit, SIGKILL, or CORE::exit.

sub _exit {
   my ($self) = @_;

   delete $self->{_wuf};
   _end();

   ## Exit thread/child process.
   if (!$_tid) {
      $SIG{__DIE__} = sub {};
   }
   $SIG{__WARN__} = sub {};

   if ($self->{use_threads}) {
      threads->exit(0);
   }

   unless ($_tid) {
      ## A signal arriving while exiting: neutralize further handling, pass
      ## INT/TERM on to the parent (not on Windows), then kill this process.
      my $_on_signal = sub {
         my $_signame = $_[0];

         $SIG{$_signame} = $SIG{INT} = $SIG{TERM} = sub {};

         if ($^O ne 'MSWin32' && ($_signame eq 'INT' || $_signame eq 'TERM')) {
            CORE::kill($_signame, getppid());
         }

         CORE::kill('KILL', $$);
      };

      $SIG{HUP} = $SIG{INT} = $SIG{QUIT} = $SIG{TERM} = $_on_signal;
   }

   if (!$_is_MSWin32 && $self->{posix_exit}) {
      ## Errors from the channel cleanup are deliberately ignored.
      eval { MCE::Mutex::Channel::_destroy() };

      if ($INC{'POSIX.pm'}) {
         POSIX::_exit(0);
      }

      ## POSIX is not loaded; fall back to killing this process.
      CORE::kill('KILL', $$);
   }

   CORE::exit(0);
}
1870
## Return the effective max_workers value: the first user task's setting
## when user_tasks is defined and that task defines max_workers, otherwise
## the instance-level max_workers. Callable on an instance or on the class
## (falls back to $MCE).

sub _get_max_workers {
   my $self = shift; $self = $MCE unless ref($self);

   my $_tasks = $self->{user_tasks};

   if (defined $_tasks) {
      my $_task_max = $_tasks->[0]->{max_workers};
      return $_task_max if defined $_task_max;
   }

   return $self->{max_workers};
}
1882
## Create and return a session directory beneath the temp directory.
## Callable on an instance or on the class (falls back to $MCE).
##
## When $self->{_sess_dir} is already defined it is returned untouched.
## Otherwise $self->{tmp_dir} is obtained (via MCE::Signal::_make_tmpdir when
## not set), validated, and a directory named "PID.TID.COUNT" is created in
## it with mode 0770. On a name collision ".1", ".2", ... is appended.
##
## Croaks if tmp_dir is undefined or empty, is not an existing directory, is
## not writeable, or if mkdir fails for any reason other than the path
## already existing.

sub _make_sessdir {
   my $self = shift; $self = $MCE unless ref($self);

   my $_sess_dir = $self->{_sess_dir};

   unless (defined $_sess_dir) {
      $self->{tmp_dir} = MCE::Signal::_make_tmpdir()
         unless defined $self->{tmp_dir};

      ## The thread id is left out when the instance carries no _mce_tid.
      my $_mce_tid = $INC{'threads.pm'} ? threads->tid() : '';
         $_mce_tid = '' unless defined $self->{_mce_tid};

      my $_mce_sid = $$ .'.'. $_mce_tid .'.'. (++$_mce_count);
      my $_tmp_dir = $self->{tmp_dir};

      _croak("MCE::sess_dir: (tmp_dir) is not defined")
         if (!defined $_tmp_dir || $_tmp_dir eq '');
      _croak("MCE::sess_dir: ($_tmp_dir) is not a directory or does not exist")
         unless (-d $_tmp_dir);
      _croak("MCE::sess_dir: ($_tmp_dir) is not writeable")
         unless (-w $_tmp_dir);

      my $_cnt = 0; $_sess_dir = "$_tmp_dir/$_mce_sid";

      until (mkdir $_sess_dir, 0770) {
         ## Capture the reason now; the -e test below may overwrite $!.
         my $_err = "$!";

         ## Only a name collision is worth retrying under another name.
         ## Any other failure (e.g. full disk, permissions) is fatal
         ## instead of being retried endlessly.
         _croak("MCE::sess_dir: ($_sess_dir) could not be created: $_err")
            unless ($!{EEXIST} || -e $_sess_dir);

         $_sess_dir = "$_tmp_dir/$_mce_sid." . (++$_cnt);
      }
   }

   return $_sess_dir;
}
1913
## Format a single value with a format string after untainting the format.
##
## Parameters: ($_fmt, $_arg) - the sprintf format and the one value to
## format. Returns the formatted string.

sub _sprintf {
   my ($_fmt, $_arg) = @_;

   ## Remove taintedness by passing the format through a regex capture.
   ## The /s modifier lets '.' match newlines as well, so a format holding
   ## a newline is kept whole rather than cut at the first newline.
   ($_fmt) = $_fmt =~ /(.*)/s;

   return sprintf("$_fmt", $_arg);
}
1921
## Split a scalar buffer into records and store them into an array, starting
## at index 0. Records are read through an in-memory filehandle, so the
## current input record separator determines where records end.
##
## Parameters: ($_buffer_ref, $_array_ref, $_chop_str)
##   $_buffer_ref - reference to the scalar holding the data
##   $_array_ref  - reference to the array receiving the records
##   $_chop_str   - when non-empty, prepended to every record but the first
##
## Returns nothing.

sub _sync_buffer_to_array {
   my ($_buffer_ref, $_array_ref, $_chop_str) = @_;

   local $_;
   my $_idx = 0;

   open my $_fh, '<', $_buffer_ref;
   binmode $_fh, ':raw';

   if (length $_chop_str) {
      ## The first record is stored as read; the rest get the prefix.
      $_array_ref->[$_idx++] = <$_fh>;

      while (<$_fh>) {
         $_array_ref->[$_idx++] = $_chop_str . $_;
      }
   }
   else {
      while (<$_fh>) {
         $_array_ref->[$_idx++] = $_;
      }
   }

   close  $_fh;
   weaken $_fh;

   return;
}
1946
## Copy the given parameters onto the instance and report whether a
## shutdown is required.
##
## Parameters: ($self, $_params_ref)
##   user_begin, user_func and user_end, when defined, are moved onto $self
##   (they are deleted from $_params_ref). Every remaining key must exist in
##   %_params_allowed_args and is copied onto $self.
##
## Returns 0 when $self->{_spawned} is false. Otherwise returns 1 when
## init_relay was given but is not yet defined on $self, or when any of the
## three user_* parameters was given; else 0.
##
## Croaks on a key that is not an allowed params argument.

sub _sync_params {
   my ($self, $_params_ref) = @_;

   ## Checked before init_relay itself is copied onto $self further below.
   my $_requires_shutdown =
      (defined $_params_ref->{init_relay} && !defined $self->{init_relay})
         ? 1 : 0;

   for my $_key ('user_begin', 'user_func', 'user_end') {
      next unless defined $_params_ref->{$_key};

      $self->{$_key} = delete $_params_ref->{$_key};
      $_requires_shutdown = 1;
   }

   for my $_key (keys %{ $_params_ref }) {
      if (!exists $_params_allowed_args{$_key}) {
         _croak("MCE::_sync_params: ($_key) is not a valid params argument");
      }

      $self->{$_key} = $_params_ref->{$_key};
   }

   return 0 unless $self->{_spawned};
   return $_requires_shutdown;
}
1969
1970###############################################################################
1971## ----------------------------------------------------------------------------
1972## Dispatch methods.
1973##
1974###############################################################################
1975
## Worker entry point, run inside the new thread or child process.
##
## Arguments: ($_is_thread, $self, $_wid, ...). The first flag is removed and
## everything after it is handed to _worker_main, followed by a reference to
## @_plugin_worker_init. Also assigns $self to $MCE. Ends by calling _exit.

sub _dispatch {
   my ($_is_thread, @_args) = @_;
   my $self = $MCE = $_args[0];

   ## To avoid (Scalars leaked: N) messages; fixed in Perl 5.12.x
   @_ = ();

   if ($_is_MSWin32 && (
         defined($self->{max_retries}) ||
         $INC{'MCE/Child.pm'} ||
         $INC{'MCE/Hobo.pm'} )) {
      $ENV{'PERL_MCE_IPC'} = 'win32';
   }

   delete $self->{_relayed};

   $self->{_is_thread} = $_is_thread;
   $self->{_pid}       = $_is_thread ? $$ .'.'. threads->tid() : $$;

   ## Sets the seed of the base generator uniquely between workers.
   ## The new seed is computed using the current seed and $_wid value.
   ## One may set the seed at the application level for predictable
   ## results (non-thread workers only). Ditto for Math::Prime::Util,
   ## Math::Random, and Math::Random::MT::Auto.

   my $_wid       = $_args[1];
   my $_wid_step  = $_wid * 100000;
   my $_base_seed = abs($self->{_seed} - $_wid_step) % 2147483560;

   srand($_base_seed);

   if (!$self->{use_threads}) {
      if ($INC{'Math/Prime/Util.pm'}) {
         Math::Prime::Util::srand($_base_seed);
      }
      if ($INC{'MCE/Hobo.pm'} && MCE::Hobo->can('_clear')) {
         MCE::Hobo->_clear();
      }
      if ($INC{'MCE/Child.pm'}) {
         MCE::Child->_clear();
      }

      ## Move the module's current seed up or down by a per-worker offset.
      my $_offset = $_wid_step % 1073741780;
      my $_shift_seed = sub {
         my ($_cur_seed) = @_;
         return ($_cur_seed < 1073741781)
            ? $_cur_seed + $_offset
            : $_cur_seed - $_offset;
      };

      if ($INC{'Math/Random.pm'}) {
         my ($_cur_seed) = Math::Random::random_get_seed();
         my $_new_seed   = $_shift_seed->($_cur_seed);

         Math::Random::random_set_seed($_new_seed, $_new_seed);
      }

      if ($INC{'Math/Random/MT/Auto.pm'}) {
         my $_cur_seed = Math::Random::MT::Auto::get_seed()->[0];
         my $_new_seed = $_shift_seed->($_cur_seed);

         Math::Random::MT::Auto::set_seed($_new_seed);
      }
   }

   ## Run.

   _worker_main(@_args, \@_plugin_worker_init);

   _exit($self);
}
2042
## Spawn a worker as a thread running _dispatch and record it on $self.
##
## Parameters: ($self, $_wid, $_task, $_task_id, $_task_wid, $_params)
##
## With $_params defined (restart), the thread object and its tid go into
## the first undefined slot of _tids (same index in _thrs) and the sub
## returns right away. Otherwise, or when no slot is free, both are appended
## and spawn_delay is honored. Croaks when the thread cannot be created.

sub _dispatch_thread {
   my ($self, $_wid, $_task, $_task_id, $_task_wid, $_params) = @_;

   @_ = (); local $_;

   my @_thr_args = (1, $self, $_wid, $_task, $_task_id, $_task_wid, $_params);
   my $_thr = threads->create(\&_dispatch, @_thr_args);

   unless (defined $_thr) {
      _croak("MCE::_dispatch_thread: Failed to spawn worker $_wid: $!");
   }

   ## Store into an available slot (restart), otherwise append to arrays.
   if (defined $_params) {
      my $_last = @{ $self->{_tids} } - 1;

      for my $_slot (0 .. $_last) {
         next if defined $self->{_tids}->[$_slot];

         $self->{_thrs}->[$_slot] = $_thr;
         $self->{_tids}->[$_slot] = $_thr->tid();
         return;
      }
   }

   push @{ $self->{_thrs} }, $_thr;
   push @{ $self->{_tids} }, $_thr->tid();

   my $_delay = $self->{spawn_delay};

   if (defined($_delay) && $_delay > 0.0) {
      sleep $_delay;
   }

   return;
}
2072
## Spawn a worker as a forked child process and record its pid on $self.
##
## Parameters: ($self, $_wid, $_task, $_task_id, $_task_wid, $_params)
##
## The child calls _dispatch, which finishes by calling _exit. In the parent,
## with $_params defined (restart), the pid goes into the first undefined
## slot of _pids and the sub returns right away. Otherwise, or when no slot
## is free, the pid is appended, the task id and worker id are recorded per
## pid when loop_timeout is set (not on Windows), and spawn_delay is honored.
## Croaks when fork fails.

sub _dispatch_child {
   my ($self, $_wid, $_task, $_task_id, $_task_wid, $_params) = @_;

   @_ = (); local $_;
   my $_pid = fork();

   unless (defined $_pid) {
      _croak("MCE::_dispatch_child: Failed to spawn worker $_wid: $!");
   }

   if ($_pid == 0) {
      _dispatch(0, $self, $_wid, $_task, $_task_id, $_task_wid, $_params);
   }

   ## Store into an available slot (restart), otherwise append to array.
   if (defined $_params) {
      my $_last = @{ $self->{_pids} } - 1;

      for my $_slot (0 .. $_last) {
         next if defined $self->{_pids}->[$_slot];

         $self->{_pids}->[$_slot] = $_pid;
         return;
      }
   }

   push @{ $self->{_pids} }, $_pid;

   if (!$_is_MSWin32 && $self->{loop_timeout}) {
      $self->{_pids_t}{$_pid} = $_task_id;
      $self->{_pids_w}{$_pid} = $_wid;
   }

   my $_delay = $self->{spawn_delay};

   if (defined($_delay) && $_delay > 0.0) {
      sleep $_delay;
   }

   return;
}
2105
21061;
2107
2108