1###############################################################################
2## ----------------------------------------------------------------------------
3## Handle helper class.
4##
5###############################################################################
6
7package MCE::Shared::Handle;
8
9use strict;
10use warnings;
11
12use 5.010001;
13
14no warnings qw( threads recursion uninitialized numeric );
15
16our $VERSION = '1.874';
17
18## no critic (BuiltinFunctions::ProhibitStringyEval)
19## no critic (InputOutput::ProhibitTwoArgOpen)
20## no critic (Subroutines::ProhibitExplicitReturnUndef)
21## no critic (Subroutines::ProhibitSubroutinePrototypes)
22## no critic (TestingAndDebugging::ProhibitNoStrict)
23
24use MCE::Shared::Base ();
25
26my $LF = "\012"; Internals::SvREADONLY($LF, 1);
27my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
28my $_max_fd = eval 'fileno(\*main::DATA)' // 2;
29my $_reset_flg = 1;
30
31sub _croak {
32   goto &MCE::Shared::Base::_croak;
33}
34sub CLONE {
35   $_tid = threads->tid() if $INC{'threads.pm'};
36}
37
38sub import {
39   if (!defined $INC{'MCE/Shared.pm'}) {
40      no strict 'refs'; no warnings 'redefine';
41      *{ caller().'::mce_open' } = \&open;
42   }
43   return;
44}
45
46sub TIEHANDLE {
47   my $class = shift;
48
49   if (ref $_[0] eq 'ARRAY') {
50      # For use with MCE::Shared in order to reach the Server process.
51      # Therefore constructed without a GLOB handle initially.
52
53      MCE::Shared::Object::_reset(), $_reset_flg = ''
54         if $_reset_flg && $INC{'MCE/Shared/Server.pm'};
55
56      return bless $_[0], $class;
57   }
58
59   bless my $fh = \do { no warnings 'once'; local *FH }, $class;
60
61   if (@_) {
62      if ( !defined wantarray ) {
63         $fh->OPEN(@_) or _croak("open error: $!");
64      } else {
65         $fh->OPEN(@_) or return '';
66      }
67   }
68
69   $fh;
70}
71
72###############################################################################
73## ----------------------------------------------------------------------------
74## Based on Tie::StdHandle.
75##
76###############################################################################
77
78sub EOF     { eof($_[0]) }
79sub TELL    { tell($_[0]) }
80sub FILENO  { fileno($_[0]) }
81sub SEEK    { seek($_[0], $_[1], $_[2]) }
82sub CLOSE   { close($_[0]) if defined(fileno $_[0]) }
83sub BINMODE { binmode($_[0], $_[1] // ':raw') ? 1 : '' }
84sub GETC    { getc($_[0]) }
85
86sub OPEN {
87   my $ret;
88
89   close($_[0]) if defined fileno($_[0]);
90
91   if ( @_ == 3 && ref $_[2] && defined( my $_fd = fileno($_[2]) ) ) {
92      $ret = CORE::open($_[0], $_[1]."&=$_fd");
93   }
94   else {
95      $ret = ( @_ == 2 )
96         ? CORE::open($_[0], $_[1])
97         : CORE::open($_[0], $_[1], $_[2]);
98   }
99
100   # enable autoflush
101   select(( select($_[0]), $| = 1 )[0]) if $ret;
102
103   $ret;
104}
105
106sub open (@) {
107   shift if ( defined $_[0] && $_[0] eq 'MCE::Shared::Handle' );
108
109   my $item;
110
111   if ( ref $_[0] eq 'GLOB' && tied *{ $_[0] } &&
112        ref tied(*{ $_[0] }) eq __PACKAGE__ ) {
113      $item = tied *{ $_[0] };
114   }
115   elsif ( @_ ) {
116      if ( ref $_[0] eq 'GLOB' && tied *{ $_[0] } ) {
117         close $_[0] if defined ( fileno $_[0] );
118      }
119      $_[0] = \do { no warnings 'once'; local *FH };
120      $item = tie *{ $_[0] }, __PACKAGE__;
121   }
122
123   shift; _croak("Not enough arguments for open") unless @_;
124
125   if ( !defined wantarray ) {
126      $item->OPEN(@_) or _croak("open error: $!");
127   } else {
128      $item->OPEN(@_);
129   }
130}
131
132sub READ {
133   my ($fh, $len, $auto) = ($_[0], $_[2]);
134
135   if (lc(substr $len, -1, 1) eq 'm') {
136      $auto = 1, chop $len;  $len *= 1024 * 1024;
137   } elsif (lc(substr $len, -1, 1) eq 'k') {
138      $auto = 1, chop $len;  $len *= 1024;
139   }
140
141   # normal use-case
142
143   if (!$auto) {
144      return @_ == 4 ? read($fh, $_[1], $len, $_[3]) : read($fh, $_[1], $len);
145   }
146
147   # chunk IO, read up to record separator or eof
148   # support special case; e.g. $/ = "\n>" for bioinformatics
149   # anchoring ">" at the start of line
150
151   my ($tmp, $ret);
152
153   if (!eof($fh)) {
154      if (length $/ > 1 && substr($/, 0, 1) eq "\n") {
155         my $len = length($/) - 1;
156
157         if (tell $fh) {
158            $tmp = substr($/, 1);
159            $ret = read($fh, $tmp, $len, length($tmp));
160         } else {
161            $ret = read($fh, $tmp, $len);
162         }
163
164         if (defined $ret) {
165            $.   += 1 if eof($fh);
166            $tmp .= readline($fh);
167
168            substr($tmp, -$len, $len, '')
169               if (substr($tmp, -$len) eq substr($/, 1));
170         }
171      }
172      elsif (defined ($ret = CORE::read($fh, $tmp, $len))) {
173         $.   += 1 if eof($fh);
174         $tmp .= readline($fh);
175      }
176   }
177   else {
178      $tmp = '', $ret = 0;
179   }
180
181   if (defined $ret) {
182      my $pos = $_[3] || 0;
183      substr($_[1], $pos, length($_[1]) - $pos, $tmp);
184      length($tmp);
185   }
186   else {
187      undef;
188   }
189}
190
191sub READLINE {
192   # support special case; e.g. $/ = "\n>" for bioinformatics
193   # anchoring ">" at the start of line
194
195   if (length $/ > 1 && substr($/, 0, 1) eq "\n" && !eof($_[0])) {
196      my ($len, $buf) = (length($/) - 1);
197
198      if (tell $_[0]) {
199         $buf = substr($/, 1), $buf .= readline($_[0]);
200      } else {
201         $buf = readline($_[0]);
202      }
203
204      substr($buf, -$len, $len, '')
205         if (substr($buf, -$len) eq substr($/, 1));
206
207      $buf;
208   }
209   else {
210      scalar(readline($_[0]));
211   }
212}
213
214sub PRINT {
215   my $fh  = shift;
216   my $buf = join(defined $, ? $, : "", @_);
217   $buf   .= $\ if defined $\;
218   local $\; # don't print any line terminator
219   print $fh $buf;
220}
221
222sub PRINTF {
223   my $fh  = shift;
224   my $buf = sprintf(shift, @_);
225   local $\; # ditto
226   print $fh $buf;
227}
228
229sub WRITE {
230   use bytes;
231
232   # based on IO::SigGuard::syswrite 0.011 by Felipe Gasper (FELIPE)
233   my $wrote = 0;
234
235   WRITE: {
236      $wrote += (
237        ( @_ == 2 )
238          ? syswrite($_[0], $_[1], length($_[1]) - $wrote, $wrote)
239          : ( @_ == 3 )
240              ? syswrite($_[0], $_[1], $_[2] - $wrote, $wrote)
241              : syswrite($_[0], $_[1], $_[2] - $wrote, $_[3] + $wrote)
242      ) or do {
243         unless ( defined $wrote ) {
244            redo WRITE if $!{EINTR} || $!{EAGAIN} || $!{EWOULDBLOCK};
245            return undef;
246         }
247      };
248   }
249
250   $wrote;
251}
252
253{
254   no strict 'refs'; *{ __PACKAGE__.'::new' } = \&TIEHANDLE;
255}
256
257###############################################################################
258## ----------------------------------------------------------------------------
259## Server functions.
260##
261###############################################################################
262
263{
264   use constant {
265      SHR_O_CLO => 'O~CLO',  # Handle CLOSE
266      SHR_O_OPN => 'O~OPN',  # Handle OPEN
267      SHR_O_REA => 'O~REA',  # Handle READ
268      SHR_O_RLN => 'O~RLN',  # Handle READLINE
269      SHR_O_PRI => 'O~PRI',  # Handle PRINT
270      SHR_O_WRI => 'O~WRI',  # Handle WRITE
271   };
272
273   my (
274      $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_obj, $_freeze, $_thaw,
275      $_id, $_len, $_ret
276   );
277
278   my %_output_function = (
279
280      SHR_O_CLO.$LF => sub {                      # Handle CLOSE
281         $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
282         chomp($_id = <$_DAU_R_SOCK>);
283
284         close $_obj->{ $_id } if defined fileno($_obj->{ $_id });
285         print {$_DAU_R_SOCK} '1'.$LF;
286
287         return;
288      },
289
290      SHR_O_OPN.$LF => sub {                      # Handle OPEN
291         $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
292         my ($_fd, $_buf, $_err); local $!;
293
294         chomp($_id  = <$_DAU_R_SOCK>),
295         chomp($_fd  = <$_DAU_R_SOCK>),
296         chomp($_len = <$_DAU_R_SOCK>),
297
298         read($_DAU_R_SOCK, $_buf, $_len);
299         print {$_DAU_R_SOCK} $LF;
300
301         if ($_fd > $_max_fd) {
302            $_fd = IO::FDPass::recv(fileno $_DAU_R_SOCK); $_fd >= 0
303               or _croak("cannot receive file handle: $!");
304         }
305
306         close $_obj->{ $_id } if defined fileno($_obj->{ $_id });
307
308         my $_args = $_thaw->($_buf);
309         my $_fh;
310
311         if (@{ $_args } == 2) {
312            # remove tainted'ness from $_args
313            ($_args->[0]) = $_args->[0] =~ /(.*)/;
314            ($_args->[1]) = $_args->[1] =~ /(.*)/;
315
316            CORE::open($_fh, "$_args->[0]", $_args->[1]) or do { $_err = 0+$! };
317         }
318         else {
319            # remove tainted'ness from $_args
320            ($_args->[0]) = $_args->[0] =~ /(.*)/;
321
322            CORE::open($_fh, $_args->[0]) or do { $_err = 0+$! };
323         }
324
325         # enable autoflush
326         select(( select($_fh), $| = 1 )[0]) unless $_err;
327
328         *{ $_obj->{ $_id } } = *{ $_fh };
329         print {$_DAU_R_SOCK} $_err.$LF;
330
331         return;
332      },
333
334      SHR_O_REA.$LF => sub {                      # Handle READ
335         $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
336         my ($_a3, $_auto);
337
338         chomp($_id  = <$_DAU_R_SOCK>),
339         chomp($_a3  = <$_DAU_R_SOCK>),
340         chomp($_len = <$_DAU_R_SOCK>);
341
342         if (lc(substr $_a3, -1, 1) eq 'm') {
343            $_auto = 1, chop $_a3;  $_a3 *= 1024 * 1024;
344         } elsif (lc(substr $_a3, -1, 1) eq 'k') {
345            $_auto = 1, chop $_a3;  $_a3 *= 1024;
346         }
347
348         local $/; read($_DAU_R_SOCK, $/, $_len) if $_len;
349         my ($_fh, $_buf) = ($_obj->{ $_id }); local ($!, $.);
350
351         # support special case; e.g. $/ = "\n>" for bioinformatics
352         # anchoring ">" at the start of line
353
354         if (!$_auto) {
355            $. = 0, $_ret = read($_fh, $_buf, $_a3);
356         }
357         elsif (!eof($_fh)) {
358            if (length $/ > 1 && substr($/, 0, 1) eq "\n") {
359               $_len = length($/) - 1;
360
361               if (tell $_fh) {
362                  $_buf = substr($/, 1);
363                  $_ret = read($_fh, $_buf, $_a3, length($_buf));
364               } else {
365                  $_ret = read($_fh, $_buf, $_a3);
366               }
367
368               if (defined $_ret) {
369                  $.    += 1 if eof($_fh);
370                  $_buf .= readline($_fh);
371
372                  substr($_buf, -$_len, $_len, '')
373                     if (substr($_buf, -$_len) eq substr($/, 1));
374               }
375            }
376            elsif (defined ($_ret = read($_fh, $_buf, $_a3))) {
377               $.    += 1 if eof($_fh);
378               $_buf .= readline($_fh);
379            }
380         }
381         else {
382            $_buf = '', $_ret = 0;
383         }
384
385         if (defined $_ret) {
386            $_ret = length($_buf), $_buf = $_freeze->(\$_buf);
387            print {$_DAU_R_SOCK} "$.$LF" . length($_buf).$LF, $_buf, $_ret.$LF;
388         }
389         else {
390            print {$_DAU_R_SOCK} "$.$LF" . ( (0+$!) * -1 ).$LF;
391         }
392
393         return;
394      },
395
396      SHR_O_RLN.$LF => sub {                      # Handle READLINE
397         $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
398
399         chomp($_id  = <$_DAU_R_SOCK>),
400         chomp($_len = <$_DAU_R_SOCK>);
401
402         local $/; read($_DAU_R_SOCK, $/, $_len) if $_len;
403         my ($_fh, $_buf) = ($_obj->{ $_id }); local ($!, $.);
404
405         # support special case; e.g. $/ = "\n>" for bioinformatics
406         # anchoring ">" at the start of line
407
408         if (length $/ > 1 && substr($/, 0, 1) eq "\n" && !eof($_fh)) {
409            $_len = length($/) - 1;
410
411            if (tell $_fh) {
412               $_buf = substr($/, 1), $_buf .= readline($_fh);
413            } else {
414               $_buf = readline($_fh);
415            }
416
417            substr($_buf, -$_len, $_len, '')
418               if (substr($_buf, -$_len) eq substr($/, 1));
419         }
420         else {
421            $_buf = readline($_fh);
422         }
423
424         if (defined $_buf) {
425            $_buf = $_freeze->(\$_buf);
426            print {$_DAU_R_SOCK} "$.$LF" . length($_buf).$LF, $_buf;
427         } else {
428            print {$_DAU_R_SOCK} "$.$LF" . ( (0+$!) * -1 ).$LF;
429         }
430
431         return;
432      },
433
434      SHR_O_PRI.$LF => sub {                      # Handle PRINT
435         $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
436
437         chomp($_id  = <$_DAU_R_SOCK>),
438         chomp($_len = <$_DAU_R_SOCK>),
439
440         read($_DAU_R_SOCK, my($_buf), $_len);
441         print {$_obj->{ $_id }} ${ $_thaw->($_buf) };
442
443         return;
444      },
445
446      SHR_O_WRI.$LF => sub {                      # Handle WRITE
447         $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
448         use bytes;
449
450         chomp($_id  = <$_DAU_R_SOCK>),
451         chomp($_len = <$_DAU_R_SOCK>),
452
453         read($_DAU_R_SOCK, my($_buf), $_len);
454
455         my $_wrote = 0;
456
457         WRITE: {
458            $_wrote += ( syswrite (
459               $_obj->{ $_id }, $_buf, length($_buf) - $_wrote, $_wrote
460            )) or do {
461               unless ( defined $_wrote ) {
462                  redo WRITE if $!{EINTR} || $!{EAGAIN} || $!{EWOULDBLOCK};
463                  print {$_DAU_R_SOCK} ''.$LF;
464
465                  return;
466               }
467            };
468         }
469
470         print {$_DAU_R_SOCK} $_wrote.$LF;
471
472         return;
473      },
474
475   );
476
477   sub _init_mgr {
478      my $_function;
479      ( $_DAU_R_SOCK_REF, $_obj, $_function, $_freeze, $_thaw ) = @_;
480
481      for my $key ( keys %_output_function ) {
482         last if exists($_function->{$key});
483         $_function->{$key} = $_output_function{$key};
484      }
485
486      return;
487   }
488}
489
490###############################################################################
491## ----------------------------------------------------------------------------
492## Object package.
493##
494###############################################################################
495
496## Items below are folded into MCE::Shared::Object.
497
498package # hide from rpm
499   MCE::Shared::Object;
500
501use strict;
502use warnings;
503
504no warnings qw( threads recursion uninitialized numeric once );
505
506use bytes;
507
508no overloading;
509
510my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
511
512my ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, $_obj,
513    $_freeze, $_thaw);
514
515sub _init_handle {
516   ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, $_obj,
517    $_freeze, $_thaw) = @_;
518
519   return;
520}
521
522sub CLOSE {
523   _req1('O~CLO', $_[0]->[0].$LF);
524}
525
526sub OPEN {
527   my ($_id, $_fd, $_buf) = (shift()->[0]);
528   return unless defined $_[0];
529
530   if (ref $_[-1] && reftype($_[-1]) ne 'GLOB') {
531      _croak("open error: not a GLOB reference");
532   }
533   elsif (@_ == 1 && ref $_[0] && defined($_fd = fileno($_[0]))) {
534      $_buf = $_freeze->([ "<&=$_fd" ]);
535   }
536   elsif (@_ == 2 && ref $_[1] && defined($_fd = fileno($_[1]))) {
537      $_buf = $_freeze->([ $_[0]."&=$_fd" ]);
538   }
539   elsif (!ref $_[-1]) {
540      $_fd  = ($_[-1] =~ /&=(\d+)$/) ? $1 : -1;
541      $_buf = $_freeze->([ @_ ]);
542   }
543   else {
544      _croak("open error: unsupported use-case");
545   }
546
547   if ($_fd > $_max_fd && !$INC{'IO/FDPass.pm'}) {
548      _croak(
549         "\nSharing a handle object while the server is running\n",
550         "requires the IO::FDPass module.\n\n"
551      );
552   }
553
554   local $\ = undef if (defined $\);
555   local $/ = $LF if ($/ ne $LF);
556   local $MCE::Signal::SIG;
557
558   my $_err;
559
560   {
561      local $MCE::Signal::IPC = 1;
562      $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
563
564      print({$_DAT_W_SOCK} 'O~OPN'.$LF . $_chn.$LF),
565      print({$_DAU_W_SOCK} $_id.$LF . $_fd.$LF . length($_buf).$LF . $_buf);
566      <$_DAU_W_SOCK>;
567
568      IO::FDPass::send( fileno $_DAU_W_SOCK, fileno $_fd ) if ($_fd > $_max_fd);
569      chomp($_err = <$_DAU_W_SOCK>);
570
571      $_dat_un->() if !$_is_MSWin32;
572   }
573
574   CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
575
576   if ($_err) {
577      $! = $_err;
578      '';
579   } else {
580      $! = 0;
581      1;
582   }
583}
584
585sub READ {
586   local $\ = undef if (defined $\);
587   local $MCE::Signal::SIG;
588
589   my ($_len, $_ret);
590
591   {
592      local $MCE::Signal::IPC = 1;
593      $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
594
595      print({$_DAT_W_SOCK} 'O~REA'.$LF . $_chn.$LF),
596      print({$_DAU_W_SOCK} $_[0]->[0].$LF . $_[2].$LF . length($/).$LF . $/);
597
598      local $/ = $LF if ($/ ne $LF);
599      chomp($_ret = <$_DAU_W_SOCK>);
600      chomp($_len = <$_DAU_W_SOCK>);
601
602      if ($_len && $_len > 0) {
603         read($_DAU_W_SOCK, my $_buf, $_len);
604         chomp($_len = <$_DAU_W_SOCK>);
605
606         my $_ref = \$_[1];
607         if (defined $_[3]) {
608            no bytes;
609            substr($$_ref, $_[3], length($$_ref) - $_[3], '');
610            substr($$_ref, $_[3], $_len, ${ $_thaw->($_buf) });
611         }
612         else {
613            $$_ref = ${ $_thaw->($_buf) };
614         }
615      }
616
617      $_dat_un->() if !$_is_MSWin32;
618   }
619
620   CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
621
622   if ($_len) {
623      if ($_len < 0) {
624         $. = 0, $! = $_len * -1;
625         return undef;
626      }
627   }
628   else {
629      my $_ref = \$_[1];
630      if (defined $_[3]) {
631         no bytes;
632         substr($$_ref, $_[3], length($$_ref) - $_[3], '');
633      }
634      else {
635         $$_ref = '';
636      }
637   }
638
639   $. = $_ret, $! = 0;
640   $_len;
641}
642
643sub READLINE {
644   local $\ = undef if (defined $\);
645   local $MCE::Signal::SIG;
646
647   my ($_buf, $_len, $_ret);
648
649   {
650      local $MCE::Signal::IPC = 1;
651      $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
652
653      print({$_DAT_W_SOCK} 'O~RLN'.$LF . $_chn.$LF),
654      print({$_DAU_W_SOCK} $_[0]->[0].$LF . length($/).$LF . $/);
655
656      local $/ = $LF if ($/ ne $LF);
657      chomp($_ret = <$_DAU_W_SOCK>);
658      chomp($_len = <$_DAU_W_SOCK>);
659
660      if ($_len && $_len > 0) {
661         read($_DAU_W_SOCK, $_buf, $_len);
662      }
663
664      $_dat_un->() if !$_is_MSWin32;
665   }
666
667   CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
668
669   if ($_len && $_len < 0) {
670      $. = 0, $! = $_len * -1;
671      return undef;
672   }
673
674   $. = $_ret, $! = 0;
675   $_buf ? ${ $_thaw->($_buf) } : $_buf;
676}
677
678sub PRINT {
679   no bytes;
680   my $_id  = shift()->[0];
681   my $_buf = join(defined $, ? $, : "", @_);
682
683   $_buf .= $\ if defined $\;
684
685   if (length $_buf) {
686      $_buf = $_freeze->(\$_buf);
687      _req2('O~PRI', $_id.$LF . length($_buf).$LF, $_buf);
688   } else {
689      1;
690   }
691}
692
693sub PRINTF {
694   no bytes;
695   my $_id  = shift()->[0];
696   my $_buf = sprintf(shift, @_);
697
698   if (length $_buf) {
699      $_buf = $_freeze->(\$_buf);
700      _req2('O~PRI', $_id.$LF . length($_buf).$LF, $_buf);
701   } else {
702      1;
703   }
704}
705
706sub WRITE {
707   my $_id  = shift()->[0];
708
709   local $\ = undef if (defined $\);
710   local $/ = $LF if ($/ ne $LF);
711   local $MCE::Signal::SIG;
712
713   my $_ret;
714
715   {
716      local $MCE::Signal::IPC = 1;
717      $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
718
719      if (@_ == 1 || (@_ == 2 && $_[1] == length($_[0]))) {
720         print({$_DAT_W_SOCK} 'O~WRI'.$LF . $_chn.$LF),
721         print({$_DAU_W_SOCK} $_id.$LF . length($_[0]).$LF, $_[0]);
722      }
723      else {
724         my $_buf = substr($_[0], ($_[2] || 0), $_[1]);
725         print({$_DAT_W_SOCK} 'O~WRI'.$LF . $_chn.$LF),
726         print({$_DAU_W_SOCK} $_id.$LF . length($_buf).$LF, $_buf);
727      }
728
729      chomp($_ret = <$_DAU_W_SOCK>);
730
731      $_dat_un->() if !$_is_MSWin32;
732   }
733
734   CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
735
736   (length $_ret) ? $_ret : undef;
737}
738
7391;
740
741__END__
742
743###############################################################################
744## ----------------------------------------------------------------------------
745## Module usage.
746##
747###############################################################################
748
749=head1 NAME
750
751MCE::Shared::Handle - Handle helper class
752
753=head1 VERSION
754
755This document describes MCE::Shared::Handle version 1.874
756
757=head1 DESCRIPTION
758
759A handle helper class for use as a standalone or managed by L<MCE::Shared>.
760
761=head1 SYNOPSIS
762
763 # non-shared or local construction for use by a single process
764 # shorter, mce_open is an alias for MCE::Shared::Handle::open
765
766 use MCE::Shared::Handle;
767
768 MCE::Shared::Handle->open( my $fh, "<", "bio.fasta" )
769    or die "open error: $!";
770 MCE::Shared::Handle::open  my $fh, "<", "bio.fasta"
771    or die "open error: $!";
772
773 mce_open my $fh, "<", "bio.fasta" or die "open error: $!";
774
775 # construction for sharing with other threads and processes
776 # shorter, mce_open is an alias for MCE::Shared::open
777
778 use MCE::Shared;
779
780 MCE::Shared->open( my $fh, "<", "bio.fasta" )
781    or die "open error: $!";
782 MCE::Shared::open  my $fh, "<", "bio.fasta"
783    or die "open error: $!";
784
785 mce_open my $fh, "<", "bio.fasta" or die "open error: $!";
786
787 # example, output is serialized, not garbled
788
789 use MCE::Hobo;
790 use MCE::Shared;
791
792 mce_open my $ofh, ">>", \*STDOUT  or die "open error: $!";
793 mce_open my $ifh, "<", "file.log" or die "open error: $!";
794
795 sub parallel {
796    $/ = "\n"; # can set the input record separator
797    while (my $line = <$ifh>) {
798       printf {$ofh} "[%5d] %s", $., $line;
799    }
800 }
801
802 MCE::Hobo->create( \&parallel ) for 1 .. 4;
803
804 $_->join() for MCE::Hobo->list();
805
806 # handle functions
807
808 my $bool = eof($ifh);
809 my $off  = tell($ifh);
810 my $fd   = fileno($ifh);
811 my $char = getc($ifh);
812 my $line = readline($ifh);
813
814 binmode $ifh;
815 seek $ifh, 10, 0;
816 read $ifh, my($buf), 80;
817
818 print  {$ofh} "foo\n";
819 printf {$ofh} "%s\n", "bar";
820
821 open $ofh, ">>", \*STDERR;
822 syswrite $ofh, "shared handle to STDERR\n";
823
824 close $ifh;
825 close $ofh;
826
827=head1 API DOCUMENTATION
828
829=head2 MCE::Shared::Handle->new ( )
830
831Called by MCE::Shared for constructing a shared-handle object.
832
833=head2 open ( filehandle, expr )
834
835=head2 open ( filehandle, mode, expr )
836
837=head2 open ( filehandle, mode, reference )
838
839In version 1.007 and later, constructs a new object by opening the file
840whose filename is given by C<expr>, and associates it with C<filehandle>.
841When omitting error checking at the application level, MCE::Shared emits
842a message and stop if open fails.
843
844 # non-shared or local construction for use by a single process
845
846 use MCE::Shared::Handle;
847
848 MCE::Shared::Handle->open( my $fh, "<", "file.log" ) or die "$!";
849 MCE::Shared::Handle::open  my $fh, "<", "file.log"   or die "$!";
850
851 mce_open my $fh, "<", "file.log" or die "$!"; # ditto
852
853 # construction for sharing with other threads and processes
854
855 use MCE::Shared;
856
857 MCE::Shared->open( my $fh, "<", "file.log" ) or die "$!";
858 MCE::Shared::open  my $fh, "<", "file.log"   or die "$!";
859
860 mce_open my $fh, "<", "file.log" or die "$!"; # ditto
861
862=head2 mce_open ( filehandle, expr )
863
864=head2 mce_open ( filehandle, mode, expr )
865
866=head2 mce_open ( filehandle, mode, reference )
867
868Native Perl-like syntax to open a file for reading:
869
870 # mce_open is exported by MCE::Shared or MCE::Shared::Handle.
871 # It creates a shared file handle with MCE::Shared present
872 # or a non-shared handle otherwise.
873
874 mce_open my $fh, "< input.txt"     or die "open error: $!";
875 mce_open my $fh, "<", "input.txt"  or die "open error: $!";
876 mce_open my $fh, "<", \*STDIN      or die "open error: $!";
877
878and for writing:
879
880 mce_open my $fh, "> output.txt"    or die "open error: $!";
881 mce_open my $fh, ">", "output.txt" or die "open error: $!";
882 mce_open my $fh, ">", \*STDOUT     or die "open error: $!";
883
884=head1 CHUNK IO
885
886Starting with C<MCE::Shared> v1.007, chunk IO is possible for both non-shared
887and shared handles. Chunk IO is enabled by the trailing 'k' or 'm' for read
888size. Also, chunk IO supports the special "\n>"-like record separator.
889That anchors ">" at the start of the line. Workers receive record(s) beginning
890with ">" and ending with "\n".
891
892 # non-shared handle ---------------------------------------------
893
894 use MCE::Shared::Handle;
895
896 mce_open my $fh, '<', 'bio.fasta' or die "open error: $!";
897
898 # shared handle -------------------------------------------------
899
900 use MCE::Shared;
901
902 mce_open my $fh, '<', 'bio.fasta' or die "open error: $!";
903
904 # 'k' or 'm' indicates kibiBytes (KiB) or mebiBytes (MiB) respectively.
905 # Read continues reading until reaching the record separator or EOF.
906 # Optionally, one may specify the record separator.
907
908 $/ = "\n>";
909
910 while ( read($fh, my($buf), '2k') ) {
911    print "# chunk number: $.\n";
912    print "$buf\n";
913 }
914
915C<$.> contains the chunk_id above or the record_number below. C<readline($fh)>
916or C<$fh> may be used for reading a single record.
917
918 while ( my $buf = <$fh> ) {
919    print "# record number: $.\n";
920    print "$buf\n";
921 }
922
923The following provides a parallel demonstration. Workers receive the next chunk
924from the shared-manager process where the actual read takes place. MCE::Shared
925also works with C<threads>, C<forks>, and likely other parallel modules.
926
927 use MCE::Hobo;       # (change to) use threads; (or) use forks;
928 use MCE::Shared;
929 use feature qw( say );
930
931 my $pattern  = 'something';
932 my $hugefile = 'somehuge.log';
933
934 my $result = MCE::Shared->array();
935 mce_open my $fh, "<", $hugefile or die "open error: $!";
936
937 sub task {
938    # the trailing 'k' or 'm' for size enables chunk IO
939    while ( read $fh, my( $slurp_chunk ), "640k" ) {
940       my $chunk_id = $.;
941       # process chunk only if a match is found; ie. fast scan
942       # optionally, comment out the if statement and closing brace
943       if ( $slurp_chunk =~ /$pattern/m ) {
944          my @matches;
945          while ( $slurp_chunk =~ /([^\n]+\n)/mg ) {
946             my $line = $1; # save $1 to not lose the value
947             push @matches, $line if ( $line =~ /$pattern/ );
948          }
949          $result->push( @matches ) if @matches;
950       }
951    }
952 }
953
954 MCE::Hobo->create('task') for 1 .. 4;
955
956 # do something else
957
958 MCE::Hobo->waitall();
959
960 say $result->len();
961
962For comparison, the same thing using C<MCE::Flow>. MCE workers read the file
963directly when given a plain path, so will have lesser overhead. However, the
964run time is similar if one were to pass a file handle instead to mce_flow_f.
965
966The benefit of chunk IO is from lesser IPC for the shared-manager process
967(above). Likewise, for the mce-manager process (below).
968
969 use MCE::Flow;
970 use feature qw( say );
971
972 my $pattern  = 'something';
973 my $hugefile = 'somehuge.log';
974
975 my @result = mce_flow_f {
976    max_workers => 4, chunk_size => '640k',
977    use_slurpio => 1,
978 },
979 sub {
980    my ( $mce, $slurp_ref, $chunk_id ) = @_;
981    # process chunk only if a match is found; ie. fast scan
982    # optionally, comment out the if statement and closing brace
983    if ( $$slurp_ref =~ /$pattern/m ) {
984       my @matches;
985       while ( $$slurp_ref =~ /([^\n]+\n)/mg ) {
986          my $line = $1; # save $1 to not lose the value
987          push @matches, $line if ( $line =~ /$pattern/ );
988       }
989       MCE->gather( @matches ) if @matches;
990    }
991 }, $hugefile;
992
993 say scalar( @result );
994
995=head1 CREDITS
996
997Implementation inspired by L<Tie::StdHandle>.
998
999=head1 LIMITATIONS
1000
1001Perl must have L<IO::FDPass> for constructing a shared C<condvar> or C<queue>
1002while the shared-manager process is running. For platforms where L<IO::FDPass>
1003isn't possible, construct C<condvar> and C<queue> before other classes.
1004On systems without C<IO::FDPass>, the manager process is delayed until sharing
1005other classes or started explicitly.
1006
1007 use MCE::Shared;
1008
1009 my $has_IO_FDPass = $INC{'IO/FDPass.pm'} ? 1 : 0;
1010
1011 my $cv  = MCE::Shared->condvar();
1012 my $que = MCE::Shared->queue();
1013
1014 MCE::Shared->start() unless $has_IO_FDPass;
1015
1016Regarding mce_open, C<IO::FDPass> is needed for constructing a shared-handle
1017from a non-shared handle not yet available inside the shared-manager process.
1018The workaround is to have the non-shared handle made before the shared-manager
1019is started. Passing a file by reference is fine for the three STD* handles.
1020
1021 # The shared-manager knows of \*STDIN, \*STDOUT, \*STDERR.
1022
1023 mce_open my $shared_in,  "<",  \*STDIN;   # ok
1024 mce_open my $shared_out, ">>", \*STDOUT;  # ok
1025 mce_open my $shared_err, ">>", \*STDERR;  # ok
1026 mce_open my $shared_fh1, "<",  "/path/to/sequence.fasta";  # ok
1027 mce_open my $shared_fh2, ">>", "/path/to/results.log";     # ok
1028
1029 mce_open my $shared_fh, ">>", \*NON_SHARED_FH;  # requires IO::FDPass
1030
1031The L<IO::FDPass> module is known to work reliably on most platforms.
1032Install 1.1 or later to rid of limitations described above.
1033
1034 perl -MIO::FDPass -le "print 'Cheers! Perl has IO::FDPass.'"
1035
1036=head1 INDEX
1037
1038L<MCE|MCE>, L<MCE::Hobo>, L<MCE::Shared>
1039
1040=head1 AUTHOR
1041
1042Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
1043
1044=cut
1045
1046