1# Data and accessors to manage POE's events.
2
3package POE::Resource::Events;
4
5use vars qw($VERSION);
6$VERSION = '1.368'; # NOTE - Should be #.### (three decimal places)
7
8# These methods are folded into POE::Kernel;
9package POE::Kernel;
10
11use strict;
12
# A local copy of the queue so we can manipulate it directly.
# It is assigned by _data_ev_initialize() and used by every accessor
# in this file.
my $kr_queue;

# Number of queued events addressed *to* each session.  Incremented by
# _data_ev_enqueue() and decremented by _data_ev_refcount_dec(); the
# entry itself is deleted by _data_ev_clear_session().
my %event_count;
#  ( $session_id => $count,
#    ...,
#  );

# Number of queued events sent *from* each session to some other
# session.  Events a session sends to itself are not counted here; they
# only appear in %event_count.
my %post_count;
#  ( $session_id => $count,
#    ...,
#  );
25
26### Begin-run initialization.
27
sub _data_ev_initialize {
  # Remember the queue object handed in as the second argument so the
  # rest of this file can reach it through the file-scoped $kr_queue.
  # The first argument (the invocant) is not used.
  $kr_queue = $_[1];
}
32
### Kernel ID relocation and end-run leak checking.
34
sub _data_ev_relocate_kernel_id {
  # Move the per-session counters kept under $old_id so they live under
  # $new_id instead.  A counter with no entry for $old_id is left
  # untouched, so no empty entry is created for $new_id.
  my ($self, $old_id, $new_id) = @_;

  foreach my $counter (\%event_count, \%post_count) {
    next unless exists $counter->{$old_id};
    $counter->{$new_id} = delete $counter->{$old_id};
  }
}
43
sub _data_ev_finalize {
  # Report every entry still present in the two counters.  Any entry at
  # all counts as a leak, whatever its value.
  #
  # Returns 1 when both counters are empty, 0 otherwise.
  my $leaks_seen = 0;

  # Events still accounted as pending *to* a session.
  while (my ($ses_id, $cnt) = each %event_count) {
    $leaks_seen++;
    _warn("!!! Leaked event-to count: $ses_id = $cnt\n");
  }

  # Events still accounted as pending *from* a session.
  while (my ($ses_id, $cnt) = each %post_count) {
    $leaks_seen++;
    _warn("!!! Leaked event-from count: $ses_id = $cnt\n");
  }

  return $leaks_seen ? 0 : 1;
}
57
58### Enqueue an event.
59
# Minimum spacing enforced between the priorities of successive
# non-delayed events; _data_ev_enqueue() adds it to $last_fifo_time when
# a new priority would not otherwise be strictly greater.
# NOTE(review): presumably expressed in seconds, like monotime() - confirm.
sub FIFO_TIME_EPSILON () { 0.000001 }
# Priority most recently given to a non-delayed event.  Seeded with the
# monotonic time at the moment this file is loaded.
my $last_fifo_time = monotime();
62
sub _data_ev_enqueue {
  # Put one event into the kernel queue and account for it.
  #
  # Arguments after $self, in order: $session (destination),
  # $source_session, $event, $type, $etc, $file, $line, $fromstate, and
  # the optional $time, $delta and $priority.
  #
  # Returns the ID the queue assigned to the new event.
  my (
    $self,
    $session, $source_session, $event, $type, $etc,
    $file, $line, $fromstate, $time, $delta, $priority
  ) = @_;

  my $sid = $session->ID;

  if (ASSERT_DATA) {
    if (!$self->_data_ses_exists($sid)) {
      _trap(
        "<ev> can't enqueue event ``$event'' for nonexistent",
        $self->_data_alias_loggable($sid)
      );
    }
  }

  # The payload is the EV_SESSION..EV_FROMSTATE run of arguments, taken
  # straight from @_ (offset by one to skip $self).
  my $event_to_enqueue = [ @_[(1+EV_SESSION) .. (1+EV_FROMSTATE)] ];

  if (defined $time) {
    # Timed event: keep the wall-clock time and delta in the payload.
    # Unless the caller supplied a priority, derive one from them.
    @{$event_to_enqueue}[EV_WALLTIME, EV_DELTA] = ($time, $delta);
    $priority = wall2mono( $time + ($delta||0) ) unless $priority;
  }
  else {
    $priority = monotime() unless $priority;
  }

  # Sample the head of the queue *before* adding the new event; it
  # decides below whether the time watcher must be touched.
  my $old_head_priority = $kr_queue->get_next_priority();

  # Non-delayed events get strictly increasing priorities so they stay
  # in the order they were enqueued.
  if (!($type & ET_MASK_DELAYED)) {
    if ($priority <= $last_fifo_time) {
      $priority = $last_fifo_time + FIFO_TIME_EPSILON;
    }
    $last_fifo_time = $priority;
  }

  my $new_id = $kr_queue->enqueue($priority, $event_to_enqueue);
  $event_to_enqueue->[EV_SEQ] = $new_id;

  #_carp( Carp::longmess( "<ev> priority is much too far in the future" ) ) if $priority > 1354569908;
  if (TRACE_EVENTS) {
    _warn(
      "<ev> enqueued event $new_id ``$event'' from ",
      $self->_data_alias_loggable($source_session->ID), " to ",
      $self->_data_alias_loggable($sid),
      " at $time, priority=$priority"
    );
  }

  # Queue was empty: resume the time watcher.  New event is due before
  # the old head: reset the watcher.  Otherwise leave it alone.
  if (!defined $old_head_priority) {
    $self->loop_resume_time_watcher($priority);
  }
  elsif ($priority < $old_head_priority) {
    $self->loop_reset_time_watcher($priority);
  }

  # This is the counterpart to _data_ev_refcount_dec().  It's only
  # used in one place, so it's not in its own function.

  # The destination session gains a reference with its first pending
  # event.
  my $already_pending = $event_count{$sid}++;
  $self->_data_ses_refcount_inc($sid) unless $already_pending;

  # A different source session gains a reference with its first
  # outstanding post.  Self-posts are not counted in %post_count.
  my $source_id = $source_session->ID();
  if ($sid ne $source_id) {
    my $already_posting = $post_count{$source_id}++;
    $self->_data_ses_refcount_inc($source_id) unless $already_posting;
  }

  return $new_id;
}
133
sub _data_ev_set
{
    # Re-file one queued event at a new priority and record its new
    # wall-clock time and delta.
    #
    # Parameters:
    #   $alarm_id - queue ID of the event to change.
    #   $my_alarm - filter callback passed to the queue's peek_items()
    #               and set_priority().
    #   $time     - value stored in the payload's EV_WALLTIME slot.
    #   $pri      - base priority; the event is filed at $pri + $delta.
    #   $delta    - optional offset.  When undefined, the payload's
    #               current EV_DELTA (or 0) is used.
    #
    # Returns EV_WALLTIME + EV_DELTA of the updated payload, or nothing
    # when no event with that ID passes the filter.
    my( $self, $alarm_id, $my_alarm, $time, $pri, $delta ) = @_;

    # Pick the item whose queue ID matches.  The ID slot is addressed
    # through ITEM_ID, like the other item fields used in this file.
    my ($event) = grep { $_->[ITEM_ID] == $alarm_id }
      $kr_queue->peek_items( $my_alarm );

    return unless $event;

    my $payload = $event->[ITEM_PAYLOAD];

    # XXX - However, if there has been a clock skew, the priority will
    # have changed and we should recalculate priority from time+delta

    $delta = $payload->[EV_DELTA] || 0 unless defined $delta;
    $kr_queue->set_priority( $alarm_id, $my_alarm, $pri+$delta );
    $payload->[EV_WALLTIME] = $time;
    $payload->[EV_DELTA]    = $delta;

    return( ($payload->[EV_WALLTIME] || 0) + ($payload->[EV_DELTA] || 0) );
}
157
sub _data_ev_adjust
{
    # Move one queued event: to an absolute priority when $time is true,
    # otherwise by a relative $delta.  The payload's bookkeeping fields
    # are then updated to match.
    #
    # Parameters:
    #   $alarm_id - queue ID of the event to change.
    #   $my_alarm - filter callback passed through to the queue.
    #   $time     - optional base time.  When true the event is re-filed
    #               at $time + $delta and EV_WALLTIME is overwritten.
    #   $delta    - amount added to the priority and to EV_DELTA.
    #
    # Returns EV_WALLTIME + EV_DELTA of the updated payload, or nothing
    # when no event with that ID passes the filter.
    my( $self, $alarm_id, $my_alarm, $time, $delta ) = @_;

    # XXX - However, if there has been a clock skew, the priority will
    # have changed and we should recalculate priority from time+delta
    if( $time ) {
        # PG - We are never invoked with $time anyway.
        # A missing $delta counts as 0 so the sum cannot warn.
        $kr_queue->set_priority( $alarm_id, $my_alarm, $time + ($delta || 0) );
    }
    else {
        $kr_queue->adjust_priority( $alarm_id, $my_alarm, $delta );
    }

    # Pick the item whose queue ID matches.  The ID slot is addressed
    # through ITEM_ID, like the other item fields used in this file.
    my ($event) = grep { $_->[ITEM_ID] == $alarm_id }
      $kr_queue->peek_items( $my_alarm );

    return unless $event;

    my $payload = $event->[ITEM_PAYLOAD];

    $payload->[EV_WALLTIME] = $time if $time;
    $payload->[EV_DELTA] += $delta  if $delta;

    return( ($payload->[EV_WALLTIME] || 0) + ($payload->[EV_DELTA] || 0) );
}
186
187### Remove events sent to or from a specific session.
188
sub _data_ev_clear_session {
  # Remove every queued event sent to or from session $sid, releasing
  # the matching reference counts, then drop the session's counters.
  # Croaks if the bookkeeping does not come out at zero.
  my ($self, $sid) = @_;

  # Events sent to the session.
  if (my $pending_count = $event_count{$sid}) {
    my @to_session = $kr_queue->remove_items(
      sub { $_[0][EV_SESSION]->ID() eq $sid },
      $pending_count
    );

    foreach my $item (@to_session) {
      $self->_data_ev_refcount_dec(
        @{$item->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]
      );
      $pending_count--;
    }

    # TODO - fork() can make this go negative on some systems.
    croak "lingering pending count: $pending_count" if $pending_count;
  }

  # Events sent by the session.
  if (my $sent_count = $post_count{$sid}) {
    my @from_session = $kr_queue->remove_items(
      sub { $_[0][EV_SOURCE]->ID() eq $sid },
      $sent_count
    );

    foreach my $item (@from_session) {
      $self->_data_ev_refcount_dec(
        @{$item->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]
      );
      $sent_count--;
    }

    croak "lingering sent count: $sent_count" if $sent_count;
  }

  # Both counters must have reached zero; a true value left behind
  # means an event was missed.
  if (delete $event_count{$sid}) {
    croak "lingering event count";
  }
  if (delete $post_count{$sid}) {
    croak "lingering post count";
  }
}
240
241# TODO Alarm maintenance functions may move out to a separate
242# POE::Resource module in the future.  Why?  Because alarms may
243# eventually be managed by something other than the event queue.
244# Especially if we incorporate a proper Session scheduler.  Be sure to
245# move the tests to a corresponding t/res/*.t file.
246
247### Remove a specific alarm by its name.  This is in the events
248### section because alarms are currently implemented as events with
249### future due times.
250
sub _data_ev_clear_alarm_by_name {
  # Remove every queued ET_ALARM event that is addressed to session
  # $sid and named $alarm_name, releasing its reference counts.
  my ($self, $sid, $alarm_name) = @_;

  # Queue filter: 1 for an alarm to this session with this name, else 0.
  my $my_alarm = sub {
    my $payload = $_[0];
    return(
      (
        $payload->[EV_TYPE] & ET_ALARM
        and $payload->[EV_SESSION]->ID() eq $sid
        and $payload->[EV_NAME] eq $alarm_name
      ) ? 1 : 0
    );
  };

  foreach my $item ($kr_queue->remove_items($my_alarm)) {
    my $payload = $item->[ITEM_PAYLOAD];
    $self->_data_ev_refcount_dec(@{$payload}[EV_SOURCE, EV_SESSION]);
  }
}
265
266### Remove a specific alarm by its ID.  This is in the events section
267### because alarms are currently implemented as events with future due
268### times.  TODO It's possible to remove non-alarms; is that wrong?
269
sub _data_ev_clear_alarm_by_id {
  # Remove the single queued event with ID $alarm_id, provided it is
  # addressed to session $sid, and release its reference counts.
  #
  # Returns ($time, $event), where $time is the payload's
  # EV_WALLTIME + EV_DELTA and $event is the removed payload.
  # Returns nothing when the queue removed no such event.
  my ($self, $sid, $alarm_id) = @_;

  # Only the destination session is checked, not the event type, so a
  # non-alarm event can be removed this way too.
  my $my_alarm = sub {
    $_[0]->[EV_SESSION]->ID() eq $sid;
  };

  my ($pri, $id, $event) = $kr_queue->remove_item($alarm_id, $my_alarm);
  return unless defined $pri;

  if (TRACE_EVENTS) {
    _warn(
      "<ev> removed event $id ``", $event->[EV_NAME], "'' to ",
      $self->_data_alias_loggable($sid), " at $pri"
    );
  }

  $self->_data_ev_refcount_dec( @$event[EV_SOURCE, EV_SESSION] );

  # Events enqueued without a time have no EV_WALLTIME.  Treat a missing
  # value as 0, as _data_ev_clear_alarm_by_session() does, so the sum
  # cannot raise an uninitialized-value warning.
  my $time = ($event->[EV_WALLTIME]||0) + ($event->[EV_DELTA]||0);
  return ($time, $event);
}
291
292### Remove all the alarms for a session.  Whoot!
293
sub _data_ev_clear_alarm_by_session {
  # Remove every queued ET_ALARM event addressed to session $sid and
  # release the reference counts each one held.
  #
  # Returns one array reference per removed alarm, in the form
  # [ name, time, args... ], where time is EV_WALLTIME + EV_DELTA and a
  # missing value counts as 0.
  my ($self, $sid) = @_;

  # Queue filter: 1 for an alarm to this session, else 0.
  my $my_alarm = sub {
    my $payload = $_[0];
    return(
      (
        $payload->[EV_TYPE] & ET_ALARM
        and $payload->[EV_SESSION]->ID() eq $sid
      ) ? 1 : 0
    );
  };

  my @removed;
  foreach my $item ($kr_queue->remove_items($my_alarm)) {
    my $event = $item->[ITEM_PAYLOAD];
    $self->_data_ev_refcount_dec( @$event[EV_SOURCE, EV_SESSION] );

    my $wall_time = $event->[EV_WALLTIME] || 0;
    my $delta     = $event->[EV_DELTA]    || 0;
    push @removed, [ $event->[EV_NAME], $wall_time + $delta, @{$event->[EV_ARGS]} ];
  }

  return @removed;
}
313
314### Decrement a post refcount
315
sub _data_ev_refcount_dec {
  # Undo the accounting _data_ev_enqueue() did for one event: lower the
  # destination's pending count and, unless the session posted to
  # itself, the source's post count.  Whenever a count reaches zero the
  # session reference that went with it is released.
  my ($self, $source_session, $dest_session) = @_;

  my $source_id = $source_session->ID;
  my $dest_id   = $dest_session->ID;

  if (ASSERT_DATA) {
    _trap($dest_session) unless exists $event_count{$dest_id};
  }

  $event_count{$dest_id}--;
  $self->_data_ses_refcount_dec($dest_id) unless $event_count{$dest_id};

  # Self-posts were never added to %post_count, so stop here for them.
  return if $dest_id eq $source_id;

  if (ASSERT_DATA) {
    _trap($source_session) unless exists $post_count{$source_id};
  }

  $post_count{$source_id}--;
  $self->_data_ses_refcount_dec($source_id) unless $post_count{$source_id};
}
335
336### Fetch the number of pending events sent to a session.
337
sub _data_ev_get_count_to {
  # Return how many queued events are addressed to session $sid,
  # or 0 when none are recorded for it.
  my ($self, $sid) = @_;

  my $pending = $event_count{$sid};
  return $pending ? $pending : 0;
}
342
343### Fetch the number of pending events sent from a session.
344
sub _data_ev_get_count_from {
  # Return how many queued events session $sid has sent to other
  # sessions, or 0 when none are recorded for it.
  my ($self, $sid) = @_;

  my $posted = $post_count{$sid};
  return $posted ? $posted : 0;
}
349
350### Dispatch events that are due for "now" or earlier.
351
sub _data_ev_dispatch_due {
  # Dispatch every queued event whose priority is at or before the
  # current monotonic time, then sweep dead sessions and point the
  # event loop's time watcher at whatever is next.
  my ($self) = @_;

  # With tracing on, dump the whole queue before dispatching.
  if (TRACE_EVENTS) {
    foreach ($kr_queue->peek_items(sub { 1 })) {
      my @event = map { defined($_) ? $_ : "(undef)" } @{$_->[ITEM_PAYLOAD]};
      _warn(
        "<ev> time($_->[ITEM_PRIORITY]) id($_->[ITEM_ID]) ",
        "event(@event)\n"
      );
    }
  }

  # "Now" is sampled once; events that become due while dispatching are
  # left for the next call.
  my $now = monotime();

  DUE_EVENT: while (1) {
    my $due_time = $kr_queue->get_next_priority();
    last DUE_EVENT unless defined $due_time and $due_time <= $now;

    my ($priority, $id, $event) = $kr_queue->dequeue_next();

    if (TRACE_EVENTS) {
      _warn("<ev> dispatching event $id ($event->[EV_NAME])");
    }

    # TODO - Why can't we reverse these two steps?
    # TODO - Reversing them could avoid entering and removing GC marks.
    $self->_data_ev_refcount_dec($event->[EV_SOURCE], $event->[EV_SESSION]);

    # Signal events use their own dispatcher; everything else takes the
    # regular one.  Both receive the same arguments.
    my $dispatcher = ($event->[EV_TYPE] & (ET_SIGNAL | ET_SIGDIE))
      ? '_dispatch_signal_event'
      : '_dispatch_event';
    $self->$dispatcher(@{$event}[EV_SESSION..EV_FROMSTATE], $priority, $id);

    # Stop the system if an unhandled exception occurred.
    # This wipes out all sessions and associated resources.
    POE::Kernel->stop() if $POE::Kernel::kr_exception;
  }

  # Sweep for dead sessions.  The sweep may alter the next queue time,
  # so the head of the queue is read again afterwards.

  $self->_data_ses_gc_sweep();
  my $next_time = $kr_queue->get_next_priority();

  # Tell the event loop to wait for the next event, if there is one.
  # Otherwise we're going to wait indefinitely for some other event.

  if (!defined $next_time) {
    $self->loop_pause_time_watcher();
  }
  else {
    $self->loop_reset_time_watcher($next_time);
  }
}
409
4101;
411
412__END__
413
414=head1 NAME
415
416POE::Resource::Events - internal event manager for POE::Kernel
417
418=head1 SYNOPSIS
419
420There is no public API.
421
422=head1 DESCRIPTION
423
424POE::Resource::Events is a mix-in class for POE::Kernel.  It hides the
425complexity of managing POE's events from even POE itself.  It is used
426internally by POE::Kernel, so it has no public interface.
427
428=head1 SEE ALSO
429
430See L<POE::Kernel/Asynchronous Messages (FIFO Events)> for one public
431events API.
432
433See L<POE::Kernel/Resources> for public information about POE
434resources.
435
436See L<POE::Resource> for general discussion about resources and the
437classes that manage them.
438
439=head1 BUGS
440
441None known.
442
443=head1 AUTHORS & COPYRIGHTS
444
445Please see L<POE> for more information about authors and contributors.
446
447=cut
448
449# rocco // vim: ts=2 sw=2 expandtab
450# TODO - Edit.
451