1# -*- perl -*-
2#
3# Copyright (C) 2004-2011 Daniel P. Berrange
4#
5# This program is free software; You can redistribute it and/or modify
6# it under the same terms as Perl itself. Either:
7#
8# a) the GNU General Public License as published by the Free
9#   Software Foundation; either version 2, or (at your option) any
10#   later version,
11#
12# or
13#
14# b) the "Artistic License"
15#
16# The file "COPYING" distributed along with this file provides full
17# details of the terms and conditions of the two licenses.
18
19=pod
20
21=head1 NAME
22
23Net::DBus::Reactor - application event loop
24
25=head1 SYNOPSIS
26
27Create and run an event loop:
28
29   use Net::DBus::Reactor;
30   my $reactor = Net::DBus::Reactor->main();
31
32   $reactor->run();
33
34Manage some file handlers
35
36   $reactor->add_read($fd,
37                      Net::DBus::Callback->new(method => sub {
38                         my $fd = shift;
39                         ...read some data...
40                      }, args => [$fd]));
41
42   $reactor->add_write($fd,
43                       Net::DBus::Callback->new(method => sub {
44                          my $fd = shift;
45                          ...write some data...
46                       }, args => [$fd]));
47
48Temporarily (dis|en)able a handle
49
50   # Disable
51   $reactor->toggle_read($fd, 0);
52   # Enable
53   $reactor->toggle_read($fd, 1);
54
55Permanently remove a handle
56
57   $reactor->remove_read($fd);
58
59Manage a regular timeout every 100 milliseconds
60
61   my $timer = $reactor->add_timeout(100,
62                                     Net::DBus::Callback->new(
63              method => sub {
64                 ...process the alarm...
65              }));
66
67Temporarily (dis|en)able a timer
68
69   # Disable
70   $reactor->toggle_timeout($timer, 0);
71   # Enable
72   $reactor->toggle_timeout($timer, 1);
73
74Permanently remove a timer
75
76   $reactor->remove_timeout($timer);
77
78Add a post-dispatch hook
79
80   my $hook = $reactor->add_hook(Net::DBus::Callback->new(
81         method => sub {
82            ... do some work...
83         }));
84
85Remove a hook
86
87   $reactor->remove_hook($hook);
88
89=head1 DESCRIPTION
90
91This class provides a general purpose event loop for
92the purposes of multiplexing I/O events and timeouts
93in a single process. The underlying implementation is
94done using the select system call. File handles can
95be registered for monitoring on read, write and exception
96(out-of-band data) events. Timers can be registered
97to expire with a periodic frequency. These are implemented
98using the timeout parameter of the select system call.
99Since this parameter merely represents an upper bound
100on the amount of time the select system call is allowed
101to sleep, the actual period of the timers may vary. Under
102normal load this variance is typically 10 milliseconds.
103Finally, hooks may be registered which will be invoked on
104each iteration of the event loop (ie after processing
105the file events, or timeouts indicated by the select
106system call returning).
107
108=head1 METHODS
109
110=over 4
111
112=cut
113
114package Net::DBus::Reactor;
115
116use 5.006;
117use strict;
118use warnings;
119
120use Net::DBus::Binding::Watch;
121use Net::DBus::Callback;
122use Time::HiRes qw(gettimeofday);
123
124=item my $reactor = Net::DBus::Reactor->new();
125
126Creates a new event loop ready for monitoring file handles, or
127generating timeouts. Except in very unusual circumstances (examples
128of which I can't think up) it is not necessary or desriable to
129explicitly create new reactor instances. Instead call the L<main>
130method to get a handle to the singleton instance.
131
132=cut
133
134sub new {
135    my $proto = shift;
136    my $class = ref($proto) || $proto;
137    my %params = @_;
138    my $self = {};
139
140    $self->{fds} = {
141	read => {},
142	write => {},
143	exception => {}
144    };
145    $self->{timeouts} = [];
146    $self->{hooks} = [];
147
148    bless $self, $class;
149
150    return $self;
151}
152
153use vars qw($main_reactor);
154
155=item $reactor = Net::DBus::Reactor->main;
156
157Return a handle to the singleton instance of the reactor. This
158is the recommended way of getting hold of a reactor, since it
159removes the need for modules to pass around handles to their
160privately created reactors.
161
162=cut
163
164sub main {
165    my $class = shift;
166    $main_reactor = $class->new() unless defined $main_reactor;
167    return $main_reactor;
168}
169
170
171=item $reactor->manage($connection);
172
173=item $reactor->manage($server);
174
175Registers a C<Net::DBus::Binding::Connection> or C<Net::DBus::Binding::Server> object
176for management by the event loop. This basically involves
177hooking up the watch & timeout callbacks to the event loop.
178For connections it will also register a hook to invoke the
179C<dispatch> method periodically.
180
181=cut
182
183sub manage {
184    my $self = shift;
185    my $object = shift;
186
187    if ($object->can("set_watch_callbacks")) {
188	$object->set_watch_callbacks(sub {
189	    my $object = shift;
190	    my $watch = shift;
191
192	    $self->_manage_watch_on($object, $watch);
193	}, sub {
194	    my $object = shift;
195	    my $watch = shift;
196
197	    $self->_manage_watch_off($object, $watch);
198	}, sub {
199	    my $object = shift;
200	    my $watch = shift;
201
202	    $self->_manage_watch_toggle($object, $watch);
203	});
204    }
205
206    if ($object->can("set_timeout_callbacks")) {
207	$object->set_timeout_callbacks(sub {
208	    my $object = shift;
209	    my $timeout = shift;
210
211	    my $key = $self->add_timeout($timeout->get_interval,
212					 Net::DBus::Callback->new(object => $timeout,
213								  method => "handle",
214								  args => []),
215					 $timeout->is_enabled);
216	    $timeout->set_data($key);
217	}, sub {
218	    my $object = shift;
219	    my $timeout = shift;
220
221	    my $key = $timeout->get_data;
222	    $self->remove_timeout($key);
223	}, sub {
224	    my $object = shift;
225	    my $timeout = shift;
226
227	    my $key = $timeout->get_data;
228	    $self->toggle_timeout($key,
229				  $timeout->is_enabled,
230				  $timeout->get_interval);
231	});
232    }
233
234    if ($object->can("dispatch")) {
235	$self->add_hook(Net::DBus::Callback->new(object => $object,
236						 method => "dispatch",
237						 args => []),
238			1);
239    }
240    if ($object->can("flush")) {
241	$self->add_hook(Net::DBus::Callback->new(object => $object,
242						 method => "flush",
243						 args => []),
244			1);
245    }
246}
247
248
249sub _manage_watch_on {
250    my $self = shift;
251    my $object = shift;
252    my $watch = shift;
253    my $flags = $watch->get_flags;
254
255    if ($flags & &Net::DBus::Binding::Watch::READABLE) {
256	$self->add_read($watch->get_fileno,
257			Net::DBus::Callback->new(object => $watch,
258					    method => "handle",
259					    args => [&Net::DBus::Binding::Watch::READABLE]),
260			$watch->is_enabled);
261    }
262    if ($flags & &Net::DBus::Binding::Watch::WRITABLE) {
263	$self->add_write($watch->get_fileno,
264			 Net::DBus::Callback->new(object => $watch,
265					     method => "handle",
266					     args => [&Net::DBus::Binding::Watch::WRITABLE]),
267			 $watch->is_enabled);
268    }
269#    $self->add_exception($watch->get_fileno, $watch,
270#			 Net::DBus::Callback->new(object => $watch,
271#					     method => "handle",
272#					     args => [&Net::DBus::Binding::Watch::ERROR]),
273#			 $watch->is_enabled);
274
275}
276
277sub _manage_watch_off {
278    my $self = shift;
279    my $object = shift;
280    my $watch = shift;
281    my $flags = $watch->get_flags;
282
283    if ($flags & &Net::DBus::Binding::Watch::READABLE) {
284	$self->remove_read($watch->get_fileno);
285    }
286    if ($flags & &Net::DBus::Binding::Watch::WRITABLE) {
287	$self->remove_write($watch->get_fileno);
288    }
289#    $self->remove_exception($watch->get_fileno);
290}
291
292sub _manage_watch_toggle {
293    my $self = shift;
294    my $object = shift;
295    my $watch = shift;
296    my $flags = $watch->get_flags;
297
298    if ($flags & &Net::DBus::Binding::Watch::READABLE) {
299	$self->toggle_read($watch->get_fileno, $watch->is_enabled);
300    }
301    if ($flags & &Net::DBus::Binding::Watch::WRITABLE) {
302	$self->toggle_write($watch->get_fileno, $watch->is_enabled);
303    }
304    $self->toggle_exception($watch->get_fileno, $watch->is_enabled);
305}
306
307
308=item $reactor->run();
309
310Starts the event loop monitoring any registered
311file handles and timeouts. At least one file
312handle, or timer must have been registered prior
313to running the reactor, otherwise it will immediately
314exit. The reactor will run until all registered
315file handles, or timeouts have been removed, or
316disabled. The reactor can be explicitly stopped by
317calling the C<shutdown> method.
318
319=cut
320
321sub run {
322    my $self = shift;
323
324    $self->{running} = 1;
325    while ($self->{running}) { $self->step };
326}
327
328=item $reactor->shutdown();
329
330Explicitly shutdown the reactor after pending
331events have been processed.
332
333=cut
334
335sub shutdown {
336    my $self = shift;
337    $self->{running} = 0;
338}
339
340=item $reactor->step();
341
342Perform one iteration of the event loop, going to
343sleep until an event occurs on a registered file
344handle, or a timeout occurrs. This method is generally
345not required in day-to-day use.
346
347=cut
348
349sub step {
350    my $self = shift;
351
352    my @callbacks = $self->_dispatch_hook();
353
354    foreach my $callback (@callbacks) {
355	$callback->invoke;
356    }
357
358    my ($ri, $ric) = $self->_bits("read");
359    my ($wi, $wic) = $self->_bits("write");
360    my ($ei, $eic) = $self->_bits("exception");
361    my $timeout = $self->_timeout($self->_now);
362
363    if (!$ric && !$wic && !$eic && !(defined $timeout)) {
364	$self->{running} = 0;
365    }
366
367    # One of the hooks we ran might have requested shutdown
368    # so check here to avoid a undesirable wait in select()
369    # cf RT #39068
370    return unless $self->{running};
371
372    my ($ro, $wo, $eo);
373    my $n = select($ro=$ri,$wo=$wi,$eo=$ei, (defined $timeout ? ($timeout ? $timeout/1000 : 0) : undef));
374
375    @callbacks = ();
376    if ($n > 0) {
377	push @callbacks, $self->_dispatch_fd("read", $ro);
378	push @callbacks, $self->_dispatch_fd("write", $wo);
379	push @callbacks, $self->_dispatch_fd("exception", $eo);
380    }
381    push @callbacks, $self->_dispatch_timeout($self->_now);
382    #push @callbacks, $self->_dispatch_hook();
383
384    foreach my $callback (@callbacks) {
385	$callback->invoke;
386    }
387
388    return 1;
389}
390
391sub _now {
392    my $self = shift;
393
394    my @now = gettimeofday;
395
396    return $now[0] * 1000 + (($now[1] - ($now[1] % 1000)) / 1000);
397}
398
399sub _bits {
400    my $self = shift;
401    my $type = shift;
402    my $vec = '';
403
404    my $count = 0;
405    foreach (keys %{$self->{fds}->{$type}}) {
406	next unless $self->{fds}->{$type}->{$_}->{enabled};
407
408	$count++;
409	vec($vec, $_, 1) = 1;
410    }
411    return ($vec, $count);
412}
413
414sub _timeout {
415    my $self = shift;
416    my $now = shift;
417
418    my $timeout;
419    foreach (@{$self->{timeouts}}) {
420	next unless defined && $_->{enabled};
421
422	my $expired = $now - $_->{last_fired};
423	# In case the clock was moved we handle $expired being < 0 (see t/26-reactor-time-adjusted.t)
424	$expired = 0 if ($expired < 0);
425	my $interval = ($expired > $_->{interval} ? 0 : $_->{interval} - $expired);
426	$timeout = $interval if !(defined $timeout) ||
427	    ($interval < $timeout);
428    }
429    return $timeout;
430}
431
432
433sub _dispatch_fd {
434    my $self = shift;
435    my $type = shift;
436    my $vec = shift;
437
438    my @callbacks;
439    foreach my $fd (keys %{$self->{fds}->{$type}}) {
440	next unless $self->{fds}->{$type}->{$fd}->{enabled};
441
442	if (vec($vec, $fd, 1)) {
443	    my $rec = $self->{fds}->{$type}->{$fd};
444
445	    push @callbacks, $self->{fds}->{$type}->{$fd}->{callback};
446	}
447    }
448    return @callbacks;
449}
450
451
452sub _dispatch_timeout {
453    my $self = shift;
454    my $now = shift;
455
456    my @callbacks;
457    foreach my $timeout (@{$self->{timeouts}}) {
458	next unless defined($timeout) && $timeout->{enabled};
459	my $expired = $now - $timeout->{last_fired};
460	# if system clock was adjusted last_fired can be in the future
461	# (see t/26-reactor-time-adjusted.t)
462	$expired = $timeout->{interval} if ($expired < 0);
463
464	# Select typically returns a little (0-10 ms) before we
465	# asked it for. (8 milliseconds seems reasonable balance
466	# between early timeouts & extra select calls
467	if ($expired >= ($timeout->{interval}-8)) {
468	    $timeout->{last_fired} = $now;
469	    push @callbacks, $timeout->{callback};
470	}
471    }
472    return @callbacks;
473}
474
475
476sub _dispatch_hook {
477    my $self = shift;
478    my $now = shift;
479
480    my @callbacks;
481    foreach my $hook (@{$self->{hooks}}) {
482	next unless $hook->{enabled};
483	push @callbacks, $hook->{callback};
484    }
485    return @callbacks;
486}
487
488
489=item $reactor->add_read($fd, $callback[, $status]);
490
491Registers a file handle for monitoring of read
492events. The C<$callback> parameter specifies either
493a code reference to a subroutine, or an instance of
494the C<Net::DBus::Callback> object to invoke each time
495an event occurs. The optional C<$status> parameter is
496a boolean value to specify whether the watch is
497initially enabled.
498
499=cut
500
501sub add_read {
502    my $self = shift;
503    $self->_add("read", @_);
504}
505
506=item $reactor->add_write($fd, $callback[, $status]);
507
508Registers a file handle for monitoring of write
509events. The C<$callback> parameter specifies either
510a code reference to a subroutine, or an
511instance of the C<Net::DBus::Callback> object to invoke
512each time an event occurs. The optional C<$status>
513parameter is a boolean value to specify whether the
514watch is initially enabled.
515
516=cut
517
518sub add_write {
519    my $self = shift;
520    $self->_add("write", @_);
521}
522
523
524=item $reactor->add_exception($fd, $callback[, $status]);
525
526Registers a file handle for monitoring of exception
527events. The C<$callback> parameter specifies either
528a code reference to a subroutine, or  an
529instance of the C<Net::DBus::Callback> object to invoke
530each time an event occurs. The optional C<$status>
531parameter is a boolean value to specify whether the
532watch is initially enabled.
533
534=cut
535
536sub add_exception {
537    my $self = shift;
538    $self->_add("exception", @_);
539}
540
541
542=item my $id = $reactor->add_timeout($interval, $callback, $status);
543
544Registers a new timeout to expire every C<$interval>
545milliseconds. The C<$callback> parameter specifies either
546a code reference to a subroutine, or an
547instance of the C<Net::DBus::Callback> object to invoke
548each time the timeout expires. The optional C<$status>
549parameter is a boolean value to specify whether the
550timeout is initially enabled. The return parameter is
551a unique identifier which can be used to later remove
552or disable the timeout.
553
554=cut
555
556sub add_timeout {
557    my $self = shift;
558    my $interval = shift;
559    my $callback = shift;
560    my $enabled = shift;
561    $enabled = 1 unless defined $enabled;
562
563    if (ref($callback) eq "CODE") {
564	$callback = Net::DBus::Callback->new(method => $callback);
565    }
566
567    my $key;
568    for (my $i = 0 ; $i <= $#{$self->{timeouts}} && !(defined $key); $i++) {
569	$key = $i unless defined $self->{timeouts}->[$i];
570    }
571    $key = $#{$self->{timeouts}}+1 unless defined $key;
572
573    $self->{timeouts}->[$key] = {
574	interval => $interval,
575	last_fired => $self->_now,
576	callback => $callback,
577	enabled => $enabled
578	};
579
580    return $key;
581}
582
583
584=item $reactor->remove_timeout($id);
585
586Removes a previously registered timeout specified by
587the C<$id> parameter.
588
589=cut
590
591sub remove_timeout {
592    my $self = shift;
593    my $key = shift;
594
595    die "no timeout active with key '$key'"
596	unless defined $self->{timeouts}->[$key];
597
598    $self->{timeouts}->[$key] = undef;
599}
600
601
602=item $reactor->toggle_timeout($id, $status[, $interval]);
603
604Updates the state of a previously registered timeout
605specified by the C<$id> parameter. The C<$status>
606parameter specifies whether the timeout is to be enabled
607or disabled, while the optional C<$interval> parameter
608can be used to change the period of the timeout.
609
610=cut
611
612sub toggle_timeout {
613    my $self = shift;
614    my $key = shift;
615    my $enabled = shift;
616
617    die "no timeout active with key '$key'"
618	unless defined $self->{timeouts}->[$key];
619
620    $self->{timeouts}->[$key]->{enabled} = $enabled;
621    $self->{timeouts}->[$key]->{interval} = shift if @_;
622}
623
624
625=item my $id = $reactor->add_hook($callback[, $status]);
626
627Registers a new hook to be fired on each iteration
628of the event loop. The C<$callback> parameter
629specifies  either a code reference to a subroutine, or
630an instance of the C<Net::DBus::Callback>
631class to invoke. The C<$status> parameter determines
632whether the hook is initially enabled, or disabled.
633The return parameter is a unique id which should
634be used to later remove, or disable the hook.
635
636=cut
637
638sub add_hook {
639    my $self = shift;
640    my $callback = shift;
641    my $enabled = shift;
642    $enabled = 1 unless defined $enabled;
643
644    if (ref($callback) eq "CODE") {
645	$callback = Net::DBus::Callback->new(method => $callback);
646    }
647
648    my $key;
649    for (my $i = 0 ; $i <= $#{$self->{hooks}} && !(defined $key); $i++) {
650	$key = $i unless defined $self->{hooks}->[$i];
651    }
652    $key = $#{$self->{hooks}}+1 unless defined $key;
653
654    $self->{hooks}->[$key] = {
655	callback => $callback,
656	enabled => $enabled
657	};
658
659    return $key;
660}
661
662
663=item $reactor->remove_hook($id)
664
665Removes the previously registered hook identified
666by C<$id>.
667
668=cut
669
670sub remove_hook {
671    my $self = shift;
672    my $key = shift;
673
674    die "no hook present with key '$key'"
675	unless defined $self->{hooks}->[$key];
676
677
678    $self->{hooks}->[$key] = undef;
679}
680
681=item $reactor->toggle_hook($id, $status)
682
683Updates the status of the previously registered
684hook identified by C<$id>. The C<$status> parameter
685determines whether the hook is to be enabled or
686disabled.
687
688=cut
689
690sub toggle_hook {
691    my $self = shift;
692    my $key = shift;
693    my $enabled = shift;
694
695    $self->{hooks}->[$key]->{enabled} = $enabled;
696}
697
698sub _add {
699    my $self = shift;
700    my $type = shift;
701    my $fd = shift;
702    my $callback = shift;
703    my $enabled = shift;
704    $enabled = 1 unless defined $enabled;
705
706    if (ref($callback) eq "CODE") {
707	$callback = Net::DBus::Callback->new(method => $callback);
708    }
709
710    $self->{fds}->{$type}->{$fd} = {
711	callback => $callback,
712	enabled => $enabled
713	};
714}
715
716=item $reactor->remove_read($fd);
717
718=item $reactor->remove_write($fd);
719
720=item $reactor->remove_exception($fd);
721
722Removes a watch on the file handle C<$fd>.
723
724=cut
725
726sub remove_read {
727    my $self = shift;
728    $self->_remove("read", @_);
729}
730
731sub remove_write {
732    my $self = shift;
733    $self->_remove("write", @_);
734}
735
736sub remove_exception {
737    my $self = shift;
738    $self->_remove("exception", @_);
739}
740
741sub _remove {
742    my $self = shift;
743    my $type = shift;
744    my $fd = shift;
745
746    die "no handle ($type) active with fd '$fd'"
747	unless exists $self->{fds}->{$type}->{$fd};
748
749    delete $self->{fds}->{$type}->{$fd};
750}
751
752=item $reactor->toggle_read($fd, $status);
753
754=item $reactor->toggle_write($fd, $status);
755
756=item $reactor->toggle_exception($fd, $status);
757
758Updates the status of a watch on the file handle C<$fd>.
759The C<$status> parameter species whether the watch is
760to be enabled or disabled.
761
762=cut
763
764sub toggle_read {
765    my $self = shift;
766    $self->_toggle("read", @_);
767}
768
769sub toggle_write {
770    my $self = shift;
771    $self->_toggle("write", @_);
772}
773
774sub toggle_exception {
775    my $self = shift;
776    $self->_toggle("exception", @_);
777}
778
779sub _toggle {
780    my $self = shift;
781    my $type = shift;
782    my $fd = shift;
783    my $enabled = shift;
784
785    $self->{fds}->{$type}->{$fd}->{enabled} = $enabled;
786}
787
788
7891;
790
791=pod
792
793=back
794
795=head1 SEE ALSO
796
797L<Net::DBus::Callback>, L<Net::DBus::Connection>, L<Net::DBus::Server>
798
799=head1 AUTHOR
800
801Daniel Berrange E<lt>dan@berrange.comE<gt>
802
803=head1 COPYRIGHT
804
805Copyright 2004-2011 by Daniel Berrange
806
807=cut
808