1############################################################################### 2## ---------------------------------------------------------------------------- 3## Handle helper class. 4## 5############################################################################### 6 7package MCE::Shared::Handle; 8 9use strict; 10use warnings; 11 12use 5.010001; 13 14no warnings qw( threads recursion uninitialized numeric ); 15 16our $VERSION = '1.874'; 17 18## no critic (BuiltinFunctions::ProhibitStringyEval) 19## no critic (InputOutput::ProhibitTwoArgOpen) 20## no critic (Subroutines::ProhibitExplicitReturnUndef) 21## no critic (Subroutines::ProhibitSubroutinePrototypes) 22## no critic (TestingAndDebugging::ProhibitNoStrict) 23 24use MCE::Shared::Base (); 25 26my $LF = "\012"; Internals::SvREADONLY($LF, 1); 27my $_tid = $INC{'threads.pm'} ? threads->tid() : 0; 28my $_max_fd = eval 'fileno(\*main::DATA)' // 2; 29my $_reset_flg = 1; 30 31sub _croak { 32 goto &MCE::Shared::Base::_croak; 33} 34sub CLONE { 35 $_tid = threads->tid() if $INC{'threads.pm'}; 36} 37 38sub import { 39 if (!defined $INC{'MCE/Shared.pm'}) { 40 no strict 'refs'; no warnings 'redefine'; 41 *{ caller().'::mce_open' } = \&open; 42 } 43 return; 44} 45 46sub TIEHANDLE { 47 my $class = shift; 48 49 if (ref $_[0] eq 'ARRAY') { 50 # For use with MCE::Shared in order to reach the Server process. 51 # Therefore constructed without a GLOB handle initially. 52 53 MCE::Shared::Object::_reset(), $_reset_flg = '' 54 if $_reset_flg && $INC{'MCE/Shared/Server.pm'}; 55 56 return bless $_[0], $class; 57 } 58 59 bless my $fh = \do { no warnings 'once'; local *FH }, $class; 60 61 if (@_) { 62 if ( !defined wantarray ) { 63 $fh->OPEN(@_) or _croak("open error: $!"); 64 } else { 65 $fh->OPEN(@_) or return ''; 66 } 67 } 68 69 $fh; 70} 71 72############################################################################### 73## ---------------------------------------------------------------------------- 74## Based on Tie::StdHandle. 75## 76############################################################################### 77 78sub EOF { eof($_[0]) } 79sub TELL { tell($_[0]) } 80sub FILENO { fileno($_[0]) } 81sub SEEK { seek($_[0], $_[1], $_[2]) } 82sub CLOSE { close($_[0]) if defined(fileno $_[0]) } 83sub BINMODE { binmode($_[0], $_[1] // ':raw') ? 1 : '' } 84sub GETC { getc($_[0]) } 85 86sub OPEN { 87 my $ret; 88 89 close($_[0]) if defined fileno($_[0]); 90 91 if ( @_ == 3 && ref $_[2] && defined( my $_fd = fileno($_[2]) ) ) { 92 $ret = CORE::open($_[0], $_[1]."&=$_fd"); 93 } 94 else { 95 $ret = ( @_ == 2 ) 96 ? CORE::open($_[0], $_[1]) 97 : CORE::open($_[0], $_[1], $_[2]); 98 } 99 100 # enable autoflush 101 select(( select($_[0]), $| = 1 )[0]) if $ret; 102 103 $ret; 104} 105 106sub open (@) { 107 shift if ( defined $_[0] && $_[0] eq 'MCE::Shared::Handle' ); 108 109 my $item; 110 111 if ( ref $_[0] eq 'GLOB' && tied *{ $_[0] } && 112 ref tied(*{ $_[0] }) eq __PACKAGE__ ) { 113 $item = tied *{ $_[0] }; 114 } 115 elsif ( @_ ) { 116 if ( ref $_[0] eq 'GLOB' && tied *{ $_[0] } ) { 117 close $_[0] if defined ( fileno $_[0] ); 118 } 119 $_[0] = \do { no warnings 'once'; local *FH }; 120 $item = tie *{ $_[0] }, __PACKAGE__; 121 } 122 123 shift; _croak("Not enough arguments for open") unless @_; 124 125 if ( !defined wantarray ) { 126 $item->OPEN(@_) or _croak("open error: $!"); 127 } else { 128 $item->OPEN(@_); 129 } 130} 131 132sub READ { 133 my ($fh, $len, $auto) = ($_[0], $_[2]); 134 135 if (lc(substr $len, -1, 1) eq 'm') { 136 $auto = 1, chop $len; $len *= 1024 * 1024; 137 } elsif (lc(substr $len, -1, 1) eq 'k') { 138 $auto = 1, chop $len; $len *= 1024; 139 } 140 141 # normal use-case 142 143 if (!$auto) { 144 return @_ == 4 ? read($fh, $_[1], $len, $_[3]) : read($fh, $_[1], $len); 145 } 146 147 # chunk IO, read up to record separator or eof 148 # support special case; e.g. $/ = "\n>" for bioinformatics 149 # anchoring ">" at the start of line 150 151 my ($tmp, $ret); 152 153 if (!eof($fh)) { 154 if (length $/ > 1 && substr($/, 0, 1) eq "\n") { 155 my $len = length($/) - 1; 156 157 if (tell $fh) { 158 $tmp = substr($/, 1); 159 $ret = read($fh, $tmp, $len, length($tmp)); 160 } else { 161 $ret = read($fh, $tmp, $len); 162 } 163 164 if (defined $ret) { 165 $. += 1 if eof($fh); 166 $tmp .= readline($fh); 167 168 substr($tmp, -$len, $len, '') 169 if (substr($tmp, -$len) eq substr($/, 1)); 170 } 171 } 172 elsif (defined ($ret = CORE::read($fh, $tmp, $len))) { 173 $. += 1 if eof($fh); 174 $tmp .= readline($fh); 175 } 176 } 177 else { 178 $tmp = '', $ret = 0; 179 } 180 181 if (defined $ret) { 182 my $pos = $_[3] || 0; 183 substr($_[1], $pos, length($_[1]) - $pos, $tmp); 184 length($tmp); 185 } 186 else { 187 undef; 188 } 189} 190 191sub READLINE { 192 # support special case; e.g. $/ = "\n>" for bioinformatics 193 # anchoring ">" at the start of line 194 195 if (length $/ > 1 && substr($/, 0, 1) eq "\n" && !eof($_[0])) { 196 my ($len, $buf) = (length($/) - 1); 197 198 if (tell $_[0]) { 199 $buf = substr($/, 1), $buf .= readline($_[0]); 200 } else { 201 $buf = readline($_[0]); 202 } 203 204 substr($buf, -$len, $len, '') 205 if (substr($buf, -$len) eq substr($/, 1)); 206 207 $buf; 208 } 209 else { 210 scalar(readline($_[0])); 211 } 212} 213 214sub PRINT { 215 my $fh = shift; 216 my $buf = join(defined $, ? $, : "", @_); 217 $buf .= $\ if defined $\; 218 local $\; # don't print any line terminator 219 print $fh $buf; 220} 221 222sub PRINTF { 223 my $fh = shift; 224 my $buf = sprintf(shift, @_); 225 local $\; # ditto 226 print $fh $buf; 227} 228 229sub WRITE { 230 use bytes; 231 232 # based on IO::SigGuard::syswrite 0.011 by Felipe Gasper (FELIPE) 233 my $wrote = 0; 234 235 WRITE: { 236 $wrote += ( 237 ( @_ == 2 ) 238 ? syswrite($_[0], $_[1], length($_[1]) - $wrote, $wrote) 239 : ( @_ == 3 ) 240 ? syswrite($_[0], $_[1], $_[2] - $wrote, $wrote) 241 : syswrite($_[0], $_[1], $_[2] - $wrote, $_[3] + $wrote) 242 ) or do { 243 unless ( defined $wrote ) { 244 redo WRITE if $!{EINTR} || $!{EAGAIN} || $!{EWOULDBLOCK}; 245 return undef; 246 } 247 }; 248 } 249 250 $wrote; 251} 252 253{ 254 no strict 'refs'; *{ __PACKAGE__.'::new' } = \&TIEHANDLE; 255} 256 257############################################################################### 258## ---------------------------------------------------------------------------- 259## Server functions. 260## 261############################################################################### 262 263{ 264 use constant { 265 SHR_O_CLO => 'O~CLO', # Handle CLOSE 266 SHR_O_OPN => 'O~OPN', # Handle OPEN 267 SHR_O_REA => 'O~REA', # Handle READ 268 SHR_O_RLN => 'O~RLN', # Handle READLINE 269 SHR_O_PRI => 'O~PRI', # Handle PRINT 270 SHR_O_WRI => 'O~WRI', # Handle WRITE 271 }; 272 273 my ( 274 $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_obj, $_freeze, $_thaw, 275 $_id, $_len, $_ret 276 ); 277 278 my %_output_function = ( 279 280 SHR_O_CLO.$LF => sub { # Handle CLOSE 281 $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF }; 282 chomp($_id = <$_DAU_R_SOCK>); 283 284 close $_obj->{ $_id } if defined fileno($_obj->{ $_id }); 285 print {$_DAU_R_SOCK} '1'.$LF; 286 287 return; 288 }, 289 290 SHR_O_OPN.$LF => sub { # Handle OPEN 291 $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF }; 292 my ($_fd, $_buf, $_err); local $!; 293 294 chomp($_id = <$_DAU_R_SOCK>), 295 chomp($_fd = <$_DAU_R_SOCK>), 296 chomp($_len = <$_DAU_R_SOCK>), 297 298 read($_DAU_R_SOCK, $_buf, $_len); 299 print {$_DAU_R_SOCK} $LF; 300 301 if ($_fd > $_max_fd) { 302 $_fd = IO::FDPass::recv(fileno $_DAU_R_SOCK); $_fd >= 0 303 or _croak("cannot receive file handle: $!"); 304 } 305 306 close $_obj->{ $_id } if defined fileno($_obj->{ $_id }); 307 308 my $_args = $_thaw->($_buf); 309 my $_fh; 310 311 if (@{ $_args } == 2) { 312 # remove tainted'ness from $_args 313 ($_args->[0]) = $_args->[0] =~ /(.*)/; 314 ($_args->[1]) = $_args->[1] =~ /(.*)/; 315 316 CORE::open($_fh, "$_args->[0]", $_args->[1]) or do { $_err = 0+$! }; 317 } 318 else { 319 # remove tainted'ness from $_args 320 ($_args->[0]) = $_args->[0] =~ /(.*)/; 321 322 CORE::open($_fh, $_args->[0]) or do { $_err = 0+$! }; 323 } 324 325 # enable autoflush 326 select(( select($_fh), $| = 1 )[0]) unless $_err; 327 328 *{ $_obj->{ $_id } } = *{ $_fh }; 329 print {$_DAU_R_SOCK} $_err.$LF; 330 331 return; 332 }, 333 334 SHR_O_REA.$LF => sub { # Handle READ 335 $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF }; 336 my ($_a3, $_auto); 337 338 chomp($_id = <$_DAU_R_SOCK>), 339 chomp($_a3 = <$_DAU_R_SOCK>), 340 chomp($_len = <$_DAU_R_SOCK>); 341 342 if (lc(substr $_a3, -1, 1) eq 'm') { 343 $_auto = 1, chop $_a3; $_a3 *= 1024 * 1024; 344 } elsif (lc(substr $_a3, -1, 1) eq 'k') { 345 $_auto = 1, chop $_a3; $_a3 *= 1024; 346 } 347 348 local $/; read($_DAU_R_SOCK, $/, $_len) if $_len; 349 my ($_fh, $_buf) = ($_obj->{ $_id }); local ($!, $.); 350 351 # support special case; e.g. $/ = "\n>" for bioinformatics 352 # anchoring ">" at the start of line 353 354 if (!$_auto) { 355 $. = 0, $_ret = read($_fh, $_buf, $_a3); 356 } 357 elsif (!eof($_fh)) { 358 if (length $/ > 1 && substr($/, 0, 1) eq "\n") { 359 $_len = length($/) - 1; 360 361 if (tell $_fh) { 362 $_buf = substr($/, 1); 363 $_ret = read($_fh, $_buf, $_a3, length($_buf)); 364 } else { 365 $_ret = read($_fh, $_buf, $_a3); 366 } 367 368 if (defined $_ret) { 369 $. += 1 if eof($_fh); 370 $_buf .= readline($_fh); 371 372 substr($_buf, -$_len, $_len, '') 373 if (substr($_buf, -$_len) eq substr($/, 1)); 374 } 375 } 376 elsif (defined ($_ret = read($_fh, $_buf, $_a3))) { 377 $. += 1 if eof($_fh); 378 $_buf .= readline($_fh); 379 } 380 } 381 else { 382 $_buf = '', $_ret = 0; 383 } 384 385 if (defined $_ret) { 386 $_ret = length($_buf), $_buf = $_freeze->(\$_buf); 387 print {$_DAU_R_SOCK} "$.$LF" . length($_buf).$LF, $_buf, $_ret.$LF; 388 } 389 else { 390 print {$_DAU_R_SOCK} "$.$LF" . ( (0+$!) * -1 ).$LF; 391 } 392 393 return; 394 }, 395 396 SHR_O_RLN.$LF => sub { # Handle READLINE 397 $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF }; 398 399 chomp($_id = <$_DAU_R_SOCK>), 400 chomp($_len = <$_DAU_R_SOCK>); 401 402 local $/; read($_DAU_R_SOCK, $/, $_len) if $_len; 403 my ($_fh, $_buf) = ($_obj->{ $_id }); local ($!, $.); 404 405 # support special case; e.g. $/ = "\n>" for bioinformatics 406 # anchoring ">" at the start of line 407 408 if (length $/ > 1 && substr($/, 0, 1) eq "\n" && !eof($_fh)) { 409 $_len = length($/) - 1; 410 411 if (tell $_fh) { 412 $_buf = substr($/, 1), $_buf .= readline($_fh); 413 } else { 414 $_buf = readline($_fh); 415 } 416 417 substr($_buf, -$_len, $_len, '') 418 if (substr($_buf, -$_len) eq substr($/, 1)); 419 } 420 else { 421 $_buf = readline($_fh); 422 } 423 424 if (defined $_buf) { 425 $_buf = $_freeze->(\$_buf); 426 print {$_DAU_R_SOCK} "$.$LF" . length($_buf).$LF, $_buf; 427 } else { 428 print {$_DAU_R_SOCK} "$.$LF" . ( (0+$!) * -1 ).$LF; 429 } 430 431 return; 432 }, 433 434 SHR_O_PRI.$LF => sub { # Handle PRINT 435 $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF }; 436 437 chomp($_id = <$_DAU_R_SOCK>), 438 chomp($_len = <$_DAU_R_SOCK>), 439 440 read($_DAU_R_SOCK, my($_buf), $_len); 441 print {$_obj->{ $_id }} ${ $_thaw->($_buf) }; 442 443 return; 444 }, 445 446 SHR_O_WRI.$LF => sub { # Handle WRITE 447 $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF }; 448 use bytes; 449 450 chomp($_id = <$_DAU_R_SOCK>), 451 chomp($_len = <$_DAU_R_SOCK>), 452 453 read($_DAU_R_SOCK, my($_buf), $_len); 454 455 my $_wrote = 0; 456 457 WRITE: { 458 $_wrote += ( syswrite ( 459 $_obj->{ $_id }, $_buf, length($_buf) - $_wrote, $_wrote 460 )) or do { 461 unless ( defined $_wrote ) { 462 redo WRITE if $!{EINTR} || $!{EAGAIN} || $!{EWOULDBLOCK}; 463 print {$_DAU_R_SOCK} ''.$LF; 464 465 return; 466 } 467 }; 468 } 469 470 print {$_DAU_R_SOCK} $_wrote.$LF; 471 472 return; 473 }, 474 475 ); 476 477 sub _init_mgr { 478 my $_function; 479 ( $_DAU_R_SOCK_REF, $_obj, $_function, $_freeze, $_thaw ) = @_; 480 481 for my $key ( keys %_output_function ) { 482 last if exists($_function->{$key}); 483 $_function->{$key} = $_output_function{$key}; 484 } 485 486 return; 487 } 488} 489 490############################################################################### 491## ---------------------------------------------------------------------------- 492## Object package. 493## 494############################################################################### 495 496## Items below are folded into MCE::Shared::Object. 497 498package # hide from rpm 499 MCE::Shared::Object; 500 501use strict; 502use warnings; 503 504no warnings qw( threads recursion uninitialized numeric once ); 505 506use bytes; 507 508no overloading; 509 510my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0; 511 512my ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, $_obj, 513 $_freeze, $_thaw); 514 515sub _init_handle { 516 ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, $_obj, 517 $_freeze, $_thaw) = @_; 518 519 return; 520} 521 522sub CLOSE { 523 _req1('O~CLO', $_[0]->[0].$LF); 524} 525 526sub OPEN { 527 my ($_id, $_fd, $_buf) = (shift()->[0]); 528 return unless defined $_[0]; 529 530 if (ref $_[-1] && reftype($_[-1]) ne 'GLOB') { 531 _croak("open error: not a GLOB reference"); 532 } 533 elsif (@_ == 1 && ref $_[0] && defined($_fd = fileno($_[0]))) { 534 $_buf = $_freeze->([ "<&=$_fd" ]); 535 } 536 elsif (@_ == 2 && ref $_[1] && defined($_fd = fileno($_[1]))) { 537 $_buf = $_freeze->([ $_[0]."&=$_fd" ]); 538 } 539 elsif (!ref $_[-1]) { 540 $_fd = ($_[-1] =~ /&=(\d+)$/) ? $1 : -1; 541 $_buf = $_freeze->([ @_ ]); 542 } 543 else { 544 _croak("open error: unsupported use-case"); 545 } 546 547 if ($_fd > $_max_fd && !$INC{'IO/FDPass.pm'}) { 548 _croak( 549 "\nSharing a handle object while the server is running\n", 550 "requires the IO::FDPass module.\n\n" 551 ); 552 } 553 554 local $\ = undef if (defined $\); 555 local $/ = $LF if ($/ ne $LF); 556 local $MCE::Signal::SIG; 557 558 my $_err; 559 560 { 561 local $MCE::Signal::IPC = 1; 562 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->(); 563 564 print({$_DAT_W_SOCK} 'O~OPN'.$LF . $_chn.$LF), 565 print({$_DAU_W_SOCK} $_id.$LF . $_fd.$LF . length($_buf).$LF . $_buf); 566 <$_DAU_W_SOCK>; 567 568 IO::FDPass::send( fileno $_DAU_W_SOCK, fileno $_fd ) if ($_fd > $_max_fd); 569 chomp($_err = <$_DAU_W_SOCK>); 570 571 $_dat_un->() if !$_is_MSWin32; 572 } 573 574 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG; 575 576 if ($_err) { 577 $! = $_err; 578 ''; 579 } else { 580 $! = 0; 581 1; 582 } 583} 584 585sub READ { 586 local $\ = undef if (defined $\); 587 local $MCE::Signal::SIG; 588 589 my ($_len, $_ret); 590 591 { 592 local $MCE::Signal::IPC = 1; 593 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->(); 594 595 print({$_DAT_W_SOCK} 'O~REA'.$LF . $_chn.$LF), 596 print({$_DAU_W_SOCK} $_[0]->[0].$LF . $_[2].$LF . length($/).$LF . $/); 597 598 local $/ = $LF if ($/ ne $LF); 599 chomp($_ret = <$_DAU_W_SOCK>); 600 chomp($_len = <$_DAU_W_SOCK>); 601 602 if ($_len && $_len > 0) { 603 read($_DAU_W_SOCK, my $_buf, $_len); 604 chomp($_len = <$_DAU_W_SOCK>); 605 606 my $_ref = \$_[1]; 607 if (defined $_[3]) { 608 no bytes; 609 substr($$_ref, $_[3], length($$_ref) - $_[3], ''); 610 substr($$_ref, $_[3], $_len, ${ $_thaw->($_buf) }); 611 } 612 else { 613 $$_ref = ${ $_thaw->($_buf) }; 614 } 615 } 616 617 $_dat_un->() if !$_is_MSWin32; 618 } 619 620 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG; 621 622 if ($_len) { 623 if ($_len < 0) { 624 $. = 0, $! = $_len * -1; 625 return undef; 626 } 627 } 628 else { 629 my $_ref = \$_[1]; 630 if (defined $_[3]) { 631 no bytes; 632 substr($$_ref, $_[3], length($$_ref) - $_[3], ''); 633 } 634 else { 635 $$_ref = ''; 636 } 637 } 638 639 $. = $_ret, $! = 0; 640 $_len; 641} 642 643sub READLINE { 644 local $\ = undef if (defined $\); 645 local $MCE::Signal::SIG; 646 647 my ($_buf, $_len, $_ret); 648 649 { 650 local $MCE::Signal::IPC = 1; 651 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->(); 652 653 print({$_DAT_W_SOCK} 'O~RLN'.$LF . $_chn.$LF), 654 print({$_DAU_W_SOCK} $_[0]->[0].$LF . length($/).$LF . $/); 655 656 local $/ = $LF if ($/ ne $LF); 657 chomp($_ret = <$_DAU_W_SOCK>); 658 chomp($_len = <$_DAU_W_SOCK>); 659 660 if ($_len && $_len > 0) { 661 read($_DAU_W_SOCK, $_buf, $_len); 662 } 663 664 $_dat_un->() if !$_is_MSWin32; 665 } 666 667 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG; 668 669 if ($_len && $_len < 0) { 670 $. = 0, $! = $_len * -1; 671 return undef; 672 } 673 674 $. = $_ret, $! = 0; 675 $_buf ? ${ $_thaw->($_buf) } : $_buf; 676} 677 678sub PRINT { 679 no bytes; 680 my $_id = shift()->[0]; 681 my $_buf = join(defined $, ? $, : "", @_); 682 683 $_buf .= $\ if defined $\; 684 685 if (length $_buf) { 686 $_buf = $_freeze->(\$_buf); 687 _req2('O~PRI', $_id.$LF . length($_buf).$LF, $_buf); 688 } else { 689 1; 690 } 691} 692 693sub PRINTF { 694 no bytes; 695 my $_id = shift()->[0]; 696 my $_buf = sprintf(shift, @_); 697 698 if (length $_buf) { 699 $_buf = $_freeze->(\$_buf); 700 _req2('O~PRI', $_id.$LF . length($_buf).$LF, $_buf); 701 } else { 702 1; 703 } 704} 705 706sub WRITE { 707 my $_id = shift()->[0]; 708 709 local $\ = undef if (defined $\); 710 local $/ = $LF if ($/ ne $LF); 711 local $MCE::Signal::SIG; 712 713 my $_ret; 714 715 { 716 local $MCE::Signal::IPC = 1; 717 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->(); 718 719 if (@_ == 1 || (@_ == 2 && $_[1] == length($_[0]))) { 720 print({$_DAT_W_SOCK} 'O~WRI'.$LF . $_chn.$LF), 721 print({$_DAU_W_SOCK} $_id.$LF . length($_[0]).$LF, $_[0]); 722 } 723 else { 724 my $_buf = substr($_[0], ($_[2] || 0), $_[1]); 725 print({$_DAT_W_SOCK} 'O~WRI'.$LF . $_chn.$LF), 726 print({$_DAU_W_SOCK} $_id.$LF . length($_buf).$LF, $_buf); 727 } 728 729 chomp($_ret = <$_DAU_W_SOCK>); 730 731 $_dat_un->() if !$_is_MSWin32; 732 } 733 734 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG; 735 736 (length $_ret) ? $_ret : undef; 737} 738 7391; 740 741__END__ 742 743############################################################################### 744## ---------------------------------------------------------------------------- 745## Module usage. 746## 747############################################################################### 748 749=head1 NAME 750 751MCE::Shared::Handle - Handle helper class 752 753=head1 VERSION 754 755This document describes MCE::Shared::Handle version 1.874 756 757=head1 DESCRIPTION 758 759A handle helper class for use as a standalone or managed by L<MCE::Shared>. 760 761=head1 SYNOPSIS 762 763 # non-shared or local construction for use by a single process 764 # shorter, mce_open is an alias for MCE::Shared::Handle::open 765 766 use MCE::Shared::Handle; 767 768 MCE::Shared::Handle->open( my $fh, "<", "bio.fasta" ) 769 or die "open error: $!"; 770 MCE::Shared::Handle::open my $fh, "<", "bio.fasta" 771 or die "open error: $!"; 772 773 mce_open my $fh, "<", "bio.fasta" or die "open error: $!"; 774 775 # construction for sharing with other threads and processes 776 # shorter, mce_open is an alias for MCE::Shared::open 777 778 use MCE::Shared; 779 780 MCE::Shared->open( my $fh, "<", "bio.fasta" ) 781 or die "open error: $!"; 782 MCE::Shared::open my $fh, "<", "bio.fasta" 783 or die "open error: $!"; 784 785 mce_open my $fh, "<", "bio.fasta" or die "open error: $!"; 786 787 # example, output is serialized, not garbled 788 789 use MCE::Hobo; 790 use MCE::Shared; 791 792 mce_open my $ofh, ">>", \*STDOUT or die "open error: $!"; 793 mce_open my $ifh, "<", "file.log" or die "open error: $!"; 794 795 sub parallel { 796 $/ = "\n"; # can set the input record separator 797 while (my $line = <$ifh>) { 798 printf {$ofh} "[%5d] %s", $., $line; 799 } 800 } 801 802 MCE::Hobo->create( \¶llel ) for 1 .. 4; 803 804 $_->join() for MCE::Hobo->list(); 805 806 # handle functions 807 808 my $bool = eof($ifh); 809 my $off = tell($ifh); 810 my $fd = fileno($ifh); 811 my $char = getc($ifh); 812 my $line = readline($ifh); 813 814 binmode $ifh; 815 seek $ifh, 10, 0; 816 read $ifh, my($buf), 80; 817 818 print {$ofh} "foo\n"; 819 printf {$ofh} "%s\n", "bar"; 820 821 open $ofh, ">>", \*STDERR; 822 syswrite $ofh, "shared handle to STDERR\n"; 823 824 close $ifh; 825 close $ofh; 826 827=head1 API DOCUMENTATION 828 829=head2 MCE::Shared::Handle->new ( ) 830 831Called by MCE::Shared for constructing a shared-handle object. 832 833=head2 open ( filehandle, expr ) 834 835=head2 open ( filehandle, mode, expr ) 836 837=head2 open ( filehandle, mode, reference ) 838 839In version 1.007 and later, constructs a new object by opening the file 840whose filename is given by C<expr>, and associates it with C<filehandle>. 841When omitting error checking at the application level, MCE::Shared emits 842a message and stop if open fails. 843 844 # non-shared or local construction for use by a single process 845 846 use MCE::Shared::Handle; 847 848 MCE::Shared::Handle->open( my $fh, "<", "file.log" ) or die "$!"; 849 MCE::Shared::Handle::open my $fh, "<", "file.log" or die "$!"; 850 851 mce_open my $fh, "<", "file.log" or die "$!"; # ditto 852 853 # construction for sharing with other threads and processes 854 855 use MCE::Shared; 856 857 MCE::Shared->open( my $fh, "<", "file.log" ) or die "$!"; 858 MCE::Shared::open my $fh, "<", "file.log" or die "$!"; 859 860 mce_open my $fh, "<", "file.log" or die "$!"; # ditto 861 862=head2 mce_open ( filehandle, expr ) 863 864=head2 mce_open ( filehandle, mode, expr ) 865 866=head2 mce_open ( filehandle, mode, reference ) 867 868Native Perl-like syntax to open a file for reading: 869 870 # mce_open is exported by MCE::Shared or MCE::Shared::Handle. 871 # It creates a shared file handle with MCE::Shared present 872 # or a non-shared handle otherwise. 873 874 mce_open my $fh, "< input.txt" or die "open error: $!"; 875 mce_open my $fh, "<", "input.txt" or die "open error: $!"; 876 mce_open my $fh, "<", \*STDIN or die "open error: $!"; 877 878and for writing: 879 880 mce_open my $fh, "> output.txt" or die "open error: $!"; 881 mce_open my $fh, ">", "output.txt" or die "open error: $!"; 882 mce_open my $fh, ">", \*STDOUT or die "open error: $!"; 883 884=head1 CHUNK IO 885 886Starting with C<MCE::Shared> v1.007, chunk IO is possible for both non-shared 887and shared handles. Chunk IO is enabled by the trailing 'k' or 'm' for read 888size. Also, chunk IO supports the special "\n>"-like record separator. 889That anchors ">" at the start of the line. Workers receive record(s) beginning 890with ">" and ending with "\n". 891 892 # non-shared handle --------------------------------------------- 893 894 use MCE::Shared::Handle; 895 896 mce_open my $fh, '<', 'bio.fasta' or die "open error: $!"; 897 898 # shared handle ------------------------------------------------- 899 900 use MCE::Shared; 901 902 mce_open my $fh, '<', 'bio.fasta' or die "open error: $!"; 903 904 # 'k' or 'm' indicates kibiBytes (KiB) or mebiBytes (MiB) respectively. 905 # Read continues reading until reaching the record separator or EOF. 906 # Optionally, one may specify the record separator. 907 908 $/ = "\n>"; 909 910 while ( read($fh, my($buf), '2k') ) { 911 print "# chunk number: $.\n"; 912 print "$buf\n"; 913 } 914 915C<$.> contains the chunk_id above or the record_number below. C<readline($fh)> 916or C<$fh> may be used for reading a single record. 917 918 while ( my $buf = <$fh> ) { 919 print "# record number: $.\n"; 920 print "$buf\n"; 921 } 922 923The following provides a parallel demonstration. Workers receive the next chunk 924from the shared-manager process where the actual read takes place. MCE::Shared 925also works with C<threads>, C<forks>, and likely other parallel modules. 926 927 use MCE::Hobo; # (change to) use threads; (or) use forks; 928 use MCE::Shared; 929 use feature qw( say ); 930 931 my $pattern = 'something'; 932 my $hugefile = 'somehuge.log'; 933 934 my $result = MCE::Shared->array(); 935 mce_open my $fh, "<", $hugefile or die "open error: $!"; 936 937 sub task { 938 # the trailing 'k' or 'm' for size enables chunk IO 939 while ( read $fh, my( $slurp_chunk ), "640k" ) { 940 my $chunk_id = $.; 941 # process chunk only if a match is found; ie. fast scan 942 # optionally, comment out the if statement and closing brace 943 if ( $slurp_chunk =~ /$pattern/m ) { 944 my @matches; 945 while ( $slurp_chunk =~ /([^\n]+\n)/mg ) { 946 my $line = $1; # save $1 to not lose the value 947 push @matches, $line if ( $line =~ /$pattern/ ); 948 } 949 $result->push( @matches ) if @matches; 950 } 951 } 952 } 953 954 MCE::Hobo->create('task') for 1 .. 4; 955 956 # do something else 957 958 MCE::Hobo->waitall(); 959 960 say $result->len(); 961 962For comparison, the same thing using C<MCE::Flow>. MCE workers read the file 963directly when given a plain path, so will have lesser overhead. However, the 964run time is similar if one were to pass a file handle instead to mce_flow_f. 965 966The benefit of chunk IO is from lesser IPC for the shared-manager process 967(above). Likewise, for the mce-manager process (below). 968 969 use MCE::Flow; 970 use feature qw( say ); 971 972 my $pattern = 'something'; 973 my $hugefile = 'somehuge.log'; 974 975 my @result = mce_flow_f { 976 max_workers => 4, chunk_size => '640k', 977 use_slurpio => 1, 978 }, 979 sub { 980 my ( $mce, $slurp_ref, $chunk_id ) = @_; 981 # process chunk only if a match is found; ie. fast scan 982 # optionally, comment out the if statement and closing brace 983 if ( $$slurp_ref =~ /$pattern/m ) { 984 my @matches; 985 while ( $$slurp_ref =~ /([^\n]+\n)/mg ) { 986 my $line = $1; # save $1 to not lose the value 987 push @matches, $line if ( $line =~ /$pattern/ ); 988 } 989 MCE->gather( @matches ) if @matches; 990 } 991 }, $hugefile; 992 993 say scalar( @result ); 994 995=head1 CREDITS 996 997Implementation inspired by L<Tie::StdHandle>. 998 999=head1 LIMITATIONS 1000 1001Perl must have L<IO::FDPass> for constructing a shared C<condvar> or C<queue> 1002while the shared-manager process is running. For platforms where L<IO::FDPass> 1003isn't possible, construct C<condvar> and C<queue> before other classes. 1004On systems without C<IO::FDPass>, the manager process is delayed until sharing 1005other classes or started explicitly. 1006 1007 use MCE::Shared; 1008 1009 my $has_IO_FDPass = $INC{'IO/FDPass.pm'} ? 1 : 0; 1010 1011 my $cv = MCE::Shared->condvar(); 1012 my $que = MCE::Shared->queue(); 1013 1014 MCE::Shared->start() unless $has_IO_FDPass; 1015 1016Regarding mce_open, C<IO::FDPass> is needed for constructing a shared-handle 1017from a non-shared handle not yet available inside the shared-manager process. 1018The workaround is to have the non-shared handle made before the shared-manager 1019is started. Passing a file by reference is fine for the three STD* handles. 1020 1021 # The shared-manager knows of \*STDIN, \*STDOUT, \*STDERR. 1022 1023 mce_open my $shared_in, "<", \*STDIN; # ok 1024 mce_open my $shared_out, ">>", \*STDOUT; # ok 1025 mce_open my $shared_err, ">>", \*STDERR; # ok 1026 mce_open my $shared_fh1, "<", "/path/to/sequence.fasta"; # ok 1027 mce_open my $shared_fh2, ">>", "/path/to/results.log"; # ok 1028 1029 mce_open my $shared_fh, ">>", \*NON_SHARED_FH; # requires IO::FDPass 1030 1031The L<IO::FDPass> module is known to work reliably on most platforms. 1032Install 1.1 or later to rid of limitations described above. 1033 1034 perl -MIO::FDPass -le "print 'Cheers! Perl has IO::FDPass.'" 1035 1036=head1 INDEX 1037 1038L<MCE|MCE>, L<MCE::Hobo>, L<MCE::Shared> 1039 1040=head1 AUTHOR 1041 1042Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>> 1043 1044=cut 1045 1046