# Data and accessors to manage POE's events.

package POE::Resource::Events;

use vars qw($VERSION);
$VERSION = '1.368'; # NOTE - Should be #.### (three decimal places)

# These methods are folded into POE::Kernel;
package POE::Kernel;

use strict;

# A local copy of the queue so we can manipulate it directly.
my $kr_queue;

# Number of queued events destined for each session.  Incremented in
# _data_ev_enqueue(), decremented in _data_ev_refcount_dec().
my %event_count;
#  ( $session_id => $count,
#    ...,
#  );

# Number of queued events each session has sent to *other* sessions.
# Events a session sends to itself are only counted in %event_count.
my %post_count;
#  ( $session_id => $count,
#    ...,
#  );

### Begin-run initialization.

# Remember the queue object that all the accessors below operate on.
sub _data_ev_initialize {
  my ($self, $queue) = @_;
  $kr_queue = $queue;
}

### Move the counters kept under one ID so they live under a new ID.
### Counters that don't exist for $old_id are left alone.

sub _data_ev_relocate_kernel_id {
  my ($self, $old_id, $new_id) = @_;

  $event_count{$new_id} = delete $event_count{$old_id}
    if exists $event_count{$old_id};
  $post_count{$new_id} = delete $post_count{$old_id}
    if exists $post_count{$old_id};
}

### End-run leak checking.

# Warn about every counter that still exists.  Returns 1 when both
# tables are empty, 0 when anything leaked.
sub _data_ev_finalize {
  my $finalized_ok = 1;
  while (my ($ses_id, $cnt) = each(%event_count)) {
    $finalized_ok = 0;
    _warn("!!! Leaked event-to count: $ses_id = $cnt\n");
  }

  while (my ($ses_id, $cnt) = each(%post_count)) {
    $finalized_ok = 0;
    _warn("!!! Leaked event-from count: $ses_id = $cnt\n");
  }
  return $finalized_ok;
}

### Enqueue an event.

# Smallest step used to keep non-delayed events in strict FIFO order.
sub FIFO_TIME_EPSILON () { 0.000001 }
my $last_fifo_time = monotime();

# Parameters, after $self:
#   $session, $source_session  - destination and sender session objects
#   $event, $type, $etc        - event name, ET_* type bits, arguments
#   $file, $line, $fromstate   - where the event was posted from
#   $time, $delta              - optional wall-clock due time and offset
#   $priority                  - optional explicit queue priority
#
# Returns the ID the queue assigned to the new event.
sub _data_ev_enqueue {
  my (
    $self,
    $session, $source_session, $event, $type, $etc,
    $file, $line, $fromstate, $time, $delta, $priority
  ) = @_;

  my $sid = $session->ID;

  if (ASSERT_DATA) {
    unless ($self->_data_ses_exists($sid)) {
      _trap(
        "<ev> can't enqueue event ``$event'' for nonexistent",
        $self->_data_alias_loggable($sid)
      );
    }
  }

  # This is awkward, but faster than using the fields individually.
  # The "1+" skips $self, which occupies $_[0].
  my $event_to_enqueue = [ @_[(1+EV_SESSION) .. (1+EV_FROMSTATE)] ];
  if( defined $time ) {
    $event_to_enqueue->[EV_WALLTIME] = $time;
    $event_to_enqueue->[EV_DELTA]    = $delta;
    $priority ||= wall2mono( $time + ($delta||0) );
  }
  else {
    $priority ||= monotime();
  }

  my $new_id;

  # Sample the head of the queue before inserting, so we can tell
  # below whether the new event became the next one due.
  my $old_head_priority = $kr_queue->get_next_priority();

  # Events that aren't delayed must never be queued at or before the
  # previous such event, or FIFO order would be lost.
  unless ($type & ET_MASK_DELAYED) {
    $priority = $last_fifo_time + FIFO_TIME_EPSILON
      if $priority <= $last_fifo_time;
    $last_fifo_time = $priority;
  }

  $new_id = $kr_queue->enqueue($priority, $event_to_enqueue);
  $event_to_enqueue->[EV_SEQ] = $new_id;

  #_carp( Carp::longmess( "<ev> priority is much to far in the future" ) ) if $priority > 1354569908;
  if (TRACE_EVENTS ) {
    _warn(
      "<ev> enqueued event $new_id ``$event'' from ",
      $self->_data_alias_loggable($source_session->ID), " to ",
      $self->_data_alias_loggable($sid),
      " at $time, priority=$priority"
    );
  }

  # The queue was empty: start watching the clock.  Otherwise only
  # touch the watcher when the new event is due before the old head.
  if (!defined $old_head_priority) {
    $self->loop_resume_time_watcher($priority);
  }
  elsif ($priority < $old_head_priority) {
    $self->loop_reset_time_watcher($priority);
  }

  # This is the counterpart to _data_ev_refcount_dec().  It's only
  # used in one place, so it's not in its own function.

  $self->_data_ses_refcount_inc($sid) unless $event_count{$sid}++;

  # A session posting to itself only holds the one reference above.
  return $new_id if $sid eq $source_session->ID();

  $self->_data_ses_refcount_inc($source_session->ID) unless (
    $post_count{$source_session->ID}++
  );

  return $new_id;
}

### Set the due time of a queued event.
###
### $alarm_id identifies the event and $my_alarm is the filter handed
### to the queue.  The queue priority becomes $pri + $delta; when
### $delta is undefined the event's stored delta (or 0) is reused.
### Returns the event's new wall time plus delta, or nothing if the
### event could not be found.

sub _data_ev_set
{
  my( $self, $alarm_id, $my_alarm, $time, $pri, $delta ) = @_;

  my $event = (
    grep { $_->[ITEM_ID] == $alarm_id }
    $kr_queue->peek_items( $my_alarm )
  )[0];

  return unless $event;

  my $payload = $event->[ITEM_PAYLOAD];

  # XXX - However, if there has been a clock skew, the priority will
  # have changed and we should recalculate priority from time+delta

  $delta = $payload->[EV_DELTA] || 0 unless defined $delta;
  $kr_queue->set_priority( $alarm_id, $my_alarm, $pri+$delta );
  $payload->[EV_WALLTIME] = $time;
  $payload->[EV_DELTA]    = $delta;

  return( ($payload->[EV_WALLTIME] || 0) + ($payload->[EV_DELTA] || 0) );
}

### Adjust the due time of a queued event.
###
### With a true $time the priority is set to $time + $delta; otherwise
### the existing priority is shifted by $delta.  Returns the event's
### resulting wall time plus delta, or nothing if the event could not
### be found afterwards.

sub _data_ev_adjust
{
  my( $self, $alarm_id, $my_alarm, $time, $delta ) = @_;

  # XXX - However, if there has been a clock skew, the priority will
  # have changed and we should recalculate priority from time+delta
  if( $time ) {
    # PG - We are never invoked with $time anyway.
    # A missing delta counts as zero rather than as undef.
    $kr_queue->set_priority( $alarm_id, $my_alarm, $time + ($delta || 0) );
  }
  else {
    $kr_queue->adjust_priority( $alarm_id, $my_alarm, $delta );
  }

  my $event = (
    grep { $_->[ITEM_ID] == $alarm_id }
    $kr_queue->peek_items( $my_alarm )
  )[0];

  return unless $event;

  my $payload = $event->[ITEM_PAYLOAD];

  $payload->[EV_WALLTIME] = $time if $time;
  $payload->[EV_DELTA]   += $delta if $delta;

  return( ($payload->[EV_WALLTIME] || 0) + ($payload->[EV_DELTA] || 0) );
}

### Remove events sent to or from a specific session.
###
### Croaks if the number of events removed from the queue disagrees
### with the counters kept for the session.

sub _data_ev_clear_session {
  my ($self, $sid) = @_;

  # Events sent to the session.
  PENDING: {
    my $pending_count = $event_count{$sid};
    last PENDING unless $pending_count;

    foreach (
      $kr_queue->remove_items(
        sub { $_[0][EV_SESSION]->ID() eq $sid },
        $pending_count
      )
    ) {
      $self->_data_ev_refcount_dec(
        @{$_->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]
      );
      $pending_count--;
    }

    # TODO - fork() can make this go negative on some systems.
    croak "lingering pending count: $pending_count" if $pending_count;
  }

  # Events sent by the session.
  SENT: {
    my $sent_count = $post_count{$sid};
    last SENT unless $sent_count;

    foreach (
      $kr_queue->remove_items(
        sub { $_[0][EV_SOURCE]->ID() eq $sid },
        $sent_count
      )
    ) {
      $self->_data_ev_refcount_dec(
        @{$_->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]
      );
      $sent_count--;
    }

    croak "lingering sent count: $sent_count" if $sent_count;
  }

  # Both counters must have reached zero by now; drop their slots.
  croak "lingering event count" if delete $event_count{$sid};
  croak "lingering post count" if delete $post_count{$sid};
}

# TODO Alarm maintenance functions may move out to a separate
# POE::Resource module in the future.  Why?  Because alarms may
# eventually be managed by something other than the event queue.
# Especially if we incorporate a proper Session scheduler.  Be sure to
# move the tests to a corresponding t/res/*.t file.

### Remove a specific alarm by its name.  This is in the events
### section because alarms are currently implemented as events with
### future due times.

# Every queued ET_ALARM event addressed to session $sid and named
# $alarm_name is removed, and its reference counts are released.
sub _data_ev_clear_alarm_by_name {
  my ($self, $sid, $alarm_name) = @_;

  my $my_alarm = sub {
    return 0 unless $_[0]->[EV_TYPE] & ET_ALARM;
    return 0 unless $_[0]->[EV_SESSION]->ID() eq $sid;
    return 0 unless $_[0]->[EV_NAME] eq $alarm_name;
    return 1;
  };

  foreach ($kr_queue->remove_items($my_alarm)) {
    $self->_data_ev_refcount_dec(@{$_->[ITEM_PAYLOAD]}[EV_SOURCE, EV_SESSION]);
  }
}

### Remove a specific alarm by its ID.  This is in the events section
### because alarms are currently implemented as events with future due
### times.  TODO It's possible to remove non-alarms; is that wrong?
###
### Returns ($time, $event) for the removed event, where $time is its
### wall time plus delta, or nothing if the queue removed no event.

sub _data_ev_clear_alarm_by_id {
  my ($self, $sid, $alarm_id) = @_;

  # Only events addressed to the requesting session may be removed.
  my $my_alarm = sub {
    $_[0]->[EV_SESSION]->ID() eq $sid;
  };

  my ($pri, $id, $event) = $kr_queue->remove_item($alarm_id, $my_alarm);
  return unless defined $pri;

  if (TRACE_EVENTS) {
    _warn(
      "<ev> removed event $id ``", $event->[EV_NAME], "'' to ",
      $self->_data_alias_loggable($sid), " at $pri"
    );
  }

  $self->_data_ev_refcount_dec( @$event[EV_SOURCE, EV_SESSION] );

  # Events enqueued without a time have no EV_WALLTIME; treat it as
  # zero, the same way _data_ev_clear_alarm_by_session() does.
  my $time = ($event->[EV_WALLTIME]||0) + ($event->[EV_DELTA]||0);
  return ($time, $event);
}

### Remove all the alarms for a session.  Whoot!

# Returns a list with one array reference per removed alarm:
# [ $event_name, $wall_time_plus_delta, @event_args ].
sub _data_ev_clear_alarm_by_session {
  my ($self, $sid) = @_;

  my $my_alarm = sub {
    return 0 unless $_[0]->[EV_TYPE] & ET_ALARM;
    return 0 unless $_[0]->[EV_SESSION]->ID() eq $sid;
    return 1;
  };

  my @removed;
  foreach ($kr_queue->remove_items($my_alarm)) {
    my ($pri, $event) = @$_[ITEM_PRIORITY, ITEM_PAYLOAD];
    $self->_data_ev_refcount_dec( @$event[EV_SOURCE, EV_SESSION] );
    my $time = ($event->[EV_WALLTIME]||0) + ($event->[EV_DELTA]||0);
    push @removed, [ $event->[EV_NAME], $time, @{$event->[EV_ARGS]} ];
  }

  return @removed;
}

### Decrement a post refcount
###
### Takes the source and destination session objects of an event that
### has left the queue.  Each session's reference is released when its
### counter drops to zero.

sub _data_ev_refcount_dec {
  my ($self, $source_session, $dest_session) = @_;

  my ($source_id, $dest_id) = ($source_session->ID, $dest_session->ID);

  if (ASSERT_DATA) {
    _trap $dest_session unless exists $event_count{$dest_id};
  }

  $self->_data_ses_refcount_dec($dest_id) unless --$event_count{$dest_id};

  # _data_ev_enqueue() doesn't count self-posted events in
  # %post_count, so there is nothing more to release for them.
  return if $dest_id eq $source_id;

  if (ASSERT_DATA) {
    _trap $source_session unless exists $post_count{$source_id};
  }

  $self->_data_ses_refcount_dec($source_id) unless --$post_count{$source_id};
}

### Fetch the number of pending events sent to a session.

sub _data_ev_get_count_to {
  my ($self, $sid) = @_;
  return $event_count{$sid} || 0;
}

### Fetch the number of pending events sent from a session.

sub _data_ev_get_count_from {
  my ($self, $sid) = @_;
  return $post_count{$sid} || 0;
}

### Dispatch events that are due for "now" or earlier.

sub _data_ev_dispatch_due {
  my $self = shift;

  if (TRACE_EVENTS) {
    # Dump the whole queue before dispatching anything.
    foreach my $item ($kr_queue->peek_items(sub { 1 })) {
      my @event = map { defined($_) ? $_ : "(undef)" } @{$item->[ITEM_PAYLOAD]};
      _warn(
        "<ev> time($item->[ITEM_PRIORITY]) id($item->[ITEM_ID]) ",
        "event(@event)\n"
      );
    }
  }

  # "Now" is sampled once; events that come due while this batch is
  # being dispatched are left in the queue.
  my $now = monotime();
  my $next_time;
  while (
    defined($next_time = $kr_queue->get_next_priority()) and
    $next_time <= $now
  ) {
    my ($priority, $id, $event) = $kr_queue->dequeue_next();

    if (TRACE_EVENTS) {
      _warn("<ev> dispatching event $id ($event->[EV_NAME])");
    }

    # TODO - Why can't we reverse these two lines?
    # TODO - Reversing them could avoid entering and removing GC marks.
    $self->_data_ev_refcount_dec($event->[EV_SOURCE], $event->[EV_SESSION]);

    if ($event->[EV_TYPE] & (ET_SIGNAL | ET_SIGDIE)) {
      $self->_dispatch_signal_event(@{$event}[EV_SESSION..EV_FROMSTATE], $priority, $id);
    }
    else {
      $self->_dispatch_event(@{$event}[EV_SESSION..EV_FROMSTATE], $priority, $id);
    }

    # Stop the system if an unhandled exception occurred.
    # This wipes out all sessions and associated resources.
    next unless $POE::Kernel::kr_exception;
    POE::Kernel->stop();
  }

  # Sweep for dead sessions.  The sweep may alter the next queue time.

  $self->_data_ses_gc_sweep();
  $next_time = $kr_queue->get_next_priority();

  # Tell the event loop to wait for the next event, if there is one.
  # Otherwise we're going to wait indefinitely for some other event.

  if (defined $next_time) {
    $self->loop_reset_time_watcher($next_time);
  }
  else {
    $self->loop_pause_time_watcher();
  }
}

1;

__END__

=head1 NAME

POE::Resource::Events - internal event manager for POE::Kernel

=head1 SYNOPSIS

There is no public API.

=head1 DESCRIPTION

POE::Resource::Events is a mix-in class for POE::Kernel.  It hides the
complexity of managing POE's events from even POE itself.  It is used
internally by POE::Kernel, so it has no public interface.

=head1 SEE ALSO

See L<POE::Kernel/Asynchronous Messages (FIFO Events)> for one public
events API.

See L<POE::Kernel/Resources> for public information about POE
resources.

See L<POE::Resource> for general discussion about resources and the
classes that manage them.

=head1 BUGS

None known.

=head1 AUTHORS & COPYRIGHTS

Please see L<POE> for more information about authors and contributors.

=cut

# rocco // vim: ts=2 sw=2 expandtab
# TODO - Edit.