1 //! Reader-based compression/decompression streams
2 
3 use std::io::prelude::*;
4 use std::io::{self, BufReader};
5 
6 #[cfg(feature = "tokio")]
7 use futures::Poll;
8 #[cfg(feature = "tokio")]
9 use tokio_io::{AsyncRead, AsyncWrite};
10 
11 use bufread;
12 use stream::Stream;
13 
14 /// A compression stream which wraps an uncompressed stream of data. Compressed
15 /// data will be read from the stream.
16 pub struct XzEncoder<R: Read> {
17     inner: bufread::XzEncoder<BufReader<R>>,
18 }
19 
20 /// A decompression stream which wraps a compressed stream of data. Decompressed
21 /// data will be read from the stream.
22 pub struct XzDecoder<R: Read> {
23     inner: bufread::XzDecoder<BufReader<R>>,
24 }
25 
26 impl<R: Read> XzEncoder<R> {
27     /// Create a new compression stream which will compress at the given level
28     /// to read compress output to the give output stream.
29     ///
30     /// The `level` argument here is typically 0-9 with 6 being a good default.
new(r: R, level: u32) -> XzEncoder<R>31     pub fn new(r: R, level: u32) -> XzEncoder<R> {
32         XzEncoder {
33             inner: bufread::XzEncoder::new(BufReader::new(r), level),
34         }
35     }
36 
37     /// Creates a new encoder with a custom `Stream`.
38     ///
39     /// The `Stream` can be pre-configured for multithreaded encoding, different
40     /// compression options/tuning, etc.
new_stream(r: R, stream: Stream) -> XzEncoder<R>41     pub fn new_stream(r: R, stream: Stream) -> XzEncoder<R> {
42         XzEncoder {
43             inner: bufread::XzEncoder::new_stream(BufReader::new(r), stream),
44         }
45     }
46 
47     /// Acquires a reference to the underlying stream
get_ref(&self) -> &R48     pub fn get_ref(&self) -> &R {
49         self.inner.get_ref().get_ref()
50     }
51 
52     /// Acquires a mutable reference to the underlying stream
53     ///
54     /// Note that mutation of the stream may result in surprising results if
55     /// this encoder is continued to be used.
get_mut(&mut self) -> &mut R56     pub fn get_mut(&mut self) -> &mut R {
57         self.inner.get_mut().get_mut()
58     }
59 
60     /// Unwrap the underlying writer, finishing the compression stream.
into_inner(self) -> R61     pub fn into_inner(self) -> R {
62         self.inner.into_inner().into_inner()
63     }
64 
65     /// Returns the number of bytes produced by the compressor
66     /// (e.g. the number of bytes read from this stream)
67     ///
68     /// Note that, due to buffering, this only bears any relation to
69     /// total_in() when the compressor chooses to flush its data
70     /// (unfortunately, this won't happen this won't happen in general
71     /// at the end of the stream, because the compressor doesn't know
72     /// if there's more data to come).  At that point,
73     /// `total_out() / total_in()` would be the compression ratio.
total_out(&self) -> u6474     pub fn total_out(&self) -> u64 {
75         self.inner.total_out()
76     }
77 
78     /// Returns the number of bytes consumed by the compressor
79     /// (e.g. the number of bytes read from the underlying stream)
total_in(&self) -> u6480     pub fn total_in(&self) -> u64 {
81         self.inner.total_in()
82     }
83 }
84 
85 impl<R: Read> Read for XzEncoder<R> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>86     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
87         self.inner.read(buf)
88     }
89 }
90 
#[cfg(feature = "tokio")]
impl<R> AsyncRead for XzEncoder<R>
where
    R: AsyncRead,
{
    // Intentionally empty: no trait methods are overridden here, so only the
    // trait's provided defaults apply.
}
94 
95 impl<W: Write + Read> Write for XzEncoder<W> {
write(&mut self, buf: &[u8]) -> io::Result<usize>96     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
97         self.get_mut().write(buf)
98     }
99 
flush(&mut self) -> io::Result<()>100     fn flush(&mut self) -> io::Result<()> {
101         self.get_mut().flush()
102     }
103 }
104 
#[cfg(feature = "tokio")]
impl<R> AsyncWrite for XzEncoder<R>
where
    R: AsyncWrite + Read,
{
    /// Delegates shutdown to the underlying stream obtained from `get_mut()`.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        let underlying = self.get_mut();
        underlying.shutdown()
    }
}
111 
112 impl<R: Read> XzDecoder<R> {
113     /// Create a new decompression stream, which will read compressed
114     /// data from the given input stream, and decompress one xz stream.
115     /// It may also consume input data that follows the xz stream.
116     /// Use [`xz::bufread::XzDecoder`] instead to process a mix of xz and non-xz data.
new(r: R) -> XzDecoder<R>117     pub fn new(r: R) -> XzDecoder<R> {
118         XzDecoder {
119             inner: bufread::XzDecoder::new(BufReader::new(r)),
120         }
121     }
122 
123     /// Create a new decompression stream, which will read compressed
124     /// data from the given input and decompress all the xz stream it contains.
new_multi_decoder(r: R) -> XzDecoder<R>125     pub fn new_multi_decoder(r: R) -> XzDecoder<R> {
126         XzDecoder {
127             inner: bufread::XzDecoder::new_multi_decoder(BufReader::new(r)),
128         }
129     }
130 
131     /// Creates a new decoder with a custom `Stream`.
132     ///
133     /// The `Stream` can be pre-configured for various checks, different
134     /// decompression options/tuning, etc.
new_stream(r: R, stream: Stream) -> XzDecoder<R>135     pub fn new_stream(r: R, stream: Stream) -> XzDecoder<R> {
136         XzDecoder {
137             inner: bufread::XzDecoder::new_stream(BufReader::new(r), stream),
138         }
139     }
140 
141     /// Acquires a reference to the underlying stream
get_ref(&self) -> &R142     pub fn get_ref(&self) -> &R {
143         self.inner.get_ref().get_ref()
144     }
145 
146     /// Acquires a mutable reference to the underlying stream
147     ///
148     /// Note that mutation of the stream may result in surprising results if
149     /// this encoder is continued to be used.
get_mut(&mut self) -> &mut R150     pub fn get_mut(&mut self) -> &mut R {
151         self.inner.get_mut().get_mut()
152     }
153 
154     /// Unwrap the underlying writer, finishing the compression stream.
into_inner(self) -> R155     pub fn into_inner(self) -> R {
156         self.inner.into_inner().into_inner()
157     }
158 
159     /// Returns the number of bytes produced by the decompressor
160     /// (e.g. the number of bytes read from this stream)
161     ///
162     /// Note that, due to buffering, this only bears any relation to
163     /// total_in() when the decompressor reaches a sync point
164     /// (e.g. where the original compressed stream was flushed).
165     /// At that point, `total_in() / total_out()` is the compression ratio.
total_out(&self) -> u64166     pub fn total_out(&self) -> u64 {
167         self.inner.total_out()
168     }
169 
170     /// Returns the number of bytes consumed by the decompressor
171     /// (e.g. the number of bytes read from the underlying stream)
total_in(&self) -> u64172     pub fn total_in(&self) -> u64 {
173         self.inner.total_in()
174     }
175 }
176 
177 impl<R: Read> Read for XzDecoder<R> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>178     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
179         self.inner.read(buf)
180     }
181 }
182 
#[cfg(feature = "tokio")]
impl<R> AsyncRead for XzDecoder<R>
where
    R: AsyncRead + Read,
{
    // Intentionally empty: no trait methods are overridden here, so only the
    // trait's provided defaults apply.
}
186 
187 impl<W: Write + Read> Write for XzDecoder<W> {
write(&mut self, buf: &[u8]) -> io::Result<usize>188     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
189         self.get_mut().write(buf)
190     }
191 
flush(&mut self) -> io::Result<()>192     fn flush(&mut self) -> io::Result<()> {
193         self.get_mut().flush()
194     }
195 }
196 
#[cfg(feature = "tokio")]
impl<R> AsyncWrite for XzDecoder<R>
where
    R: AsyncWrite + Read,
{
    /// Delegates shutdown to the underlying stream obtained from `get_mut()`.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        let underlying = self.get_mut();
        underlying.shutdown()
    }
}
203 
#[cfg(test)]
mod tests {
    use std::io::prelude::*;
    use read::{XzEncoder, XzDecoder};
    use rand::{thread_rng, Rng};

    /// Compress a small buffer to a `Vec`, then decompress it and compare.
    #[test]
    fn smoke() {
        let m: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8];
        let mut c = XzEncoder::new(m, 6);
        let mut data = vec![];
        c.read_to_end(&mut data).unwrap();
        let mut d = XzDecoder::new(&data[..]);
        let mut data2 = Vec::new();
        d.read_to_end(&mut data2).unwrap();
        assert_eq!(data2, m);
    }

    /// Chain the decoder directly on top of the encoder.
    #[test]
    fn smoke2() {
        let m: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8];
        let c = XzEncoder::new(m, 6);
        let mut d = XzDecoder::new(c);
        let mut data = vec![];
        d.read_to_end(&mut data).unwrap();
        assert_eq!(data, [1, 2, 3, 4, 5, 6, 7, 8]);
    }

    /// Same as `smoke2`, but with a larger (128 KiB + 1) input.
    #[test]
    fn smoke3() {
        let m = vec![3u8; 128 * 1024 + 1];
        let c = XzEncoder::new(&m[..], 6);
        let mut d = XzDecoder::new(c);
        let mut data = vec![];
        d.read_to_end(&mut data).unwrap();
        assert!(data == &m[..]);
    }

    /// The decoder must recover the payload even when random bytes follow
    /// the compressed stream.
    #[test]
    fn self_terminating() {
        let m = vec![3u8; 128 * 1024 + 1];
        let mut c = XzEncoder::new(&m[..], 6);

        let mut result = Vec::new();
        c.read_to_end(&mut result).unwrap();

        // Append 200 KiB of random trailing bytes after the xz stream.
        let v = thread_rng().gen_iter::<u8>().take(1024).collect::<Vec<_>>();
        for _ in 0..200 {
            result.extend_from_slice(&v);
        }

        let mut d = XzDecoder::new(&result[..]);
        // A zero-filled buffer is used rather than `with_capacity` + `set_len`:
        // handing uninitialized bytes to `read` as `&mut [u8]` is unsound.
        let mut data = vec![0u8; m.len()];
        assert!(d.read(&mut data).unwrap() == m.len());
        assert!(data == &m[..]);
    }

    /// Reading into an empty buffer from a stream with no payload yields 0.
    #[test]
    fn zero_length_read_at_eof() {
        let m = Vec::new();
        let mut c = XzEncoder::new(&m[..], 6);

        let mut result = Vec::new();
        c.read_to_end(&mut result).unwrap();

        let mut d = XzDecoder::new(&result[..]);
        let mut data = Vec::new();
        assert!(d.read(&mut data).unwrap() == 0);
    }

    /// Reading into an empty buffer yields 0 even when data is available.
    #[test]
    fn zero_length_read_with_data() {
        let m = vec![3u8; 128 * 1024 + 1];
        let mut c = XzEncoder::new(&m[..], 6);

        let mut result = Vec::new();
        c.read_to_end(&mut result).unwrap();

        let mut d = XzDecoder::new(&result[..]);
        let mut data = Vec::new();
        assert!(d.read(&mut data).unwrap() == 0);
    }

    /// Property test: encode-then-decode is the identity on byte vectors.
    #[test]
    fn qc() {
        ::quickcheck::quickcheck(test as fn(_) -> _);

        fn test(v: Vec<u8>) -> bool {
            let r = XzEncoder::new(&v[..], 6);
            let mut r = XzDecoder::new(r);
            let mut v2 = Vec::new();
            r.read_to_end(&mut v2).unwrap();
            v == v2
        }
    }

    /// Two concatenated xz streams decode to the concatenation of their
    /// inputs when the multi-stream decoder is used.
    #[test]
    fn two_streams() {
        // Generate input data; the `as u8` casts wrap on purpose to produce
        // a repeating byte pattern.
        const STREAM1_SIZE: usize = 1024;
        const STREAM2_SIZE: usize = 532;
        let input_stream1: Vec<u8> = (0..STREAM1_SIZE).map(|num| num as u8).collect();
        let input_stream2: Vec<u8> = (0..STREAM2_SIZE).map(|num| (num + 32) as u8).collect();

        let mut all_input: Vec<u8> = Vec::new();
        all_input.extend(&input_stream1);
        all_input.extend(&input_stream2);

        // Make a vector with compressed data
        let mut decoder_input = Vec::new();
        {
            let mut encoder = XzEncoder::new(&input_stream1[..], 6);
            encoder.read_to_end(&mut decoder_input).unwrap();
        }
        {
            let mut encoder = XzEncoder::new(&input_stream2[..], 6);
            encoder.read_to_end(&mut decoder_input).unwrap();
        }

        // Decoder must be able to read the 2 concatenated xz streams and get the same data as input.
        let mut decoder_reader = &decoder_input[..];
        {
            // using `XzDecoder::new` here would fail because only 1 xz stream would be processed.
            let mut decoder = XzDecoder::new_multi_decoder(&mut decoder_reader);
            let mut decompressed_data = vec![0u8; all_input.len()];

            assert_eq!(decoder.read(&mut decompressed_data).unwrap(), all_input.len());
            assert_eq!(decompressed_data, &all_input[..]);
        }
    }
}
342