1# -*- perl -*- 2# 3# Copyright (C) 2004-2011 Daniel P. Berrange 4# 5# This program is free software; You can redistribute it and/or modify 6# it under the same terms as Perl itself. Either: 7# 8# a) the GNU General Public License as published by the Free 9# Software Foundation; either version 2, or (at your option) any 10# later version, 11# 12# or 13# 14# b) the "Artistic License" 15# 16# The file "COPYING" distributed along with this file provides full 17# details of the terms and conditions of the two licenses. 18 19=pod 20 21=head1 NAME 22 23Net::DBus::Reactor - application event loop 24 25=head1 SYNOPSIS 26 27Create and run an event loop: 28 29 use Net::DBus::Reactor; 30 my $reactor = Net::DBus::Reactor->main(); 31 32 $reactor->run(); 33 34Manage some file handlers 35 36 $reactor->add_read($fd, 37 Net::DBus::Callback->new(method => sub { 38 my $fd = shift; 39 ...read some data... 40 }, args => [$fd])); 41 42 $reactor->add_write($fd, 43 Net::DBus::Callback->new(method => sub { 44 my $fd = shift; 45 ...write some data... 46 }, args => [$fd])); 47 48Temporarily (dis|en)able a handle 49 50 # Disable 51 $reactor->toggle_read($fd, 0); 52 # Enable 53 $reactor->toggle_read($fd, 1); 54 55Permanently remove a handle 56 57 $reactor->remove_read($fd); 58 59Manage a regular timeout every 100 milliseconds 60 61 my $timer = $reactor->add_timeout(100, 62 Net::DBus::Callback->new( 63 method => sub { 64 ...process the alarm... 65 })); 66 67Temporarily (dis|en)able a timer 68 69 # Disable 70 $reactor->toggle_timeout($timer, 0); 71 # Enable 72 $reactor->toggle_timeout($timer, 1); 73 74Permanently remove a timer 75 76 $reactor->remove_timeout($timer); 77 78Add a post-dispatch hook 79 80 my $hook = $reactor->add_hook(Net::DBus::Callback->new( 81 method => sub { 82 ... do some work... 83 })); 84 85Remove a hook 86 87 $reactor->remove_hook($hook); 88 89=head1 DESCRIPTION 90 91This class provides a general purpose event loop for 92the purposes of multiplexing I/O events and timeouts 93in a single process. The underlying implementation is 94done using the select system call. File handles can 95be registered for monitoring on read, write and exception 96(out-of-band data) events. Timers can be registered 97to expire with a periodic frequency. These are implemented 98using the timeout parameter of the select system call. 99Since this parameter merely represents an upper bound 100on the amount of time the select system call is allowed 101to sleep, the actual period of the timers may vary. Under 102normal load this variance is typically 10 milliseconds. 103Finally, hooks may be registered which will be invoked on 104each iteration of the event loop (ie after processing 105the file events, or timeouts indicated by the select 106system call returning). 107 108=head1 METHODS 109 110=over 4 111 112=cut 113 114package Net::DBus::Reactor; 115 116use 5.006; 117use strict; 118use warnings; 119 120use Net::DBus::Binding::Watch; 121use Net::DBus::Callback; 122use Time::HiRes qw(gettimeofday); 123 124=item my $reactor = Net::DBus::Reactor->new(); 125 126Creates a new event loop ready for monitoring file handles, or 127generating timeouts. Except in very unusual circumstances (examples 128of which I can't think up) it is not necessary or desriable to 129explicitly create new reactor instances. Instead call the L<main> 130method to get a handle to the singleton instance. 131 132=cut 133 134sub new { 135 my $proto = shift; 136 my $class = ref($proto) || $proto; 137 my %params = @_; 138 my $self = {}; 139 140 $self->{fds} = { 141 read => {}, 142 write => {}, 143 exception => {} 144 }; 145 $self->{timeouts} = []; 146 $self->{hooks} = []; 147 148 bless $self, $class; 149 150 return $self; 151} 152 153use vars qw($main_reactor); 154 155=item $reactor = Net::DBus::Reactor->main; 156 157Return a handle to the singleton instance of the reactor. This 158is the recommended way of getting hold of a reactor, since it 159removes the need for modules to pass around handles to their 160privately created reactors. 161 162=cut 163 164sub main { 165 my $class = shift; 166 $main_reactor = $class->new() unless defined $main_reactor; 167 return $main_reactor; 168} 169 170 171=item $reactor->manage($connection); 172 173=item $reactor->manage($server); 174 175Registers a C<Net::DBus::Binding::Connection> or C<Net::DBus::Binding::Server> object 176for management by the event loop. This basically involves 177hooking up the watch & timeout callbacks to the event loop. 178For connections it will also register a hook to invoke the 179C<dispatch> method periodically. 180 181=cut 182 183sub manage { 184 my $self = shift; 185 my $object = shift; 186 187 if ($object->can("set_watch_callbacks")) { 188 $object->set_watch_callbacks(sub { 189 my $object = shift; 190 my $watch = shift; 191 192 $self->_manage_watch_on($object, $watch); 193 }, sub { 194 my $object = shift; 195 my $watch = shift; 196 197 $self->_manage_watch_off($object, $watch); 198 }, sub { 199 my $object = shift; 200 my $watch = shift; 201 202 $self->_manage_watch_toggle($object, $watch); 203 }); 204 } 205 206 if ($object->can("set_timeout_callbacks")) { 207 $object->set_timeout_callbacks(sub { 208 my $object = shift; 209 my $timeout = shift; 210 211 my $key = $self->add_timeout($timeout->get_interval, 212 Net::DBus::Callback->new(object => $timeout, 213 method => "handle", 214 args => []), 215 $timeout->is_enabled); 216 $timeout->set_data($key); 217 }, sub { 218 my $object = shift; 219 my $timeout = shift; 220 221 my $key = $timeout->get_data; 222 $self->remove_timeout($key); 223 }, sub { 224 my $object = shift; 225 my $timeout = shift; 226 227 my $key = $timeout->get_data; 228 $self->toggle_timeout($key, 229 $timeout->is_enabled, 230 $timeout->get_interval); 231 }); 232 } 233 234 if ($object->can("dispatch")) { 235 $self->add_hook(Net::DBus::Callback->new(object => $object, 236 method => "dispatch", 237 args => []), 238 1); 239 } 240 if ($object->can("flush")) { 241 $self->add_hook(Net::DBus::Callback->new(object => $object, 242 method => "flush", 243 args => []), 244 1); 245 } 246} 247 248 249sub _manage_watch_on { 250 my $self = shift; 251 my $object = shift; 252 my $watch = shift; 253 my $flags = $watch->get_flags; 254 255 if ($flags & &Net::DBus::Binding::Watch::READABLE) { 256 $self->add_read($watch->get_fileno, 257 Net::DBus::Callback->new(object => $watch, 258 method => "handle", 259 args => [&Net::DBus::Binding::Watch::READABLE]), 260 $watch->is_enabled); 261 } 262 if ($flags & &Net::DBus::Binding::Watch::WRITABLE) { 263 $self->add_write($watch->get_fileno, 264 Net::DBus::Callback->new(object => $watch, 265 method => "handle", 266 args => [&Net::DBus::Binding::Watch::WRITABLE]), 267 $watch->is_enabled); 268 } 269# $self->add_exception($watch->get_fileno, $watch, 270# Net::DBus::Callback->new(object => $watch, 271# method => "handle", 272# args => [&Net::DBus::Binding::Watch::ERROR]), 273# $watch->is_enabled); 274 275} 276 277sub _manage_watch_off { 278 my $self = shift; 279 my $object = shift; 280 my $watch = shift; 281 my $flags = $watch->get_flags; 282 283 if ($flags & &Net::DBus::Binding::Watch::READABLE) { 284 $self->remove_read($watch->get_fileno); 285 } 286 if ($flags & &Net::DBus::Binding::Watch::WRITABLE) { 287 $self->remove_write($watch->get_fileno); 288 } 289# $self->remove_exception($watch->get_fileno); 290} 291 292sub _manage_watch_toggle { 293 my $self = shift; 294 my $object = shift; 295 my $watch = shift; 296 my $flags = $watch->get_flags; 297 298 if ($flags & &Net::DBus::Binding::Watch::READABLE) { 299 $self->toggle_read($watch->get_fileno, $watch->is_enabled); 300 } 301 if ($flags & &Net::DBus::Binding::Watch::WRITABLE) { 302 $self->toggle_write($watch->get_fileno, $watch->is_enabled); 303 } 304 $self->toggle_exception($watch->get_fileno, $watch->is_enabled); 305} 306 307 308=item $reactor->run(); 309 310Starts the event loop monitoring any registered 311file handles and timeouts. At least one file 312handle, or timer must have been registered prior 313to running the reactor, otherwise it will immediately 314exit. The reactor will run until all registered 315file handles, or timeouts have been removed, or 316disabled. The reactor can be explicitly stopped by 317calling the C<shutdown> method. 318 319=cut 320 321sub run { 322 my $self = shift; 323 324 $self->{running} = 1; 325 while ($self->{running}) { $self->step }; 326} 327 328=item $reactor->shutdown(); 329 330Explicitly shutdown the reactor after pending 331events have been processed. 332 333=cut 334 335sub shutdown { 336 my $self = shift; 337 $self->{running} = 0; 338} 339 340=item $reactor->step(); 341 342Perform one iteration of the event loop, going to 343sleep until an event occurs on a registered file 344handle, or a timeout occurrs. This method is generally 345not required in day-to-day use. 346 347=cut 348 349sub step { 350 my $self = shift; 351 352 my @callbacks = $self->_dispatch_hook(); 353 354 foreach my $callback (@callbacks) { 355 $callback->invoke; 356 } 357 358 my ($ri, $ric) = $self->_bits("read"); 359 my ($wi, $wic) = $self->_bits("write"); 360 my ($ei, $eic) = $self->_bits("exception"); 361 my $timeout = $self->_timeout($self->_now); 362 363 if (!$ric && !$wic && !$eic && !(defined $timeout)) { 364 $self->{running} = 0; 365 } 366 367 # One of the hooks we ran might have requested shutdown 368 # so check here to avoid a undesirable wait in select() 369 # cf RT #39068 370 return unless $self->{running}; 371 372 my ($ro, $wo, $eo); 373 my $n = select($ro=$ri,$wo=$wi,$eo=$ei, (defined $timeout ? ($timeout ? $timeout/1000 : 0) : undef)); 374 375 @callbacks = (); 376 if ($n > 0) { 377 push @callbacks, $self->_dispatch_fd("read", $ro); 378 push @callbacks, $self->_dispatch_fd("write", $wo); 379 push @callbacks, $self->_dispatch_fd("exception", $eo); 380 } 381 push @callbacks, $self->_dispatch_timeout($self->_now); 382 #push @callbacks, $self->_dispatch_hook(); 383 384 foreach my $callback (@callbacks) { 385 $callback->invoke; 386 } 387 388 return 1; 389} 390 391sub _now { 392 my $self = shift; 393 394 my @now = gettimeofday; 395 396 return $now[0] * 1000 + (($now[1] - ($now[1] % 1000)) / 1000); 397} 398 399sub _bits { 400 my $self = shift; 401 my $type = shift; 402 my $vec = ''; 403 404 my $count = 0; 405 foreach (keys %{$self->{fds}->{$type}}) { 406 next unless $self->{fds}->{$type}->{$_}->{enabled}; 407 408 $count++; 409 vec($vec, $_, 1) = 1; 410 } 411 return ($vec, $count); 412} 413 414sub _timeout { 415 my $self = shift; 416 my $now = shift; 417 418 my $timeout; 419 foreach (@{$self->{timeouts}}) { 420 next unless defined && $_->{enabled}; 421 422 my $expired = $now - $_->{last_fired}; 423 # In case the clock was moved we handle $expired being < 0 (see t/26-reactor-time-adjusted.t) 424 $expired = 0 if ($expired < 0); 425 my $interval = ($expired > $_->{interval} ? 0 : $_->{interval} - $expired); 426 $timeout = $interval if !(defined $timeout) || 427 ($interval < $timeout); 428 } 429 return $timeout; 430} 431 432 433sub _dispatch_fd { 434 my $self = shift; 435 my $type = shift; 436 my $vec = shift; 437 438 my @callbacks; 439 foreach my $fd (keys %{$self->{fds}->{$type}}) { 440 next unless $self->{fds}->{$type}->{$fd}->{enabled}; 441 442 if (vec($vec, $fd, 1)) { 443 my $rec = $self->{fds}->{$type}->{$fd}; 444 445 push @callbacks, $self->{fds}->{$type}->{$fd}->{callback}; 446 } 447 } 448 return @callbacks; 449} 450 451 452sub _dispatch_timeout { 453 my $self = shift; 454 my $now = shift; 455 456 my @callbacks; 457 foreach my $timeout (@{$self->{timeouts}}) { 458 next unless defined($timeout) && $timeout->{enabled}; 459 my $expired = $now - $timeout->{last_fired}; 460 # if system clock was adjusted last_fired can be in the future 461 # (see t/26-reactor-time-adjusted.t) 462 $expired = $timeout->{interval} if ($expired < 0); 463 464 # Select typically returns a little (0-10 ms) before we 465 # asked it for. (8 milliseconds seems reasonable balance 466 # between early timeouts & extra select calls 467 if ($expired >= ($timeout->{interval}-8)) { 468 $timeout->{last_fired} = $now; 469 push @callbacks, $timeout->{callback}; 470 } 471 } 472 return @callbacks; 473} 474 475 476sub _dispatch_hook { 477 my $self = shift; 478 my $now = shift; 479 480 my @callbacks; 481 foreach my $hook (@{$self->{hooks}}) { 482 next unless $hook->{enabled}; 483 push @callbacks, $hook->{callback}; 484 } 485 return @callbacks; 486} 487 488 489=item $reactor->add_read($fd, $callback[, $status]); 490 491Registers a file handle for monitoring of read 492events. The C<$callback> parameter specifies either 493a code reference to a subroutine, or an instance of 494the C<Net::DBus::Callback> object to invoke each time 495an event occurs. The optional C<$status> parameter is 496a boolean value to specify whether the watch is 497initially enabled. 498 499=cut 500 501sub add_read { 502 my $self = shift; 503 $self->_add("read", @_); 504} 505 506=item $reactor->add_write($fd, $callback[, $status]); 507 508Registers a file handle for monitoring of write 509events. The C<$callback> parameter specifies either 510a code reference to a subroutine, or an 511instance of the C<Net::DBus::Callback> object to invoke 512each time an event occurs. The optional C<$status> 513parameter is a boolean value to specify whether the 514watch is initially enabled. 515 516=cut 517 518sub add_write { 519 my $self = shift; 520 $self->_add("write", @_); 521} 522 523 524=item $reactor->add_exception($fd, $callback[, $status]); 525 526Registers a file handle for monitoring of exception 527events. The C<$callback> parameter specifies either 528a code reference to a subroutine, or an 529instance of the C<Net::DBus::Callback> object to invoke 530each time an event occurs. The optional C<$status> 531parameter is a boolean value to specify whether the 532watch is initially enabled. 533 534=cut 535 536sub add_exception { 537 my $self = shift; 538 $self->_add("exception", @_); 539} 540 541 542=item my $id = $reactor->add_timeout($interval, $callback, $status); 543 544Registers a new timeout to expire every C<$interval> 545milliseconds. The C<$callback> parameter specifies either 546a code reference to a subroutine, or an 547instance of the C<Net::DBus::Callback> object to invoke 548each time the timeout expires. The optional C<$status> 549parameter is a boolean value to specify whether the 550timeout is initially enabled. The return parameter is 551a unique identifier which can be used to later remove 552or disable the timeout. 553 554=cut 555 556sub add_timeout { 557 my $self = shift; 558 my $interval = shift; 559 my $callback = shift; 560 my $enabled = shift; 561 $enabled = 1 unless defined $enabled; 562 563 if (ref($callback) eq "CODE") { 564 $callback = Net::DBus::Callback->new(method => $callback); 565 } 566 567 my $key; 568 for (my $i = 0 ; $i <= $#{$self->{timeouts}} && !(defined $key); $i++) { 569 $key = $i unless defined $self->{timeouts}->[$i]; 570 } 571 $key = $#{$self->{timeouts}}+1 unless defined $key; 572 573 $self->{timeouts}->[$key] = { 574 interval => $interval, 575 last_fired => $self->_now, 576 callback => $callback, 577 enabled => $enabled 578 }; 579 580 return $key; 581} 582 583 584=item $reactor->remove_timeout($id); 585 586Removes a previously registered timeout specified by 587the C<$id> parameter. 588 589=cut 590 591sub remove_timeout { 592 my $self = shift; 593 my $key = shift; 594 595 die "no timeout active with key '$key'" 596 unless defined $self->{timeouts}->[$key]; 597 598 $self->{timeouts}->[$key] = undef; 599} 600 601 602=item $reactor->toggle_timeout($id, $status[, $interval]); 603 604Updates the state of a previously registered timeout 605specified by the C<$id> parameter. The C<$status> 606parameter specifies whether the timeout is to be enabled 607or disabled, while the optional C<$interval> parameter 608can be used to change the period of the timeout. 609 610=cut 611 612sub toggle_timeout { 613 my $self = shift; 614 my $key = shift; 615 my $enabled = shift; 616 617 die "no timeout active with key '$key'" 618 unless defined $self->{timeouts}->[$key]; 619 620 $self->{timeouts}->[$key]->{enabled} = $enabled; 621 $self->{timeouts}->[$key]->{interval} = shift if @_; 622} 623 624 625=item my $id = $reactor->add_hook($callback[, $status]); 626 627Registers a new hook to be fired on each iteration 628of the event loop. The C<$callback> parameter 629specifies either a code reference to a subroutine, or 630an instance of the C<Net::DBus::Callback> 631class to invoke. The C<$status> parameter determines 632whether the hook is initially enabled, or disabled. 633The return parameter is a unique id which should 634be used to later remove, or disable the hook. 635 636=cut 637 638sub add_hook { 639 my $self = shift; 640 my $callback = shift; 641 my $enabled = shift; 642 $enabled = 1 unless defined $enabled; 643 644 if (ref($callback) eq "CODE") { 645 $callback = Net::DBus::Callback->new(method => $callback); 646 } 647 648 my $key; 649 for (my $i = 0 ; $i <= $#{$self->{hooks}} && !(defined $key); $i++) { 650 $key = $i unless defined $self->{hooks}->[$i]; 651 } 652 $key = $#{$self->{hooks}}+1 unless defined $key; 653 654 $self->{hooks}->[$key] = { 655 callback => $callback, 656 enabled => $enabled 657 }; 658 659 return $key; 660} 661 662 663=item $reactor->remove_hook($id) 664 665Removes the previously registered hook identified 666by C<$id>. 667 668=cut 669 670sub remove_hook { 671 my $self = shift; 672 my $key = shift; 673 674 die "no hook present with key '$key'" 675 unless defined $self->{hooks}->[$key]; 676 677 678 $self->{hooks}->[$key] = undef; 679} 680 681=item $reactor->toggle_hook($id, $status) 682 683Updates the status of the previously registered 684hook identified by C<$id>. The C<$status> parameter 685determines whether the hook is to be enabled or 686disabled. 687 688=cut 689 690sub toggle_hook { 691 my $self = shift; 692 my $key = shift; 693 my $enabled = shift; 694 695 $self->{hooks}->[$key]->{enabled} = $enabled; 696} 697 698sub _add { 699 my $self = shift; 700 my $type = shift; 701 my $fd = shift; 702 my $callback = shift; 703 my $enabled = shift; 704 $enabled = 1 unless defined $enabled; 705 706 if (ref($callback) eq "CODE") { 707 $callback = Net::DBus::Callback->new(method => $callback); 708 } 709 710 $self->{fds}->{$type}->{$fd} = { 711 callback => $callback, 712 enabled => $enabled 713 }; 714} 715 716=item $reactor->remove_read($fd); 717 718=item $reactor->remove_write($fd); 719 720=item $reactor->remove_exception($fd); 721 722Removes a watch on the file handle C<$fd>. 723 724=cut 725 726sub remove_read { 727 my $self = shift; 728 $self->_remove("read", @_); 729} 730 731sub remove_write { 732 my $self = shift; 733 $self->_remove("write", @_); 734} 735 736sub remove_exception { 737 my $self = shift; 738 $self->_remove("exception", @_); 739} 740 741sub _remove { 742 my $self = shift; 743 my $type = shift; 744 my $fd = shift; 745 746 die "no handle ($type) active with fd '$fd'" 747 unless exists $self->{fds}->{$type}->{$fd}; 748 749 delete $self->{fds}->{$type}->{$fd}; 750} 751 752=item $reactor->toggle_read($fd, $status); 753 754=item $reactor->toggle_write($fd, $status); 755 756=item $reactor->toggle_exception($fd, $status); 757 758Updates the status of a watch on the file handle C<$fd>. 759The C<$status> parameter species whether the watch is 760to be enabled or disabled. 761 762=cut 763 764sub toggle_read { 765 my $self = shift; 766 $self->_toggle("read", @_); 767} 768 769sub toggle_write { 770 my $self = shift; 771 $self->_toggle("write", @_); 772} 773 774sub toggle_exception { 775 my $self = shift; 776 $self->_toggle("exception", @_); 777} 778 779sub _toggle { 780 my $self = shift; 781 my $type = shift; 782 my $fd = shift; 783 my $enabled = shift; 784 785 $self->{fds}->{$type}->{$fd}->{enabled} = $enabled; 786} 787 788 7891; 790 791=pod 792 793=back 794 795=head1 SEE ALSO 796 797L<Net::DBus::Callback>, L<Net::DBus::Connection>, L<Net::DBus::Server> 798 799=head1 AUTHOR 800 801Daniel Berrange E<lt>dan@berrange.comE<gt> 802 803=head1 COPYRIGHT 804 805Copyright 2004-2011 by Daniel Berrange 806 807=cut 808