1 //! I/O streams for wrapping `BufRead` types as encoders/decoders
2 
3 use std::io::prelude::*;
4 use std::io;
5 use lzma_sys;
6 
7 #[cfg(feature = "tokio")]
8 use futures::Poll;
9 #[cfg(feature = "tokio")]
10 use tokio_io::{AsyncRead, AsyncWrite};
11 
12 use stream::{Stream, Check, Action, Status};
13 
14 /// An xz encoder, or compressor.
15 ///
16 /// This structure implements a `BufRead` interface and will read uncompressed
17 /// data from an underlying stream and emit a stream of compressed data.
18 pub struct XzEncoder<R> {
19     obj: R,
20     data: Stream,
21 }
22 
23 /// A xz decoder, or decompressor.
24 ///
25 /// This structure implements a `BufRead` interface and takes a stream of
26 /// compressed data as input, providing the decompressed data when read from.
27 pub struct XzDecoder<R> {
28     obj: R,
29     data: Stream,
30 }
31 
32 impl<R: BufRead> XzEncoder<R> {
33     /// Creates a new encoder which will read uncompressed data from the given
34     /// stream and emit the compressed stream.
35     ///
36     /// The `level` argument here is typically 0-9 with 6 being a good default.
new(r: R, level: u32) -> XzEncoder<R>37     pub fn new(r: R, level: u32) -> XzEncoder<R> {
38         let stream = Stream::new_easy_encoder(level, Check::Crc64).unwrap();
39         XzEncoder::new_stream(r, stream)
40     }
41 
42     /// Creates a new encoder with a custom `Stream`.
43     ///
44     /// The `Stream` can be pre-configured for multithreaded encoding, different
45     /// compression options/tuning, etc.
new_stream(r: R, stream: Stream) -> XzEncoder<R>46     pub fn new_stream(r: R, stream: Stream) -> XzEncoder<R> {
47         XzEncoder {
48             obj: r,
49             data: stream,
50         }
51     }
52 }
53 
54 impl<R> XzEncoder<R> {
55     /// Acquires a reference to the underlying stream
get_ref(&self) -> &R56     pub fn get_ref(&self) -> &R {
57         &self.obj
58     }
59 
60     /// Acquires a mutable reference to the underlying stream
61     ///
62     /// Note that mutation of the stream may result in surprising results if
63     /// this encoder is continued to be used.
get_mut(&mut self) -> &mut R64     pub fn get_mut(&mut self) -> &mut R {
65         &mut self.obj
66     }
67 
68     /// Consumes this encoder, returning the underlying reader.
into_inner(self) -> R69     pub fn into_inner(self) -> R {
70         self.obj
71     }
72 
73     /// Returns the number of bytes produced by the compressor
74     /// (e.g. the number of bytes read from this stream)
75     ///
76     /// Note that, due to buffering, this only bears any relation to
77     /// total_in() when the compressor chooses to flush its data
78     /// (unfortunately, this won't happen this won't happen in general
79     /// at the end of the stream, because the compressor doesn't know
80     /// if there's more data to come).  At that point,
81     /// `total_out() / total_in()` would be the compression ratio.
total_out(&self) -> u6482     pub fn total_out(&self) -> u64 {
83         self.data.total_out()
84     }
85 
86     /// Returns the number of bytes consumed by the compressor
87     /// (e.g. the number of bytes read from the underlying stream)
total_in(&self) -> u6488     pub fn total_in(&self) -> u64 {
89         self.data.total_in()
90     }
91 }
92 
93 impl<R: BufRead> Read for XzEncoder<R> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>94     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
95         loop {
96             let (read, consumed, eof, ret);
97             {
98                 let input = self.obj.fill_buf()?;
99                 eof = input.is_empty();
100                 let before_out = self.data.total_out();
101                 let before_in = self.data.total_in();
102                 let action = if eof {Action::Finish} else {Action::Run};
103                 ret = self.data.process(input, buf, action);
104                 read = (self.data.total_out() - before_out) as usize;
105                 consumed = (self.data.total_in() - before_in) as usize;
106             }
107             self.obj.consume(consumed);
108 
109             ret.unwrap();
110 
111             // If we haven't ready any data and we haven't hit EOF yet, then we
112             // need to keep asking for more data because if we return that 0
113             // bytes of data have been read then it will be interpreted as EOF.
114             if read == 0 && !eof && buf.len() > 0 {
115                 continue
116             }
117             return Ok(read)
118         }
119     }
120 }
121 
// Marker impl: no methods are overridden, so the trait's provided methods
// are used on top of this type's `Read` impl.
#[cfg(feature = "tokio")]
impl<R> AsyncRead for XzEncoder<R> where R: AsyncRead + BufRead {}
125 
126 impl<W: Write> Write for XzEncoder<W> {
write(&mut self, buf: &[u8]) -> io::Result<usize>127     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
128         self.get_mut().write(buf)
129     }
130 
flush(&mut self) -> io::Result<()>131     fn flush(&mut self) -> io::Result<()> {
132         self.get_mut().flush()
133     }
134 }
135 
// Shutdown is delegated directly to the wrapped writer.
#[cfg(feature = "tokio")]
impl<W> AsyncWrite for XzEncoder<W> where W: AsyncWrite {
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        AsyncWrite::shutdown(&mut self.obj)
    }
}
142 
143 impl<R: BufRead> XzDecoder<R> {
144     /// Creates a new decoder which will decompress data read from the given
145     /// stream.
new(r: R) -> XzDecoder<R>146     pub fn new(r: R) -> XzDecoder<R> {
147         let stream = Stream::new_stream_decoder(u64::max_value(), 0).unwrap();
148         XzDecoder::new_stream(r, stream)
149     }
150 
151     /// Creates a new decoder which will decompress data read from the given
152     /// input. All the concatenated xz streams from input will be consumed.
new_multi_decoder(r: R) -> XzDecoder<R>153     pub fn new_multi_decoder(r: R) -> XzDecoder<R> {
154         let stream = Stream::new_auto_decoder(u64::max_value(), lzma_sys::LZMA_CONCATENATED).unwrap();
155         XzDecoder::new_stream(r, stream)
156     }
157 
158     /// Creates a new decoder with a custom `Stream`.
159     ///
160     /// The `Stream` can be pre-configured for various checks, different
161     /// decompression options/tuning, etc.
new_stream(r: R, stream: Stream) -> XzDecoder<R>162     pub fn new_stream(r: R, stream: Stream) -> XzDecoder<R> {
163         XzDecoder { obj: r, data: stream }
164     }
165 }
166 
167 impl<R> XzDecoder<R> {
168     /// Acquires a reference to the underlying stream
get_ref(&self) -> &R169     pub fn get_ref(&self) -> &R {
170         &self.obj
171     }
172 
173     /// Acquires a mutable reference to the underlying stream
174     ///
175     /// Note that mutation of the stream may result in surprising results if
176     /// this encoder is continued to be used.
get_mut(&mut self) -> &mut R177     pub fn get_mut(&mut self) -> &mut R {
178         &mut self.obj
179     }
180 
181     /// Consumes this decoder, returning the underlying reader.
into_inner(self) -> R182     pub fn into_inner(self) -> R {
183         self.obj
184     }
185 
186     /// Returns the number of bytes that the decompressor has consumed.
187     ///
188     /// Note that this will likely be smaller than what the decompressor
189     /// actually read from the underlying stream due to buffering.
total_in(&self) -> u64190     pub fn total_in(&self) -> u64 {
191         self.data.total_in()
192     }
193 
194     /// Returns the number of bytes that the decompressor has produced.
total_out(&self) -> u64195     pub fn total_out(&self) -> u64 {
196         self.data.total_out()
197     }
198 }
199 
200 impl<R: BufRead> Read for XzDecoder<R> {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>201     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
202         loop {
203             let (read, consumed, eof, ret);
204             {
205                 let input = self.obj.fill_buf()?;
206                 eof = input.is_empty();
207                 let before_out = self.data.total_out();
208                 let before_in = self.data.total_in();
209                 ret = self.data.process(input, buf, if eof { Action::Finish } else { Action::Run });
210                 read = (self.data.total_out() - before_out) as usize;
211                 consumed = (self.data.total_in() - before_in) as usize;
212             }
213             self.obj.consume(consumed);
214 
215             let status = ret?;
216             if read > 0 || eof || buf.len() == 0 {
217                 if read == 0 && status != Status::StreamEnd && buf.len() > 0 {
218                     return Err(io::Error::new(io::ErrorKind::Other,
219                                               "premature eof"))
220                 }
221                 return Ok(read)
222             }
223             if consumed == 0 {
224                 return Err(io::Error::new(io::ErrorKind::Other,
225                                           "corrupt xz stream"))
226             }
227         }
228     }
229 }
230 
// Marker impl: no methods are overridden, so the trait's provided methods
// are used on top of this type's `Read` impl.
#[cfg(feature = "tokio")]
impl<R> AsyncRead for XzDecoder<R> where R: AsyncRead + BufRead {}
234 
235 impl<W: Write> Write for XzDecoder<W> {
write(&mut self, buf: &[u8]) -> io::Result<usize>236     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
237         self.get_mut().write(buf)
238     }
239 
flush(&mut self) -> io::Result<()>240     fn flush(&mut self) -> io::Result<()> {
241         self.get_mut().flush()
242     }
243 }
244 
// Shutdown is delegated directly to the wrapped writer.
#[cfg(feature = "tokio")]
impl<W> AsyncWrite for XzDecoder<W> where W: AsyncWrite {
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        AsyncWrite::shutdown(&mut self.obj)
    }
}
251 
#[cfg(test)]
mod tests {
    use bufread::{XzEncoder, XzDecoder};
    use std::io::Read;

    #[test]
    fn compressed_and_trailing_data() {
        // Payload to compress: the bytes 0, 1, 2, ... wrapping at 256.
        const COMPRESSED_ORIG_SIZE: usize = 1024;
        let to_compress: Vec<u8> = (0..COMPRESSED_ORIG_SIZE)
            .map(|num| num as u8)
            .collect();

        let mut encoder = XzEncoder::new(&to_compress[..], 6);
        let mut decoder_input = Vec::new();
        encoder.read_to_end(&mut decoder_input).unwrap();

        // Unrelated bytes appended right after the xz stream.
        const ADDITIONAL_SIZE: usize = 123;
        let additional_data: Vec<u8> = (0..ADDITIONAL_SIZE)
            .map(|num| ((25 + num) % 256) as u8)
            .collect();
        decoder_input.extend(&additional_data);

        // The decoder must decode the xz stream and leave the trailing bytes
        // unconsumed in the underlying reader.
        let mut decoder_reader = &decoder_input[..];
        {
            let mut decoder = XzDecoder::new(&mut decoder_reader);
            let mut decompressed_data = vec![0u8; to_compress.len()];

            let decoded_len = decoder.read(&mut decompressed_data).unwrap();
            assert_eq!(decoded_len, COMPRESSED_ORIG_SIZE);
            assert_eq!(decompressed_data, &to_compress[..]);
        }

        let mut remaining_data = Vec::new();
        let nb_read = decoder_reader.read_to_end(&mut remaining_data).unwrap();
        assert_eq!(nb_read, ADDITIONAL_SIZE);
        assert_eq!(remaining_data, &additional_data[..]);
    }
}
294