1 /** 2 * Copyright (c) 2007-2012, Timothy Stack 3 * 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are met: 8 * 9 * * Redistributions of source code must retain the above copyright notice, this 10 * list of conditions and the following disclaimer. 11 * * Redistributions in binary form must reproduce the above copyright notice, 12 * this list of conditions and the following disclaimer in the documentation 13 * and/or other materials provided with the distribution. 14 * * Neither the name of Timothy Stack nor the names of its contributors 15 * may be used to endorse or promote products derived from this software 16 * without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY 19 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 20 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 21 * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY 22 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 23 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 24 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 25 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 27 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 * 29 * @file line_buffer.hh 30 */ 31 32 #ifndef line_buffer_hh 33 #define line_buffer_hh 34 35 #include <errno.h> 36 #include <sys/types.h> 37 #include <unistd.h> 38 #include <zlib.h> 39 40 #include <exception> 41 #include <vector> 42 43 #include "base/lnav_log.hh" 44 #include "base/file_range.hh" 45 #include "base/result.h" 46 #include "auto_fd.hh" 47 #include "auto_mem.hh" 48 #include "shared_buffer.hh" 49 50 struct line_info { 51 file_range li_file_range; 52 bool li_partial{false}; 53 bool li_valid_utf{true}; 54 }; 55 56 /** 57 * Buffer for reading whole lines out of file descriptors. The class presents 58 * a stateless interface, callers specify the offset where a line starts and 59 * the class takes care of caching the surrounding range and locating the 60 * delimiter. 61 * 62 * XXX A bit of a wheel reinvention, but I'm not sure how well the libraries 63 * handle non-blocking I/O... 64 */ 65 class line_buffer { 66 public: 67 static const ssize_t DEFAULT_LINE_BUFFER_SIZE = 256 * 1024; 68 static const ssize_t MAX_LINE_BUFFER_SIZE = 4 * 4 * DEFAULT_LINE_BUFFER_SIZE; 69 class error 70 : public std::exception { 71 public: error(int err)72 error(int err) 73 : e_err(err) { }; 74 75 int e_err; 76 }; 77 78 #define GZ_WINSIZE 32768U /*> gzip's max supported dictionary is 15-bits */ 79 #define GZ_RAW_MODE (-15) /*> Raw inflate data mode */ 80 #define GZ_HEADER_MODE (15 + 32) /*> Automatic zstd or gzip decoding */ 81 #define GZ_BORROW_BITS_MASK 7 /*> Bits (0-7) consumed in previous block */ 82 #define GZ_END_OF_BLOCK_MASK 128 /*> Stopped because reached end-of-block */ 83 #define GZ_END_OF_FILE_MASK 64 /*> Stopped because reached end-of-file */ 84 85 /** 86 * A memoized gzip file reader that can do random file access faster than 87 * gzseek/gzread alone. 88 */ 89 class gz_indexed { 90 public: 91 gz_indexed(); 92 gz_indexed(gz_indexed &&other) = default; ~gz_indexed()93 ~gz_indexed() { 94 this->close(); 95 } 96 operator bool() const97 inline operator bool() const { 98 return this->gz_fd != -1; 99 } 100 get_source_offset()101 uLong get_source_offset() { 102 return !!*this ? this->strm.total_in + this->strm.avail_in : 0; 103 } 104 105 void close(); 106 void init_stream(); 107 void continue_stream(); 108 void open(int fd); 109 int stream_data(void * buf, size_t size); 110 void seek(off_t offset); 111 112 /** 113 * Decompress bytes from the gz file returning at most `size` bytes. 114 * offset is the byte-offset in the decompressed data stream. 115 */ 116 int read(void * buf, size_t offset, size_t size); 117 118 struct indexDict { 119 off_t in = 0; 120 off_t out = 0; 121 unsigned char bits = 0; 122 unsigned char in_bits = 0; 123 Bytef index[GZ_WINSIZE]; indexDictline_buffer::gz_indexed::indexDict124 indexDict(z_stream const & s, const file_size_t size) { 125 assert((s.data_type & GZ_END_OF_BLOCK_MASK)); 126 assert(!(s.data_type & GZ_END_OF_FILE_MASK)); 127 assert(size >= s.avail_out + GZ_WINSIZE); 128 this->bits = s.data_type & GZ_BORROW_BITS_MASK; 129 this->in = s.total_in; 130 this->out = s.total_out; 131 auto last_byte_in = s.next_in[-1]; 132 this->in_bits = last_byte_in >> (8 - this->bits); 133 // Copy the last 32k uncompressed data (sliding window) to our index 134 memcpy(this->index, s.next_out - GZ_WINSIZE, GZ_WINSIZE); 135 } 136 applyline_buffer::gz_indexed::indexDict137 int apply(z_streamp s) { 138 s->zalloc = Z_NULL; 139 s->zfree = Z_NULL; 140 s->opaque = Z_NULL; 141 s->avail_in = 0; 142 s->next_in = Z_NULL; 143 auto ret = inflateInit2(s, GZ_RAW_MODE); 144 if (ret != Z_OK) { 145 return ret; 146 } 147 if (this->bits) { 148 inflatePrime(s, this->bits, this->in_bits); 149 } 150 s->total_in = this->in; 151 s->total_out = this->out; 152 inflateSetDictionary(s, this->index, GZ_WINSIZE); 153 return ret; 154 } 155 }; 156 private: 157 z_stream strm; /*< gzip streams structure */ 158 std::vector<indexDict> syncpoints; /*< indexed dictionaries as discovered */ 159 auto_mem<Bytef> inbuf; /*< Compressed data buffer */ 160 int gz_fd = -1; /*< The file to read data from. */ 161 }; 162 163 /** Construct an empty line_buffer. */ 164 line_buffer(); 165 166 line_buffer(line_buffer &&other) = default; 167 168 virtual ~line_buffer(); 169 170 /** @param fd The file descriptor that data should be pulled from. */ 171 void set_fd(auto_fd &fd); 172 173 /** @return The file descriptor that data should be pulled from. */ get_fd() const174 int get_fd() const { return this->lb_fd; }; 175 get_file_time() const176 time_t get_file_time() const { return this->lb_file_time; }; 177 178 /** 179 * @return The size of the file or the amount of data pulled from a pipe. 180 */ get_file_size() const181 file_ssize_t get_file_size() const { return this->lb_file_size; }; 182 is_pipe() const183 bool is_pipe() const { 184 return !this->lb_seekable; 185 }; 186 is_pipe_closed() const187 bool is_pipe_closed() const { 188 return !this->lb_seekable && (this->lb_file_size != -1); 189 }; 190 is_compressed() const191 bool is_compressed() const { 192 return this->lb_gz_file || this->lb_bz_file; 193 }; 194 get_read_offset(file_off_t off) const195 file_off_t get_read_offset(file_off_t off) const 196 { 197 if (this->is_compressed()) { 198 return this->lb_compressed_offset; 199 } 200 else{ 201 return off; 202 } 203 }; 204 is_data_available(file_off_t off,file_off_t stat_size) const205 bool is_data_available(file_off_t off, file_off_t stat_size) const { 206 if (this->is_compressed()) { 207 return (this->lb_file_size == -1 || off < this->lb_file_size); 208 } 209 return off < stat_size; 210 }; 211 212 /** 213 * Attempt to load the next line into the buffer. 214 * 215 * @param prev_line The range of the previous line. 216 * @return If the read was successful, information about the line. 217 * Otherwise, an error message. 218 */ 219 Result<line_info, std::string> load_next_line(file_range prev_line = {}); 220 221 Result<shared_buffer_ref, std::string> read_range(file_range fr); 222 223 file_range get_available(); 224 clear()225 void clear() 226 { 227 this->lb_buffer_size = 0; 228 }; 229 230 /** Release any resources held by this object. */ reset()231 void reset() 232 { 233 this->lb_fd.reset(); 234 235 this->lb_file_offset = 0; 236 this->lb_file_size = (ssize_t)-1; 237 this->lb_buffer_size = 0; 238 this->lb_last_line_offset = -1; 239 }; 240 241 /** Check the invariants for this object. */ invariant()242 bool invariant() 243 { 244 require(this->lb_buffer != nullptr); 245 require(this->lb_buffer_size <= this->lb_buffer_max); 246 247 return true; 248 }; 249 250 private: 251 252 /** 253 * @param off The file offset to check for in the buffer. 254 * @return True if the given offset is cached in the buffer. 255 */ in_range(file_off_t off) const256 bool in_range(file_off_t off) const 257 { 258 return this->lb_file_offset <= off && 259 off < (this->lb_file_offset + this->lb_buffer_size); 260 }; 261 262 void resize_buffer(size_t new_max); 263 264 /** 265 * Ensure there is enough room in the buffer to cache a range of data from 266 * the file. First, this method will check to see if there is enough room 267 * from where 'start' begins in the buffer to the maximum buffer size. If 268 * this is not enough, the currently cached data at 'start' will be moved 269 * to the beginning of the buffer, overwriting any cached data earlier in 270 * the file. Finally, if this is still not enough, the buffer will be 271 * reallocated to make more room. 272 * 273 * @param start The file offset of the start of the line. 274 * @param max_length The amount of data to be cached in the buffer. 275 */ 276 void ensure_available(file_off_t start, ssize_t max_length); 277 278 /** 279 * Fill the buffer with the given range of data from the file. 280 * 281 * @param start The file offset where data should start to be read from the 282 * file. 283 * @param max_length The maximum amount of data to read from the file. 284 * @return True if any data was read from the file. 285 */ 286 bool fill_range(file_off_t start, ssize_t max_length); 287 288 /** 289 * After a successful fill, the cached data can be retrieved with this 290 * method. 291 * 292 * @param start The file offset to retrieve cached data for. 293 * @param avail_out On return, the amount of data currently cached at the 294 * given offset. 295 * @return A pointer to the start of the cached data in the internal 296 * buffer. 297 */ get_range(file_off_t start,file_ssize_t & avail_out) const298 char *get_range(file_off_t start, file_ssize_t &avail_out) const 299 { 300 auto buffer_offset = start - this->lb_file_offset; 301 char *retval; 302 303 require(buffer_offset >= 0); 304 require(this->lb_buffer_size >= buffer_offset); 305 306 retval = &this->lb_buffer[buffer_offset]; 307 avail_out = this->lb_buffer_size - buffer_offset; 308 309 return retval; 310 }; 311 312 shared_buffer lb_share_manager; 313 314 auto_fd lb_fd; /*< The file to read data from. */ 315 gz_indexed lb_gz_file; /*< File reader for gzipped files. */ 316 bool lb_bz_file; /*< Flag set for bzip2 compressed files. */ 317 file_off_t lb_compressed_offset; /*< The offset into the compressed file. */ 318 319 auto_mem<char> lb_buffer; /*< The internal buffer where data is cached */ 320 321 file_ssize_t lb_file_size; /*< 322 * The size of the file. When lb_fd refers to 323 * a pipe, this is set to the amount of data 324 * read from the pipe when EOF is reached. 325 */ 326 file_off_t lb_file_offset; /*< 327 * Data cached in the buffer comes from this 328 * offset in the file. 329 */ 330 time_t lb_file_time; 331 ssize_t lb_buffer_size; /*< The amount of cached data in the buffer. */ 332 ssize_t lb_buffer_max; /*< The amount of allocated memory for the 333 * buffer. */ 334 bool lb_seekable; /*< Flag set for seekable file descriptors. */ 335 file_off_t lb_last_line_offset; /*< */ 336 }; 337 #endif 338