#  You may distribute under the terms of either the GNU General Public License
#  or the Artistic License (the same terms as Perl itself)
#
#  (C) Paul Evans, 2011-2015 -- leonerd@leonerd.org.uk

package IO::Async::FileStream;

use strict;
use warnings;

our $VERSION = '0.800';

use base qw( IO::Async::Stream );

use IO::Async::File;

use Carp;
use Fcntl qw( SEEK_SET SEEK_CUR );

=head1 NAME

C<IO::Async::FileStream> - read the tail of a file

=head1 SYNOPSIS

 use IO::Async::FileStream;

 use IO::Async::Loop;
 my $loop = IO::Async::Loop->new;

 open my $logh, "<", "var/logs/daemon.log" or
    die "Cannot open logfile - $!";

 my $filestream = IO::Async::FileStream->new(
    read_handle => $logh,

    on_initial => sub {
       my ( $self ) = @_;
       $self->seek_to_last( "\n" );
    },

    on_read => sub {
       my ( $self, $buffref ) = @_;

       while( $$buffref =~ s/^(.*\n)// ) {
          print "Received a line $1";
       }

       return 0;
    },
 );

 $loop->add( $filestream );

 $loop->run;

=head1 DESCRIPTION

This subclass of L<IO::Async::Stream> allows reading the end of a regular file
which is being appended to by some other process. It invokes the C<on_read>
event when more data has been added to the file.

This class provides an API identical to L<IO::Async::Stream> when given a
C<read_handle>; it should be treated similarly. In particular, it can be given
an C<on_read> handler, or subclassed to provide an C<on_read> method, or even
used as the C<transport> for an L<IO::Async::Protocol::Stream> object.

It will not support writing.

To watch a file, directory, or other filesystem entity for updates of other
properties, such as C<mtime>, see also L<IO::Async::File>.

=cut

=head1 EVENTS

The following events are invoked, either using subclass methods or CODE
references in parameters.

Because this is a subclass of L<IO::Async::Stream> in read-only mode, all the
events supported by C<Stream> relating to the read handle are supported here.
This is not a full list; see also the documentation relating to
L<IO::Async::Stream>.

=head2 $ret = on_read \$buffer, $eof

Invoked when more data is available in the internal receiving buffer.

Note that C<$eof> only indicates that all the data currently available in the
file has now been read; in contrast to a regular L<IO::Async::Stream>, this
object will not stop watching after this condition. Instead, it will continue
watching the file for updates.

=head2 on_truncated

Invoked when the file size shrinks. If this happens, it is presumed that the
file content has been replaced. Reading will then commence from the start of
the file.

=head2 on_initial $size

Invoked the first time the file is looked at. It is passed the initial size of
the file. The code implementing this method can use the C<seek> or
C<seek_to_last> methods to set the initial read position in the file to skip
over some initial content.

This method may be useful to skip initial content in the file, if the object
should only respond to new content added after it was created.

=cut

# Construction-time initialisation. Chains to the superclass, forces
# close_on_read_eof off in the caller's parameter hash (EOF on a tailed file
# is not final), clears the remembered file size, and adds an
# IO::Async::File child whose on_devino_changed / on_size_changed events are
# routed to the methods of the same name below.
sub _init
{
   my $self = shift;
   my ( $params ) = @_;

   $self->SUPER::_init( $params );

   $params->{close_on_read_eof} = 0;

   # undef means "on_initial has not run yet"; see configure
   $self->{last_size} = undef;

   # NOTE(review): _replace_weakself is not defined in this file; presumably
   # it wraps the named method with a weakened $self, which is why the two
   # handlers below tolerate an undefined invocant - confirm.
   $self->add_child( $self->{file} = IO::Async::File->new(
      on_devino_changed => $self->_replace_weakself( 'on_devino_changed' ),
      on_size_changed   => $self->_replace_weakself( 'on_size_changed' ),
   ) );
}

=head1 PARAMETERS

The following named parameters may be passed to C<new> or C<configure>, in
addition to the parameters relating to reading supported by
L<IO::Async::Stream>.

=head2 filename => STRING

Optional. If supplied, watches the named file rather than the filehandle given
in C<read_handle>. The file will be opened by the constructor, and then
watched for renames. If the file is renamed, the new filename is opened and
tracked similarly after closing the previous file.

=head2 interval => NUM

Optional. The interval in seconds to poll the filehandle using C<stat(2)>
looking for size changes. A default of 2 seconds will be applied if not
defined.

=cut

# Applies named parameters. on_truncated / on_initial are stored on this
# object; interval, filename and handle / read_handle are forwarded to the
# IO::Async::File child, whose resulting handle becomes this stream's
# read_handle. Croaks if a defined write_handle is supplied. Everything left
# over goes to the superclass. The first time a read handle is present,
# records its size and fires on_initial with it.
sub configure
{
   my $self = shift;
   my %params = @_;

   foreach (qw( on_truncated on_initial )) {
      $self->{$_} = delete $params{$_} if exists $params{$_};
   }

   foreach (qw( interval )) {
      $self->{file}->configure( $_ => delete $params{$_} ) if exists $params{$_};
   }
   if( exists $params{filename} ) {
      $self->{file}->configure( filename => delete $params{filename} );
      $params{read_handle} = $self->{file}->handle;
   }
   elsif( exists $params{handle} or exists $params{read_handle} ) {
      # A defined 'handle' takes precedence over 'read_handle'
      my $handle = delete $params{handle};
      defined $handle or $handle = delete $params{read_handle};

      $self->{file}->configure( handle => $handle );
      $params{read_handle} = $self->{file}->handle;
   }

   croak "Cannot have a write_handle in a ".ref($self) if defined $params{write_handle};

   $self->SUPER::configure( %params );

   if( $self->read_handle and !defined $self->{last_size} ) {
      my $size = (stat $self->read_handle)[7];

      $self->{last_size} = $size;

      # seek / seek_to_last check this flag; local restores it on scope exit
      # so they are only callable while on_initial is running
      local $self->{running_initial} = 1;
      $self->maybe_invoke_event( on_initial => $size );
   }
}

=head1 METHODS

=cut

# Replace IO::Async::Handle's implementation
# Read-readiness is driven by the IO::Async::File child: wanting reads starts
# it (if it is not already running), not wanting reads stops it.
sub _watch_read
{
   my $self = shift;
   my ( $want ) = @_;

   if( $want ) {
      $self->{file}->start if !$self->{file}->is_running;
   }
   else {
      $self->{file}->stop;
   }
}

# Writing is not supported; croaks if write-watching is requested and is a
# no-op otherwise.
sub _watch_write
{
   my $self = shift;
   my ( $want ) = @_;

   croak "Cannot _watch_write in " . ref($self) if $want;
}

# Handler for the File child's on_devino_changed event. Marks the stream as
# renamed and reads whatever remains of the old file; read_more performs the
# actual switch to the new handle once the old one is drained. Returns
# immediately if the invocant is false.
sub on_devino_changed
{
   my $self = shift or return;

   $self->{renamed} = 1;
   $self->debug_printf( "read tail of old file" );
   $self->read_more;
}

# Handler for the File child's on_size_changed event; receives the new size.
# A size smaller than the last one seen fires on_truncated and restarts
# reading from offset 0. The new size is then remembered and a read is
# attempted. Returns immediately if the invocant is false.
sub on_size_changed
{
   my $self = shift or return;
   my ( $size ) = @_;

   if( $size < $self->{last_size} ) {
      $self->maybe_invoke_event( on_truncated => );
      $self->{last_pos} = 0;
   }

   $self->{last_size} = $size;

   $self->debug_printf( "read_more" );
   $self->read_more;
}

# Performs one read pass: repositions the handle to the last known read
# position (if any), invokes on_read_ready (not defined in this file;
# inherited), then records the new position. If the known file size has not
# been reached yet, another pass is queued via the loop. Otherwise, if a
# rename is pending, switches over to the File child's current handle.
sub read_more
{
   my $self = shift;

   sysseek( $self->read_handle, $self->{last_pos}, SEEK_SET ) if defined $self->{last_pos};

   $self->on_read_ready;

   $self->{last_pos} = sysseek( $self->read_handle, 0, SEEK_CUR ); # == systell

   if( $self->{last_pos} < $self->{last_size} ) {
      $self->loop->later( sub { $self->read_more } );
   }
   elsif( $self->{renamed} ) {
      $self->debug_printf( "reopening for rename" );

      $self->{last_size} = 0;

      # Something had been read from the old file, so from the consumer's
      # point of view the content restarts: report it as a truncation and
      # queue a read from the start of the new file
      if( $self->{last_pos} ) {
         $self->maybe_invoke_event( on_truncated => );
         $self->{last_pos} = 0;
         $self->loop->later( sub { $self->read_more } );
      }

      $self->configure( read_handle => $self->{file}->handle );
      undef $self->{renamed};
   }
}

# Writing is not supported; emits a warning via carp and writes nothing.
sub write
{
   carp "Cannot ->write from a ".ref($_[0]);
}

=head2 seek

   $filestream->seek( $offset, $whence )

Callable only during the C<on_initial> event. Moves the read position in the
filehandle to the given offset. C<$whence> is interpreted as for C<sysseek>,
and should be one of C<SEEK_SET>, C<SEEK_CUR> or C<SEEK_END>. Will be set to
C<SEEK_SET> if not provided.

Normally this would be used to seek to the end of the file, for example

 on_initial => sub {
    my ( $self, $filesize ) = @_;
    $self->seek( $filesize );
 }

=cut

# Croaks unless called while on_initial is running. Returns the result of the
# underlying sysseek call.
sub seek
{
   my $self = shift;
   my ( $offset, $whence ) = @_;

   $self->{running_initial} or croak "Cannot ->seek except during on_initial";

   defined $whence or $whence = SEEK_SET;

   sysseek( $self->read_handle, $offset, $whence );
}

=head2 seek_to_last

   $success = $filestream->seek_to_last( $str_pattern, %opts )

Callable only during the C<on_initial> event. Attempts to move the read
position in the filehandle to just after the last occurrence of a given match.
C<$str_pattern> may be a literal string or regexp pattern.

Returns a true value if the seek was successful, or false if not. Takes the
following named arguments:

=over 8

=item blocksize => INT

Optional. Read the file in blocks of this size. Will take a default of 8KiB if
not defined.

=item horizon => INT

Optional. Give up looking for a match after this number of bytes. Will take a
default value of 4 times the blocksize if not defined.

To force it to always search through the entire file contents, set this
explicitly to C<0>.

=back

Because regular file reading happens synchronously, this entire method
operates entirely synchronously. If the file is very large, it may take a
while to read back through the entire contents. While this is happening no
other events can be invoked in the process.

When looking for a string or regexp match, this method appends the
previously-read buffer to each block read from the file, in case a match
becomes split across two reads. If C<blocksize> is reduced to a very small
value, take care to ensure it isn't so small that a match may not be noticed.

This is most likely useful for seeking after the last complete line in a
line-based log file, to commence reading from the end, while still managing to
capture any partial content that isn't yet a complete line.

 on_initial => sub {
    my $self = shift;
    $self->seek_to_last( "\n" );
 }

=cut

# Croaks unless called while on_initial is running. Walks backwards from the
# end of the file (last_size) one block at a time until the horizon is
# reached. On a match, seeks to just after the last match and returns 1;
# otherwise seeks to the horizon and returns 0.
sub seek_to_last
{
   my $self = shift;
   my ( $str_pattern, %opts ) = @_;

   $self->{running_initial} or croak "Cannot ->seek_to_last except during on_initial";

   my $offset = $self->{last_size};

   my $blocksize = $opts{blocksize} || 8192;

   # $horizon becomes the absolute file offset at which the search gives up;
   # an explicit horizon of 0 means "search back to the start of the file"
   defined $opts{horizon} or $opts{horizon} = $blocksize * 4;
   my $horizon = $opts{horizon} ? $offset - $opts{horizon} : 0;
   $horizon = 0 if $horizon < 0;

   my $re = ref $str_pattern ? $str_pattern : qr/\Q$str_pattern\E/;

   my $prev = "";
   while( $offset > $horizon ) {
      my $len = $blocksize;
      $len = $offset if $len > $offset;
      $offset -= $len;

      sysseek( $self->read_handle, $offset, SEEK_SET );

      # Read exactly the $len bytes of this block. Reading a full $blocksize
      # when the final block at the start of the file is shorter would
      # overlap bytes already held in $prev, so that $buffer . $prev
      # duplicates data and the pattern can match across a junction that
      # does not exist in the file.
      my $got = sysread( $self->read_handle, my $buffer, $len );

      # A failed read leaves nothing to search; fall through to the
      # no-match path rather than matching against an undefined buffer
      defined $got or last;

      # TODO: If $str_pattern is a plain string this could be more efficient
      #   using rindex
      # The list-context //g match runs through every match, so that the
      # match variables describe the last one
      if( () = ( $buffer . $prev ) =~ m/$re/sg ) {
         # $+[0] will be end of last match
         my $pos = $offset + $+[0];
         $self->seek( $pos );
         return 1;
      }

      $prev = $buffer;
   }

   $self->seek( $horizon );
   return 0;
}

=head1 TODO

=over 4

=item *

Move the actual file update watching code into L<IO::Async::Loop>, possibly as
a new watch/unwatch method pair C<watch_file>.

=item *

Consider if a construction-time parameter of C<seek_to_end> or C<seek_to_last>
might be neater than a small code block in C<on_initial>, if that turns out to
be the only or most common form of use.

=back

=cut

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;