1#  You may distribute under the terms of either the GNU General Public License
2#  or the Artistic License (the same terms as Perl itself)
3#
4#  (C) Paul Evans, 2011-2015 -- leonerd@leonerd.org.uk
5
6package IO::Async::FileStream;
7
8use strict;
9use warnings;
10
11our $VERSION = '0.800';
12
13use base qw( IO::Async::Stream );
14
15use IO::Async::File;
16
17use Carp;
18use Fcntl qw( SEEK_SET SEEK_CUR );
19
20=head1 NAME
21
22C<IO::Async::FileStream> - read the tail of a file
23
24=head1 SYNOPSIS
25
26   use IO::Async::FileStream;
27
28   use IO::Async::Loop;
29   my $loop = IO::Async::Loop->new;
30
31   open my $logh, "<", "var/logs/daemon.log" or
32      die "Cannot open logfile - $!";
33
34   my $filestream = IO::Async::FileStream->new(
35      read_handle => $logh,
36
37      on_initial => sub {
38         my ( $self ) = @_;
39         $self->seek_to_last( "\n" );
40      },
41
42      on_read => sub {
43         my ( $self, $buffref ) = @_;
44
45         while( $$buffref =~ s/^(.*\n)// ) {
46            print "Received a line $1";
47         }
48
49         return 0;
50      },
51   );
52
53   $loop->add( $filestream );
54
55   $loop->run;
56
57=head1 DESCRIPTION
58
59This subclass of L<IO::Async::Stream> allows reading the end of a regular file
60which is being appended to by some other process. It invokes the C<on_read>
61event when more data has been added to the file.
62
63This class provides an API identical to L<IO::Async::Stream> when given a
64C<read_handle>; it should be treated similarly. In particular, it can be given
65an C<on_read> handler, or subclassed to provide an C<on_read> method, or even
66used as the C<transport> for an L<IO::Async::Protocol::Stream> object.
67
68It will not support writing.
69
70To watch a file, directory, or other filesystem entity for updates of other
71properties, such as C<mtime>, see also L<IO::Async::File>.
72
73=cut
74
75=head1 EVENTS
76
77The following events are invoked, either using subclass methods or CODE
78references in parameters.
79
80Because this is a subclass of L<IO::Async::Stream> in read-only mode, all the
81events supported by C<Stream> relating to the read handle are supported here.
82This is not a full list; see also the documentation relating to
83L<IO::Async::Stream>.
84
85=head2 $ret = on_read \$buffer, $eof
86
87Invoked when more data is available in the internal receiving buffer.
88
89Note that C<$eof> only indicates that all the data currently available in the
90file has now been read; in contrast to a regular L<IO::Async::Stream>, this
91object will not stop watching after this condition. Instead, it will continue
92watching the file for updates.
93
94=head2 on_truncated
95
96Invoked when the file size shrinks. If this happens, it is presumed that the
97file content has been replaced. Reading will then commence from the start of
98the file.
99
100=head2 on_initial $size
101
102Invoked the first time the file is looked at. It is passed the initial size of
103the file. The code implementing this method can use the C<seek> or
104C<seek_to_last> methods to set the initial read position in the file to skip
105over some initial content.
106
107This method may be useful to skip initial content in the file, if the object
108should only respond to new content added after it was created.
109
110=cut
111
# Instance initialiser. Receives the hashref of constructor parameters,
# lets the parent class initialise first, then sets up the state specific
# to tailing a file and attaches the IO::Async::File child that reports
# size and device/inode changes back to this object.
sub _init
{
   my ( $self, $params ) = @_;

   $self->SUPER::_init( $params );

   # Always forced off: as documented for on_read, reaching EOF here only
   # means "caught up for now" and must not end the watch.
   $params->{close_on_read_eof} = 0;

   # Stays undefined until configure first sees a read handle; that is
   # how configure decides whether on_initial still has to be invoked.
   $self->{last_size} = undef;

   my $watcher = IO::Async::File->new(
      on_devino_changed => $self->_replace_weakself( 'on_devino_changed' ),
      on_size_changed   => $self->_replace_weakself( 'on_size_changed' ),
   );

   $self->{file} = $watcher;
   $self->add_child( $watcher );
}
128
129=head1 PARAMETERS
130
131The following named parameters may be passed to C<new> or C<configure>, in
132addition to the parameters relating to reading supported by
133L<IO::Async::Stream>.
134
135=head2 filename => STRING
136
137Optional. If supplied, watches the named file rather than the filehandle given
138in C<read_handle>. The file will be opened by the constructor, and then
139watched for renames. If the file is renamed, the new filename is opened and
140tracked similarly after closing the previous file.
141
142=head2 interval => NUM
143
144Optional. The interval in seconds to poll the filehandle using C<stat(2)>
145looking for size changes. A default of 2 seconds will be applied if not
146defined.
147
148=cut
149
# Applies named parameters to this object.
#
# Handles the parameters owned by this class (on_truncated, on_initial,
# interval, filename, handle/read_handle), rejects a defined write_handle
# with croak, and passes everything else up to the parent class. The first
# time a read handle becomes available the file size is recorded and the
# on_initial event is invoked with it.
sub configure
{
   my ( $self, %params ) = @_;

   # Event handlers that live directly on this object
   for my $event (qw( on_truncated on_initial )) {
      next unless exists $params{$event};
      $self->{$event} = delete $params{$event};
   }

   my $watcher = $self->{file};

   # Polling options are forwarded to the underlying file watcher
   for my $option (qw( interval )) {
      next unless exists $params{$option};
      $watcher->configure( $option => delete $params{$option} );
   }

   if( exists $params{filename} ) {
      $watcher->configure( filename => delete $params{filename} );
      $params{read_handle} = $watcher->handle;
   }
   elsif( exists $params{handle} or exists $params{read_handle} ) {
      # 'handle' takes precedence; 'read_handle' is only consulted when
      # 'handle' was absent or undefined
      my $handle = delete $params{handle};
      $handle = delete $params{read_handle} unless defined $handle;

      $watcher->configure( handle => $handle );
      $params{read_handle} = $watcher->handle;
   }

   if( defined $params{write_handle} ) {
      croak "Cannot have a write_handle in a ".ref($self);
   }

   $self->SUPER::configure( %params );

   if( $self->read_handle and !defined $self->{last_size} ) {
      # Element 7 of stat's return list is the size in bytes
      my $size = $self->{last_size} = (stat $self->read_handle)[7];

      # seek and seek_to_last check this flag, so they are only usable
      # for the duration of the on_initial callback
      local $self->{running_initial} = 1;
      $self->maybe_invoke_event( on_initial => $size );
   }
}
187
188=head1 METHODS
189
190=cut
191
# Replaces IO::Async::Handle's implementation: instead of watching the
# filehandle for read-readiness, "wanting reads" is translated into
# running or stopping the IO::Async::File watcher held in $self->{file}.
#
#   $want - true to make sure the watcher is running, false to stop it
sub _watch_read
{
   my ( $self, $want ) = @_;

   my $watcher = $self->{file};

   unless( $want ) {
      return $watcher->stop;
   }

   # Only start a watcher that is not already running
   if( !$watcher->is_running ) {
      $watcher->start;
   }
}
205
# Write-readiness watching is never available on this class. A request
# to stop watching is silently accepted; a request to start croaks.
#
#   $want - true if the caller wants write-readiness notifications
sub _watch_write
{
   my ( $self, $want ) = @_;

   $want and croak "Cannot _watch_write in " . ref($self);
}
213
# Callback from the file watcher when the device/inode of the watched
# path changes. Flags the stream as renamed and performs another read so
# the remainder of the old file is consumed; read_more acts on the flag
# once it has caught up.
#
# Does nothing if the invocant is false (the callback is registered via
# _replace_weakself, so the object may already be gone).
sub on_devino_changed
{
   my ( $self ) = @_;
   $self or return;

   $self->{renamed} = 1;

   $self->debug_printf( "read tail of old file" );
   $self->read_more;
}
222
# Callback from the file watcher when the size of the file changes.
#
#   $size - the new size of the file
#
# A size smaller than the last one seen is treated as truncation: the
# on_truncated event is invoked and the saved read position is reset to
# the start of the file. In every case the new size is recorded and
# another read is attempted.
#
# Does nothing if the invocant is false (the callback is registered via
# _replace_weakself, so the object may already be gone).
sub on_size_changed
{
   my ( $self, $size ) = @_;
   $self or return;

   if( $self->{last_size} > $size ) {
      $self->maybe_invoke_event( 'on_truncated' );
      $self->{last_pos} = 0;
   }

   $self->{last_size} = $size;

   $self->debug_printf( "read_more" );
   $self->read_more;
}
238
# Performs one read pass over the file and decides what happens next.
#
# The handle is first repositioned to the offset saved by the previous
# pass (if any), then on_read_ready is invoked to do the actual reading,
# and the resulting file position is saved in last_pos. Afterwards:
#
#  * if the position is still short of the last known size, another pass
#    is queued on the loop;
#  * otherwise, if a rename was flagged by on_devino_changed, the stream
#    switches over to the handle now held by the file watcher.
sub read_more
{
   my $self = shift;

   # Restore the position reached by the previous pass; skipped on the very
   # first pass so that a position chosen during on_initial is kept.
   sysseek( $self->read_handle, $self->{last_pos}, SEEK_SET ) if defined $self->{last_pos};

   $self->on_read_ready;

   # A zero-distance relative seek just reports the current position.
   $self->{last_pos} = sysseek( $self->read_handle, 0, SEEK_CUR ); # == systell

   if( $self->{last_pos} < $self->{last_size} ) {
      # Not caught up yet: queue another pass rather than looping here.
      # NOTE(review): the closure holds a strong reference to $self until
      # it runs - confirm this is acceptable if the object is removed from
      # the loop in the meantime.
      $self->loop->later( sub { $self->read_more } );
   }
   elsif( $self->{renamed} ) {
      # Caught up on the old file and a rename is pending
      $self->debug_printf( "reopening for rename" );

      $self->{last_size} = 0;

      # NOTE(review): perlfunc documents that sysseek returns the string
      # "0 but true" for position zero, which is true in boolean context,
      # so this branch may also be taken when nothing was read from the
      # old file - confirm whether that is intended.
      if( $self->{last_pos} ) {
         $self->maybe_invoke_event( on_truncated => );
         $self->{last_pos} = 0;
         $self->loop->later( sub { $self->read_more } );
      }

      # Presumably the watcher's handle now refers to the newly opened
      # file - verify against IO::Async::File. last_size is defined (0)
      # at this point, so configure will not invoke on_initial again.
      $self->configure( read_handle => $self->{file}->handle );
      undef $self->{renamed};
   }
}
267
# Writing is not supported by this class; any attempt only produces a
# warning (via carp) naming the class of the invocant.
sub write
{
   my ( $self ) = @_;

   carp "Cannot ->write from a ".ref($self);
}
272
273=head2 seek
274
275   $filestream->seek( $offset, $whence )
276
Callable only during the C<on_initial> event. Moves the read position in the
filehandle to the given offset. C<$whence> is interpreted as for C<sysseek>,
and should be one of C<SEEK_SET>, C<SEEK_CUR> or C<SEEK_END>. It defaults to
C<SEEK_SET> if not provided.
281
282Normally this would be used to seek to the end of the file, for example
283
284   on_initial => sub {
285      my ( $self, $filesize ) = @_;
286      $self->seek( $filesize );
287   }
288
289=cut
290
# Moves the read position of the underlying filehandle.
#
#   $offset - position to move to, interpreted according to $whence
#   $whence - one of the Fcntl SEEK_* constants; SEEK_SET when undefined
#
# Returns whatever sysseek returns. Croaks unless called while the
# on_initial event is being run.
sub seek
{
   my ( $self, $offset, $whence ) = @_;

   croak "Cannot ->seek except during on_initial" unless $self->{running_initial};

   $whence = SEEK_SET unless defined $whence;

   sysseek( $self->read_handle, $offset, $whence );
}
302
303=head2 seek_to_last
304
305   $success = $filestream->seek_to_last( $str_pattern, %opts )
306
307Callable only during the C<on_initial> event. Attempts to move the read
308position in the filehandle to just after the last occurrence of a given match.
309C<$str_pattern> may be a literal string or regexp pattern.
310
311Returns a true value if the seek was successful, or false if not. Takes the
312following named arguments:
313
314=over 8
315
316=item blocksize => INT
317
318Optional. Read the file in blocks of this size. Will take a default of 8KiB if
319not defined.
320
321=item horizon => INT
322
323Optional. Give up looking for a match after this number of bytes. Will take a
324default value of 4 times the blocksize if not defined.
325
326To force it to always search through the entire file contents, set this
327explicitly to C<0>.
328
329=back
330
Because regular file reading happens synchronously, this method operates
entirely synchronously. If the file is very large, it may take a while to read
back through the entire contents. While this is happening, no other events can
be invoked in the process.
335
336When looking for a string or regexp match, this method appends the
337previously-read buffer to each block read from the file, in case a match
338becomes split across two reads. If C<blocksize> is reduced to a very small
339value, take care to ensure it isn't so small that a match may not be noticed.
340
341This is most likely useful for seeking after the last complete line in a
342line-based log file, to commence reading from the end, while still managing to
343capture any partial content that isn't yet a complete line.
344
345   on_initial => sub {
346      my $self = shift;
347      $self->seek_to_last( "\n" );
348   }
349
350=cut
351
# Scans backwards from the last known end of the file for the final
# occurrence of a pattern, and positions the filehandle just after it.
#
#   $str_pattern - literal string (matched verbatim) or a qr// regexp
#   %opts:
#      blocksize - bytes to read per step; 8192 if false or absent
#      horizon   - give up after scanning back this many bytes; defaults
#                  to 4 * blocksize, and 0 means "search the whole file"
#
# Returns 1 after seeking to the end of the last match, or 0 after seeking
# to the horizon when no match was found. Croaks unless called while the
# on_initial event is being run.
sub seek_to_last
{
   my $self = shift;
   my ( $str_pattern, %opts ) = @_;

   $self->{running_initial} or croak "Cannot ->seek_to_last except during on_initial";

   my $offset = $self->{last_size};

   my $blocksize = $opts{blocksize} || 8192;

   # Lowest file offset the search is allowed to reach, clamped to the
   # start of the file
   defined $opts{horizon} or $opts{horizon} = $blocksize * 4;
   my $horizon = $opts{horizon} ? $offset - $opts{horizon} : 0;
   $horizon = 0 if $horizon < 0;

   my $re = ref $str_pattern ? $str_pattern : qr/\Q$str_pattern\E/;

   my $prev = "";
   while( $offset > $horizon ) {
      # The block nearest the start of the file may be shorter than a
      # full blocksize
      my $len = $blocksize;
      $len = $offset if $len > $offset;
      $offset -= $len;

      # Read exactly the $len bytes of this block. Asking for a full
      # $blocksize here would make a short block overlap the data already
      # held in $prev, so that $buffer . $prev would no longer be a
      # contiguous piece of the file and could yield a match that does
      # not exist in it.
      sysseek( $self->read_handle, $offset, SEEK_SET );
      sysread( $self->read_handle, my $buffer, $len );
      $buffer = "" unless defined $buffer; # failed read; treat as no data

      # The previously-read (later) block is appended so that a match split
      # across the block boundary is still seen.
      # TODO: If $str_pattern is a plain string this could be more efficient
      # using rindex
      if( () = ( $buffer . $prev ) =~ m/$re/sg ) {
         # $+[0] will be end of last match
         my $pos = $offset + $+[0];
         $self->seek( $pos );
         return 1;
      }

      $prev = $buffer;
   }

   $self->seek( $horizon );
   return 0;
}
393
394=head1 TODO
395
396=over 4
397
398=item *
399
400Move the actual file update watching code into L<IO::Async::Loop>, possibly as
401a new watch/unwatch method pair C<watch_file>.
402
403=item *
404
405Consider if a construction-time parameter of C<seek_to_end> or C<seek_to_last>
406might be neater than a small code block in C<on_initial>, if that turns out to
407be the only or most common form of use.
408
409=back
410
411=cut
412
413=head1 AUTHOR
414
415Paul Evans <leonerd@leonerd.org.uk>
416
417=cut
418
4190x55AA;
420