1 /**
2  * Copyright (c) 2007-2012, Timothy Stack
3  *
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  * * Redistributions of source code must retain the above copyright notice, this
10  * list of conditions and the following disclaimer.
11  * * Redistributions in binary form must reproduce the above copyright notice,
12  * this list of conditions and the following disclaimer in the documentation
13  * and/or other materials provided with the distribution.
14  * * Neither the name of Timothy Stack nor the names of its contributors
15  * may be used to endorse or promote products derived from this software
16  * without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
19  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21  * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
22  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
25  * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  *
29  * @file line_buffer.hh
30  */
31 
32 #ifndef line_buffer_hh
33 #define line_buffer_hh
34 
35 #include <errno.h>
36 #include <sys/types.h>
37 #include <unistd.h>
38 #include <zlib.h>
39 
40 #include <exception>
41 #include <vector>
42 
43 #include "base/lnav_log.hh"
44 #include "base/file_range.hh"
45 #include "base/result.h"
46 #include "auto_fd.hh"
47 #include "auto_mem.hh"
48 #include "shared_buffer.hh"
49 
50 struct line_info {
51     file_range li_file_range;
52     bool li_partial{false};
53     bool li_valid_utf{true};
54 };
55 
56 /**
57  * Buffer for reading whole lines out of file descriptors.  The class presents
58  * a stateless interface, callers specify the offset where a line starts and
59  * the class takes care of caching the surrounding range and locating the
60  * delimiter.
61  *
62  * XXX A bit of a wheel reinvention, but I'm not sure how well the libraries
63  * handle non-blocking I/O...
64  */
65 class line_buffer {
66 public:
67     static const ssize_t DEFAULT_LINE_BUFFER_SIZE   = 256 * 1024;
68     static const ssize_t MAX_LINE_BUFFER_SIZE       = 4 * 4 * DEFAULT_LINE_BUFFER_SIZE;
69     class error
70         : public std::exception {
71 public:
error(int err)72         error(int err)
73             : e_err(err) { };
74 
75         int e_err;
76     };
77 
78 #define GZ_WINSIZE 32768U           /*> gzip's max supported dictionary is 15-bits */
79 #define GZ_RAW_MODE           (-15) /*> Raw inflate data mode */
80 #define GZ_HEADER_MODE    (15 + 32) /*> Automatic zstd or gzip decoding */
81 #define GZ_BORROW_BITS_MASK       7 /*> Bits (0-7) consumed in previous block */
82 #define GZ_END_OF_BLOCK_MASK    128 /*> Stopped because reached end-of-block */
83 #define GZ_END_OF_FILE_MASK      64 /*> Stopped because reached end-of-file */
84 
85     /**
86      * A memoized gzip file reader that can do random file access faster than
87      * gzseek/gzread alone.
88      */
89     class gz_indexed {
90         public:
91         gz_indexed();
92         gz_indexed(gz_indexed &&other) = default;
~gz_indexed()93         ~gz_indexed() {
94             this->close();
95         }
96 
operator bool() const97         inline operator bool() const {
98             return this->gz_fd != -1;
99         }
100 
get_source_offset()101         uLong get_source_offset() {
102             return !!*this ? this->strm.total_in + this->strm.avail_in : 0;
103         }
104 
105         void close();
106         void init_stream();
107         void continue_stream();
108         void open(int fd);
109         int stream_data(void * buf, size_t size);
110         void seek(off_t offset);
111 
112         /**
113          * Decompress bytes from the gz file returning at most `size` bytes.
114          * offset is the byte-offset in the decompressed data stream.
115          */
116         int read(void * buf, size_t offset, size_t size);
117 
118         struct indexDict {
119             off_t in = 0;
120             off_t out = 0;
121             unsigned char bits = 0;
122             unsigned char in_bits = 0;
123             Bytef index[GZ_WINSIZE];
indexDictline_buffer::gz_indexed::indexDict124             indexDict(z_stream const & s, const file_size_t size) {
125                 assert((s.data_type & GZ_END_OF_BLOCK_MASK));
126                 assert(!(s.data_type & GZ_END_OF_FILE_MASK));
127                 assert(size >= s.avail_out + GZ_WINSIZE);
128                 this->bits = s.data_type & GZ_BORROW_BITS_MASK;
129                 this->in = s.total_in;
130                 this->out = s.total_out;
131                 auto last_byte_in = s.next_in[-1];
132                 this->in_bits = last_byte_in >> (8 - this->bits);
133                 // Copy the last 32k uncompressed data (sliding window) to our index
134                 memcpy(this->index, s.next_out - GZ_WINSIZE, GZ_WINSIZE);
135             }
136 
applyline_buffer::gz_indexed::indexDict137             int apply(z_streamp s) {
138                 s->zalloc = Z_NULL;
139                 s->zfree = Z_NULL;
140                 s->opaque = Z_NULL;
141                 s->avail_in = 0;
142                 s->next_in = Z_NULL;
143                 auto ret = inflateInit2(s, GZ_RAW_MODE);
144                 if (ret != Z_OK) {
145                     return ret;
146                 }
147                 if (this->bits) {
148                     inflatePrime(s, this->bits, this->in_bits);
149                 }
150                 s->total_in = this->in;
151                 s->total_out = this->out;
152                 inflateSetDictionary(s, this->index, GZ_WINSIZE);
153                 return ret;
154             }
155         };
156     private:
157         z_stream                strm;               /*< gzip streams structure */
158         std::vector<indexDict>  syncpoints;         /*< indexed dictionaries as discovered */
159         auto_mem<Bytef>         inbuf;              /*< Compressed data buffer */
160         int gz_fd = -1;                             /*< The file to read data from. */
161     };
162 
163     /** Construct an empty line_buffer. */
164     line_buffer();
165 
166     line_buffer(line_buffer &&other) = default;
167 
168     virtual ~line_buffer();
169 
170     /** @param fd The file descriptor that data should be pulled from. */
171     void set_fd(auto_fd &fd);
172 
173     /** @return The file descriptor that data should be pulled from. */
get_fd() const174     int get_fd() const { return this->lb_fd; };
175 
get_file_time() const176     time_t get_file_time() const { return this->lb_file_time; };
177 
178     /**
179      * @return The size of the file or the amount of data pulled from a pipe.
180      */
get_file_size() const181     file_ssize_t get_file_size() const { return this->lb_file_size; };
182 
is_pipe() const183     bool is_pipe() const {
184         return !this->lb_seekable;
185     };
186 
is_pipe_closed() const187     bool is_pipe_closed() const {
188         return !this->lb_seekable && (this->lb_file_size != -1);
189     };
190 
is_compressed() const191     bool is_compressed() const {
192         return this->lb_gz_file || this->lb_bz_file;
193     };
194 
get_read_offset(file_off_t off) const195     file_off_t get_read_offset(file_off_t off) const
196     {
197         if (this->is_compressed()) {
198             return this->lb_compressed_offset;
199         }
200         else{
201             return off;
202         }
203     };
204 
is_data_available(file_off_t off,file_off_t stat_size) const205     bool is_data_available(file_off_t off, file_off_t stat_size) const {
206         if (this->is_compressed()) {
207             return (this->lb_file_size == -1 || off < this->lb_file_size);
208         }
209         return off < stat_size;
210     };
211 
212     /**
213      * Attempt to load the next line into the buffer.
214      *
215      * @param prev_line The range of the previous line.
216      * @return If the read was successful, information about the line.
217      *   Otherwise, an error message.
218      */
219     Result<line_info, std::string> load_next_line(file_range prev_line = {});
220 
221     Result<shared_buffer_ref, std::string> read_range(file_range fr);
222 
223     file_range get_available();
224 
clear()225     void clear()
226     {
227         this->lb_buffer_size  = 0;
228     };
229 
230     /** Release any resources held by this object. */
reset()231     void reset()
232     {
233         this->lb_fd.reset();
234 
235         this->lb_file_offset      = 0;
236         this->lb_file_size        = (ssize_t)-1;
237         this->lb_buffer_size      = 0;
238         this->lb_last_line_offset = -1;
239     };
240 
241     /** Check the invariants for this object. */
invariant()242     bool invariant()
243     {
244         require(this->lb_buffer != nullptr);
245         require(this->lb_buffer_size <= this->lb_buffer_max);
246 
247         return true;
248     };
249 
250 private:
251 
252     /**
253      * @param off The file offset to check for in the buffer.
254      * @return True if the given offset is cached in the buffer.
255      */
in_range(file_off_t off) const256     bool in_range(file_off_t off) const
257     {
258         return this->lb_file_offset <= off &&
259                off < (this->lb_file_offset + this->lb_buffer_size);
260     };
261 
262     void resize_buffer(size_t new_max);
263 
264     /**
265      * Ensure there is enough room in the buffer to cache a range of data from
266      * the file.  First, this method will check to see if there is enough room
267      * from where 'start' begins in the buffer to the maximum buffer size.  If
268      * this is not enough, the currently cached data at 'start' will be moved
269      * to the beginning of the buffer, overwriting any cached data earlier in
270      * the file.  Finally, if this is still not enough, the buffer will be
271      * reallocated to make more room.
272      *
273      * @param start The file offset of the start of the line.
274      * @param max_length The amount of data to be cached in the buffer.
275      */
276     void ensure_available(file_off_t start, ssize_t max_length);
277 
278     /**
279      * Fill the buffer with the given range of data from the file.
280      *
281      * @param start The file offset where data should start to be read from the
282      * file.
283      * @param max_length The maximum amount of data to read from the file.
284      * @return True if any data was read from the file.
285      */
286     bool fill_range(file_off_t start, ssize_t max_length);
287 
288     /**
289      * After a successful fill, the cached data can be retrieved with this
290      * method.
291      *
292      * @param start The file offset to retrieve cached data for.
293      * @param avail_out On return, the amount of data currently cached at the
294      * given offset.
295      * @return A pointer to the start of the cached data in the internal
296      * buffer.
297      */
get_range(file_off_t start,file_ssize_t & avail_out) const298     char *get_range(file_off_t start, file_ssize_t &avail_out) const
299     {
300         auto buffer_offset = start - this->lb_file_offset;
301         char *retval;
302 
303         require(buffer_offset >= 0);
304         require(this->lb_buffer_size >= buffer_offset);
305 
306         retval    = &this->lb_buffer[buffer_offset];
307         avail_out = this->lb_buffer_size - buffer_offset;
308 
309         return retval;
310     };
311 
312     shared_buffer lb_share_manager;
313 
314     auto_fd lb_fd;              /*< The file to read data from. */
315     gz_indexed  lb_gz_file;     /*< File reader for gzipped files. */
316     bool    lb_bz_file;         /*< Flag set for bzip2 compressed files. */
317     file_off_t   lb_compressed_offset; /*< The offset into the compressed file. */
318 
319     auto_mem<char> lb_buffer;   /*< The internal buffer where data is cached */
320 
321     file_ssize_t lb_file_size;  /*<
322                                  * The size of the file.  When lb_fd refers to
323                                  * a pipe, this is set to the amount of data
324                                  * read from the pipe when EOF is reached.
325                                  */
326     file_off_t lb_file_offset;  /*<
327                                  * Data cached in the buffer comes from this
328                                  * offset in the file.
329                                  */
330     time_t lb_file_time;
331     ssize_t lb_buffer_size;     /*< The amount of cached data in the buffer. */
332     ssize_t lb_buffer_max;      /*< The amount of allocated memory for the
333                                  *  buffer. */
334     bool   lb_seekable;         /*< Flag set for seekable file descriptors. */
335     file_off_t  lb_last_line_offset; /*< */
336 };
337 #endif
338