use core::ops::Range;
use std::boxed::Box;
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::convert::TryInto;
use std::io::{Read, Seek, SeekFrom};
use std::mem;
use std::vec::Vec;

use crate::read::ReadRef;

/// An implementation of `ReadRef` for data in a stream that implements
/// `Read + Seek`.
///
/// Contains a cache of read-only blocks of data, allowing references to
/// them to be returned. Entries in the cache are never removed.
/// Entries are keyed on the offset and size of the read.
/// Currently, overlapping reads are treated as separate reads.
#[derive(Debug)]
pub struct ReadCache<R: Read + Seek> {
    cache: RefCell<ReadCacheInternal<R>>,
}

#[derive(Debug)]
struct ReadCacheInternal<R: Read + Seek> {
    read: R,
    bufs: HashMap<(u64, u64), Box<[u8]>>,
    strings: HashMap<(u64, u8), Box<[u8]>>,
}

impl<R: Read + Seek> ReadCache<R> {
    /// Create an empty `ReadCache` for the given stream.
    pub fn new(read: R) -> Self {
        ReadCache {
            cache: RefCell::new(ReadCacheInternal {
                read,
                bufs: HashMap::new(),
                strings: HashMap::new(),
            }),
        }
    }

    /// Return an implementation of `ReadRef` that restricts reads
    /// to the given range of the stream.
    pub fn range(&self, offset: u64, size: u64) -> ReadCacheRange<'_, R> {
        ReadCacheRange {
            r: self,
            offset,
            size,
        }
    }

    /// Free buffers used by the cache.
    pub fn clear(&mut self) {
        // Clear both maps, since both hold cached buffers.
        let cache = &mut *self.cache.borrow_mut();
        cache.bufs.clear();
        cache.strings.clear();
    }

    /// Unwrap this `ReadCache<R>`, returning the underlying reader.
    pub fn into_inner(self) -> R {
        self.cache.into_inner().read
    }
}
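// A minimal usage sketch, added for illustration and not part of the
// original file. It exercises `ReadCache` through the `ReadRef` impl below;
// `std::io::Cursor` stands in for any `Read + Seek` stream such as a `File`,
// and the byte values are arbitrary test data.
#[cfg(test)]
mod read_cache_example {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn cached_reads() {
        let cache = ReadCache::new(Cursor::new(b"hello\0world".to_vec()));
        // `len` reports the stream length by seeking to the end.
        assert_eq!((&cache).len(), Ok(11));
        // The first read seeks and fills a cache entry; repeating the read
        // with the same (offset, size) key is served from the cache.
        assert_eq!((&cache).read_bytes_at(0, 5), Ok(&b"hello"[..]));
        assert_eq!((&cache).read_bytes_at(0, 5), Ok(&b"hello"[..]));
        // `read_bytes_at_until` scans up to the delimiter, excluding it.
        assert_eq!((&cache).read_bytes_at_until(0..11, 0), Ok(&b"hello"[..]));
    }
}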
impl<'a, R: Read + Seek> ReadRef<'a> for &'a ReadCache<R> {
    fn len(self) -> Result<u64, ()> {
        let cache = &mut *self.cache.borrow_mut();
        cache.read.seek(SeekFrom::End(0)).map_err(|_| ())
    }

    fn read_bytes_at(self, offset: u64, size: u64) -> Result<&'a [u8], ()> {
        if size == 0 {
            return Ok(&[]);
        }
        let cache = &mut *self.cache.borrow_mut();
        let buf = match cache.bufs.entry((offset, size)) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                let size = size.try_into().map_err(|_| ())?;
                cache.read.seek(SeekFrom::Start(offset)).map_err(|_| ())?;
                let mut bytes = vec![0; size].into_boxed_slice();
                cache.read.read_exact(&mut bytes).map_err(|_| ())?;
                entry.insert(bytes)
            }
        };
        // Extend the lifetime to that of self.
        // This is OK because we never mutate or remove entries.
        Ok(unsafe { mem::transmute::<&[u8], &[u8]>(buf) })
    }

    fn read_bytes_at_until(self, range: Range<u64>, delimiter: u8) -> Result<&'a [u8], ()> {
        let cache = &mut *self.cache.borrow_mut();
        // Note: entries are keyed on the start offset and delimiter only;
        // `range.end` only limits how far a fresh read may scan.
        let buf = match cache.strings.entry((range.start, delimiter)) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                cache
                    .read
                    .seek(SeekFrom::Start(range.start))
                    .map_err(|_| ())?;

                // Use `checked_sub` so that an inverted range fails cleanly
                // instead of panicking in debug builds.
                let max_check: usize = range
                    .end
                    .checked_sub(range.start)
                    .ok_or(())?
                    .try_into()
                    .map_err(|_| ())?;
                // Strings should be relatively small.
                // TODO: make this configurable?
                let max_check = max_check.min(4096);

                let mut bytes = Vec::new();
                let mut checked = 0;
                loop {
                    // Read in chunks of up to 256 bytes, stopping at the
                    // delimiter or after `max_check` bytes.
                    bytes.resize((checked + 256).min(max_check), 0);
                    let read = cache.read.read(&mut bytes[checked..]).map_err(|_| ())?;
                    if read == 0 {
                        return Err(());
                    }
                    if let Some(len) = memchr::memchr(delimiter, &bytes[checked..][..read]) {
                        bytes.truncate(checked + len);
                        break entry.insert(bytes.into_boxed_slice());
                    }
                    checked += read;
                    if checked >= max_check {
                        return Err(());
                    }
                }
            }
        };
        // Extend the lifetime to that of self.
        // This is OK because we never mutate or remove entries.
        Ok(unsafe { mem::transmute::<&[u8], &[u8]>(buf) })
    }
}

/// An implementation of `ReadRef` for a range of data in a stream that
/// implements `Read + Seek`.
///
/// Shares an underlying `ReadCache` with a lifetime of `'a`.
#[derive(Debug)]
pub struct ReadCacheRange<'a, R: Read + Seek> {
    r: &'a ReadCache<R>,
    offset: u64,
    size: u64,
}

impl<'a, R: Read + Seek> Clone for ReadCacheRange<'a, R> {
    fn clone(&self) -> Self {
        Self {
            r: self.r,
            offset: self.offset,
            size: self.size,
        }
    }
}

impl<'a, R: Read + Seek> Copy for ReadCacheRange<'a, R> {}

impl<'a, R: Read + Seek> ReadRef<'a> for ReadCacheRange<'a, R> {
    fn len(self) -> Result<u64, ()> {
        Ok(self.size)
    }

    fn read_bytes_at(self, offset: u64, size: u64) -> Result<&'a [u8], ()> {
        if size == 0 {
            return Ok(&[]);
        }
        // Bounds check against the range before delegating to the cache.
        let end = offset.checked_add(size).ok_or(())?;
        if end > self.size {
            return Err(());
        }
        let r_offset = self.offset.checked_add(offset).ok_or(())?;
        self.r.read_bytes_at(r_offset, size)
    }

    fn read_bytes_at_until(self, range: Range<u64>, delimiter: u8) -> Result<&'a [u8], ()> {
        let r_start = self.offset.checked_add(range.start).ok_or(())?;
        let r_end = self.offset.checked_add(range.end).ok_or(())?;
        let bytes = self.r.read_bytes_at_until(r_start..r_end, delimiter)?;
        // The cache may return a result found by an earlier read with a
        // different range end, so re-check that it fits within this range.
        let size = bytes.len().try_into().map_err(|_| ())?;
        let end = range.start.checked_add(size).ok_or(())?;
        if end > self.size {
            return Err(());
        }
        Ok(bytes)
    }
}
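// A companion sketch, also illustrative and not part of the original file.
// `ReadCacheRange` offsets are relative to the range itself, so reading at
// offset 0 of a range starting at stream offset 6 touches stream bytes 6..11,
// and reads that would run past the end of the range are rejected.
#[cfg(test)]
mod read_cache_range_example {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn range_reads() {
        let cache = ReadCache::new(Cursor::new(b"hello\0world".to_vec()));
        let range = cache.range(6, 5);
        assert_eq!(range.len(), Ok(5));
        assert_eq!(range.read_bytes_at(0, 5), Ok(&b"world"[..]));
        // Out-of-range reads fail rather than reading past the range.
        assert_eq!(range.read_bytes_at(3, 5), Err(()));
    }
}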