1 use core::ops::Range;
2 use std::boxed::Box;
3 use std::cell::RefCell;
4 use std::collections::hash_map::Entry;
5 use std::collections::HashMap;
6 use std::convert::TryInto;
7 use std::io::{Read, Seek, SeekFrom};
8 use std::mem;
9 use std::vec::Vec;
10 
11 use crate::read::ReadRef;
12 
/// An implementation of `ReadRef` for data in a stream that implements
/// `Read + Seek`.
///
/// Contains a cache of read-only blocks of data, allowing references to
/// them to be returned. Entries in the cache are never removed.
/// Entries are keyed on the offset and size of the read.
/// Currently overlapping reads are considered separate reads.
#[derive(Debug)]
pub struct ReadCache<R: Read + Seek> {
    // Interior mutability: `ReadRef` methods take `self` by shared
    // reference, but reading from the stream and filling the cache
    // both require mutation.
    cache: RefCell<ReadCacheInternal<R>>,
}
24 
/// The mutable state behind `ReadCache`'s `RefCell`.
#[derive(Debug)]
struct ReadCacheInternal<R: Read + Seek> {
    // The underlying seekable stream.
    read: R,
    // Fixed-size reads, keyed on (offset, size).
    bufs: HashMap<(u64, u64), Box<[u8]>>,
    // Delimiter-terminated reads, keyed on (offset, delimiter).
    strings: HashMap<(u64, u8), Box<[u8]>>,
}
31 
32 impl<R: Read + Seek> ReadCache<R> {
33     /// Create an empty `ReadCache` for the given stream.
new(read: R) -> Self34     pub fn new(read: R) -> Self {
35         ReadCache {
36             cache: RefCell::new(ReadCacheInternal {
37                 read,
38                 bufs: HashMap::new(),
39                 strings: HashMap::new(),
40             }),
41         }
42     }
43 
44     /// Return an implementation of `ReadRef` that restricts reads
45     /// to the given range of the stream.
range(&self, offset: u64, size: u64) -> ReadCacheRange<'_, R>46     pub fn range(&self, offset: u64, size: u64) -> ReadCacheRange<'_, R> {
47         ReadCacheRange {
48             r: self,
49             offset,
50             size,
51         }
52     }
53 
54     /// Free buffers used by the cache.
clear(&mut self)55     pub fn clear(&mut self) {
56         self.cache.borrow_mut().bufs.clear();
57     }
58 
59     /// Unwrap this `ReadCache<R>`, returning the underlying reader.
into_inner(self) -> R60     pub fn into_inner(self) -> R {
61         self.cache.into_inner().read
62     }
63 }
64 
impl<'a, R: Read + Seek> ReadRef<'a> for &'a ReadCache<R> {
    /// Return the total length of the stream by seeking to its end.
    ///
    /// This moves the underlying stream position, which is harmless:
    /// every read below seeks to an absolute offset first.
    fn len(self) -> Result<u64, ()> {
        let cache = &mut *self.cache.borrow_mut();
        cache.read.seek(SeekFrom::End(0)).map_err(|_| ())
    }

    /// Read `size` bytes at `offset`, caching the result.
    ///
    /// Reads are keyed on `(offset, size)`, so overlapping reads are
    /// cached separately (see the `ReadCache` docs).
    fn read_bytes_at(self, offset: u64, size: u64) -> Result<&'a [u8], ()> {
        if size == 0 {
            return Ok(&[]);
        }
        let cache = &mut *self.cache.borrow_mut();
        let buf = match cache.bufs.entry((offset, size)) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                // Fails if `size` does not fit in `usize` (32-bit targets).
                let size = size.try_into().map_err(|_| ())?;
                cache
                    .read
                    .seek(SeekFrom::Start(offset as u64))
                    .map_err(|_| ())?;
                let mut bytes = vec![0; size].into_boxed_slice();
                cache.read.read_exact(&mut bytes).map_err(|_| ())?;
                entry.insert(bytes)
            }
        };
        // SAFETY: extend the lifetime from the `RefCell` borrow to `'a`.
        // This is sound because entries are boxed slices whose heap
        // allocation is stable, entries are never removed or resized while
        // shared borrows exist, and `ReadCache::clear` requires
        // `&mut ReadCache`, which cannot coexist with the `'a` shared
        // borrow held by `self`.
        Ok(unsafe { mem::transmute::<&[u8], &[u8]>(buf) })
    }

    /// Read bytes starting at `range.start` up to (not including) the
    /// first occurrence of `delimiter`, caching the result.
    ///
    /// Returns `Err` if the delimiter is not found within the range
    /// (or within the 4096-byte scan limit), or on any I/O error.
    ///
    /// NOTE(review): cache hits are keyed only on `(range.start,
    /// delimiter)`, so a hit may return a string that was discovered
    /// under a larger `range.end` than the current call's — callers such
    /// as `ReadCacheRange` re-validate the returned length. Verify this
    /// is acceptable for direct users of `ReadCache`.
    fn read_bytes_at_until(self, range: Range<u64>, delimiter: u8) -> Result<&'a [u8], ()> {
        let cache = &mut *self.cache.borrow_mut();
        let buf = match cache.strings.entry((range.start, delimiter)) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                cache
                    .read
                    .seek(SeekFrom::Start(range.start))
                    .map_err(|_| ())?;

                let max_check: usize = (range.end - range.start).try_into().map_err(|_| ())?;
                // Strings should be relatively small.
                // TODO: make this configurable?
                let max_check = max_check.min(4096);

                let mut bytes = Vec::new();
                let mut checked = 0;
                loop {
                    // Grow the buffer in 256-byte steps, clamped to the
                    // scan limit, and scan only the newly read portion.
                    bytes.resize((checked + 256).min(max_check), 0);
                    let read = cache.read.read(&mut bytes[checked..]).map_err(|_| ())?;
                    if read == 0 {
                        // End of stream before the delimiter was found.
                        return Err(());
                    }
                    if let Some(len) = memchr::memchr(delimiter, &bytes[checked..][..read]) {
                        // Trim to the delimiter (exclusive) and cache.
                        bytes.truncate(checked + len);
                        break entry.insert(bytes.into_boxed_slice());
                    }
                    checked += read;
                    if checked >= max_check {
                        // Scan limit reached without finding the delimiter.
                        return Err(());
                    }
                }
            }
        };
        // SAFETY: extend the lifetime from the `RefCell` borrow to `'a`.
        // Sound for the same reasons as in `read_bytes_at` above: stable
        // boxed allocations, and removal requires `&mut ReadCache`.
        Ok(unsafe { mem::transmute::<&[u8], &[u8]>(buf) })
    }
}
133 
/// An implementation of `ReadRef` for a range of data in a stream that
/// implements `Read + Seek`.
///
/// Shares an underlying `ReadCache` with a lifetime of `'a`.
#[derive(Debug)]
pub struct ReadCacheRange<'a, R: Read + Seek> {
    // The shared cache that performs and memoizes the actual reads.
    r: &'a ReadCache<R>,
    // Absolute stream offset where this range begins.
    offset: u64,
    // Length of the range in bytes.
    size: u64,
}
144 
145 impl<'a, R: Read + Seek> Clone for ReadCacheRange<'a, R> {
clone(&self) -> Self146     fn clone(&self) -> Self {
147         Self {
148             r: self.r,
149             offset: self.offset,
150             size: self.size,
151         }
152     }
153 }
154 
155 impl<'a, R: Read + Seek> Copy for ReadCacheRange<'a, R> {}
156 
157 impl<'a, R: Read + Seek> ReadRef<'a> for ReadCacheRange<'a, R> {
len(self) -> Result<u64, ()>158     fn len(self) -> Result<u64, ()> {
159         Ok(self.size)
160     }
161 
read_bytes_at(self, offset: u64, size: u64) -> Result<&'a [u8], ()>162     fn read_bytes_at(self, offset: u64, size: u64) -> Result<&'a [u8], ()> {
163         if size == 0 {
164             return Ok(&[]);
165         }
166         let end = offset.checked_add(size).ok_or(())?;
167         if end > self.size {
168             return Err(());
169         }
170         let r_offset = self.offset.checked_add(offset).ok_or(())?;
171         self.r.read_bytes_at(r_offset, size)
172     }
173 
read_bytes_at_until(self, range: Range<u64>, delimiter: u8) -> Result<&'a [u8], ()>174     fn read_bytes_at_until(self, range: Range<u64>, delimiter: u8) -> Result<&'a [u8], ()> {
175         let r_start = self.offset.checked_add(range.start).ok_or(())?;
176         let r_end = self.offset.checked_add(range.end).ok_or(())?;
177         let bytes = self.r.read_bytes_at_until(r_start..r_end, delimiter)?;
178         let size = bytes.len().try_into().map_err(|_| ())?;
179         let end = range.start.checked_add(size).ok_or(())?;
180         if end > self.size {
181             return Err(());
182         }
183         Ok(bytes)
184     }
185 }
186