//! Reader-based bzip2 compression and decompression streams.
2 
3 use std::io::prelude::*;
4 use std::io::{self, BufReader};
5 
6 #[cfg(feature = "tokio")]
7 use futures::Poll;
8 #[cfg(feature = "tokio")]
9 use tokio_io::{AsyncRead, AsyncWrite};
10 
11 use bufread;
12 use Compression;
13 
14 /// A compression stream which wraps an uncompressed stream of data. Compressed
15 /// data will be read from the stream.
16 pub struct BzEncoder<R> {
17     inner: bufread::BzEncoder<BufReader<R>>,
18 }
19 
20 /// A decompression stream which wraps a compressed stream of data. Decompressed
21 /// data will be read from the stream.
22 pub struct BzDecoder<R> {
23     inner: bufread::BzDecoder<BufReader<R>>,
24 }
25 
26 impl<R: Read> BzEncoder<R> {
27     /// Create a new compression stream which will compress at the given level
28     /// to read compress output to the give output stream.
new(r: R, level: Compression) -> BzEncoder<R>29     pub fn new(r: R, level: Compression) -> BzEncoder<R> {
30         BzEncoder {
31             inner: bufread::BzEncoder::new(BufReader::new(r), level),
32         }
33     }
34 
35     /// Acquires a reference to the underlying stream
get_ref(&self) -> &R36     pub fn get_ref(&self) -> &R {
37         self.inner.get_ref().get_ref()
38     }
39 
40     /// Acquires a mutable reference to the underlying stream
41     ///
42     /// Note that mutation of the stream may result in surprising results if
43     /// this encoder is continued to be used.
get_mut(&mut self) -> &mut R44     pub fn get_mut(&mut self) -> &mut R {
45         self.inner.get_mut().get_mut()
46     }
47 
48     /// Unwrap the underlying writer, finishing the compression stream.
into_inner(self) -> R49     pub fn into_inner(self) -> R {
50         self.inner.into_inner().into_inner()
51     }
52 
53     /// Returns the number of bytes produced by the compressor
54     /// (e.g. the number of bytes read from this stream)
55     ///
56     /// Note that, due to buffering, this only bears any relation to
57     /// total_in() when the compressor chooses to flush its data
58     /// (unfortunately, this won't happen in general
59     /// at the end of the stream, because the compressor doesn't know
60     /// if there's more data to come).  At that point,
61     /// `total_out() / total_in()` would be the compression ratio.
total_out(&self) -> u6462     pub fn total_out(&self) -> u64 {
63         self.inner.total_out()
64     }
65 
66     /// Returns the number of bytes consumed by the compressor
67     /// (e.g. the number of bytes read from the underlying stream)
total_in(&self) -> u6468     pub fn total_in(&self) -> u64 {
69         self.inner.total_in()
70     }
71 }
72 
73 impl<R: Read> Read for BzEncoder<R> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>74     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
75         self.inner.read(buf)
76     }
77 }
78 
#[cfg(feature = "tokio")]
// Relies entirely on the trait's default methods plus the `Read` impl above.
impl<R> AsyncRead for BzEncoder<R> where R: AsyncRead {}
81 
82 impl<W: Write + Read> Write for BzEncoder<W> {
write(&mut self, buf: &[u8]) -> io::Result<usize>83     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
84         self.get_mut().write(buf)
85     }
86 
flush(&mut self) -> io::Result<()>87     fn flush(&mut self) -> io::Result<()> {
88         self.get_mut().flush()
89     }
90 }
91 
#[cfg(feature = "tokio")]
impl<R> AsyncWrite for BzEncoder<R>
where
    R: AsyncWrite + Read,
{
    // Shutdown is forwarded directly to the wrapped stream.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        AsyncWrite::shutdown(self.get_mut())
    }
}
98 
99 impl<R: Read> BzDecoder<R> {
100     /// Create a new decompression stream, which will read compressed
101     /// data from the given input stream and decompress it.
new(r: R) -> BzDecoder<R>102     pub fn new(r: R) -> BzDecoder<R> {
103         BzDecoder {
104             inner: bufread::BzDecoder::new(BufReader::new(r)),
105         }
106     }
107 
108     /// Acquires a reference to the underlying stream
get_ref(&self) -> &R109     pub fn get_ref(&self) -> &R {
110         self.inner.get_ref().get_ref()
111     }
112 
113     /// Acquires a mutable reference to the underlying stream
114     ///
115     /// Note that mutation of the stream may result in surprising results if
116     /// this encoder is continued to be used.
get_mut(&mut self) -> &mut R117     pub fn get_mut(&mut self) -> &mut R {
118         self.inner.get_mut().get_mut()
119     }
120 
121     /// Unwrap the underlying writer, finishing the compression stream.
into_inner(self) -> R122     pub fn into_inner(self) -> R {
123         self.inner.into_inner().into_inner()
124     }
125 
126     /// Returns the number of bytes produced by the decompressor
127     /// (e.g. the number of bytes read from this stream)
128     ///
129     /// Note that, due to buffering, this only bears any relation to
130     /// total_in() when the decompressor reaches a sync point
131     /// (e.g. where the original compressed stream was flushed).
132     /// At that point, `total_in() / total_out()` is the compression ratio.
total_out(&self) -> u64133     pub fn total_out(&self) -> u64 {
134         self.inner.total_out()
135     }
136 
137     /// Returns the number of bytes consumed by the decompressor
138     /// (e.g. the number of bytes read from the underlying stream)
total_in(&self) -> u64139     pub fn total_in(&self) -> u64 {
140         self.inner.total_in()
141     }
142 }
143 
144 impl<R: Read> Read for BzDecoder<R> {
read(&mut self, into: &mut [u8]) -> io::Result<usize>145     fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
146         self.inner.read(into)
147     }
148 }
149 
#[cfg(feature = "tokio")]
// Relies entirely on the trait's default methods plus the `Read` impl above.
impl<R> AsyncRead for BzDecoder<R> where R: AsyncRead + Read {}
152 
153 impl<W: Write + Read> Write for BzDecoder<W> {
write(&mut self, buf: &[u8]) -> io::Result<usize>154     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
155         self.get_mut().write(buf)
156     }
157 
flush(&mut self) -> io::Result<()>158     fn flush(&mut self) -> io::Result<()> {
159         self.get_mut().flush()
160     }
161 }
162 
#[cfg(feature = "tokio")]
impl<R> AsyncWrite for BzDecoder<R>
where
    R: AsyncWrite + Read,
{
    // Shutdown is forwarded directly to the wrapped stream.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        AsyncWrite::shutdown(self.get_mut())
    }
}
169 
170 /// A bzip2 streaming decoder that decodes all members of a multistream
171 ///
172 /// Wikipedia, particularly, uses bzip2 multistream for their dumps.
173 pub struct MultiBzDecoder<R> {
174     inner: bufread::MultiBzDecoder<BufReader<R>>,
175 }
176 
177 impl<R: Read> MultiBzDecoder<R> {
178     /// Creates a new decoder from the given reader, immediately parsing the
179     /// (first) gzip header. If the gzip stream contains multiple members all will
180     /// be decoded.
new(r: R) -> MultiBzDecoder<R>181     pub fn new(r: R) -> MultiBzDecoder<R> {
182         MultiBzDecoder {
183             inner: bufread::MultiBzDecoder::new(BufReader::new(r)),
184         }
185     }
186 }
187 
188 impl<R> MultiBzDecoder<R> {
189     /// Acquires a reference to the underlying reader.
get_ref(&self) -> &R190     pub fn get_ref(&self) -> &R {
191         self.inner.get_ref().get_ref()
192     }
193 
194     /// Acquires a mutable reference to the underlying stream.
195     ///
196     /// Note that mutation of the stream may result in surprising results if
197     /// this encoder is continued to be used.
get_mut(&mut self) -> &mut R198     pub fn get_mut(&mut self) -> &mut R {
199         self.inner.get_mut().get_mut()
200     }
201 
202     /// Consumes this decoder, returning the underlying reader.
into_inner(self) -> R203     pub fn into_inner(self) -> R {
204         self.inner.into_inner().into_inner()
205     }
206 }
207 
208 impl<R: Read> Read for MultiBzDecoder<R> {
read(&mut self, into: &mut [u8]) -> io::Result<usize>209     fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
210         self.inner.read(into)
211     }
212 }
213 
#[cfg(feature = "tokio")]
// Relies entirely on the trait's default methods plus the `Read` impl above.
impl<R> AsyncRead for MultiBzDecoder<R> where R: AsyncRead {}
216 
217 impl<R: Read + Write> Write for MultiBzDecoder<R> {
write(&mut self, buf: &[u8]) -> io::Result<usize>218     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
219         self.get_mut().write(buf)
220     }
221 
flush(&mut self) -> io::Result<()>222     fn flush(&mut self) -> io::Result<()> {
223         self.get_mut().flush()
224     }
225 }
226 
#[cfg(feature = "tokio")]
// Only `Read` is required (as for `BzEncoder`/`BzDecoder`): the `Write` impl
// this builds on needs `R: Read + Write`, and `AsyncWrite` already implies
// `Write`. The former `AsyncRead` bound was needlessly strict.
impl<R: AsyncWrite + Read> AsyncWrite for MultiBzDecoder<R> {
    // Shutdown is forwarded directly to the wrapped stream.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        self.get_mut().shutdown()
    }
}
233 
#[cfg(test)]
mod tests {
    use partial_io::{GenInterrupted, PartialRead, PartialWithErrors};
    use rand::distributions::Standard;
    use rand::{thread_rng, Rng};
    use read::{BzDecoder, BzEncoder, MultiBzDecoder};
    use std::io::prelude::*;
    use Compression;

    #[test]
    fn smoke() {
        let m: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8];
        let mut c = BzEncoder::new(m, Compression::default());
        let mut data = vec![];
        c.read_to_end(&mut data).unwrap();
        let mut d = BzDecoder::new(&data[..]);
        let mut data2 = Vec::new();
        d.read_to_end(&mut data2).unwrap();
        assert_eq!(data2, m);
    }

    #[test]
    fn smoke2() {
        let m: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8];
        let c = BzEncoder::new(m, Compression::default());
        let mut d = BzDecoder::new(c);
        let mut data = vec![];
        d.read_to_end(&mut data).unwrap();
        assert_eq!(data, [1, 2, 3, 4, 5, 6, 7, 8]);
    }

    #[test]
    fn smoke3() {
        let m = vec![3u8; 128 * 1024 + 1];
        let c = BzEncoder::new(&m[..], Compression::default());
        let mut d = BzDecoder::new(c);
        let mut data = vec![];
        d.read_to_end(&mut data).unwrap();
        // Plain `assert!` avoids dumping 128 KiB of bytes on failure.
        assert!(data == m);
    }

    /// Checks that the first read from a `BzDecoder` returns exactly the
    /// original data even when random bytes follow the compressed stream.
    #[test]
    fn self_terminating() {
        let m = vec![3u8; 128 * 1024 + 1];
        let mut c = BzEncoder::new(&m[..], Compression::default());

        let mut result = Vec::new();
        c.read_to_end(&mut result).unwrap();

        // Append 200 copies of 1 KiB of random trailing garbage.
        let v = thread_rng()
            .sample_iter(&Standard)
            .take(1024)
            .collect::<Vec<u8>>();
        for _ in 0..200 {
            result.extend_from_slice(&v);
        }

        let mut d = BzDecoder::new(&result[..]);
        // Zero-initialized on purpose: the previous `set_len` on an
        // uninitialized `Vec` exposed uninitialized memory to `read` and to
        // the comparison below, which is undefined behavior.
        let mut data = vec![0u8; m.len()];
        assert_eq!(d.read(&mut data).unwrap(), m.len());
        assert!(data == m);
    }

    #[test]
    fn zero_length_read_at_eof() {
        let m: Vec<u8> = Vec::new();
        let mut c = BzEncoder::new(&m[..], Compression::default());

        let mut result = Vec::new();
        c.read_to_end(&mut result).unwrap();

        let mut d = BzDecoder::new(&result[..]);
        let mut data = Vec::new();
        assert_eq!(d.read(&mut data).unwrap(), 0);
    }

    #[test]
    fn zero_length_read_with_data() {
        let m = vec![3u8; 128 * 1024 + 1];
        let mut c = BzEncoder::new(&m[..], Compression::default());

        let mut result = Vec::new();
        c.read_to_end(&mut result).unwrap();

        let mut d = BzDecoder::new(&result[..]);
        let mut data = Vec::new();
        assert_eq!(d.read(&mut data).unwrap(), 0);
    }

    #[test]
    fn multistream_read_till_eof() {
        let m = vec![3u8; 128 * 1024 + 1];
        let repeat = 3;
        let mut result = Vec::new();

        // Concatenate several independent bzip2 streams.
        for _i in 0..repeat {
            let mut c = BzEncoder::new(&m[..], Compression::default());
            c.read_to_end(&mut result).unwrap();
        }

        let mut d = MultiBzDecoder::new(&result[..]);
        let mut data = Vec::new();

        let a = d.read_to_end(&mut data).unwrap();
        let b = m.len() * repeat;
        assert_eq!(a, b);
    }

    #[test]
    fn empty() {
        let r = BzEncoder::new(&[][..], Compression::default());
        let mut r = BzDecoder::new(r);
        let mut v2 = Vec::new();
        r.read_to_end(&mut v2).unwrap();
        assert!(v2.is_empty());
    }

    #[test]
    fn qc() {
        ::quickcheck::quickcheck(test as fn(_) -> _);

        fn test(v: Vec<u8>) -> bool {
            let r = BzEncoder::new(&v[..], Compression::default());
            let mut r = BzDecoder::new(r);
            let mut v2 = Vec::new();
            r.read_to_end(&mut v2).unwrap();
            v == v2
        }
    }

    #[test]
    fn qc_partial() {
        quickcheck6::quickcheck(test as fn(_, _, _) -> _);

        fn test(
            v: Vec<u8>,
            encode_ops: PartialWithErrors<GenInterrupted>,
            decode_ops: PartialWithErrors<GenInterrupted>,
        ) -> bool {
            let r = BzEncoder::new(PartialRead::new(&v[..], encode_ops), Compression::default());
            let mut r = BzDecoder::new(PartialRead::new(r, decode_ops));
            let mut v2 = Vec::new();
            r.read_to_end(&mut v2).unwrap();
            v == v2
        }
    }
}
384