1 //! I/O streams for wrapping `BufRead` types as encoders/decoders 2 3 use std::io::prelude::*; 4 use std::io; 5 use lzma_sys; 6 7 #[cfg(feature = "tokio")] 8 use futures::Poll; 9 #[cfg(feature = "tokio")] 10 use tokio_io::{AsyncRead, AsyncWrite}; 11 12 use stream::{Stream, Check, Action, Status}; 13 14 /// An xz encoder, or compressor. 15 /// 16 /// This structure implements a `BufRead` interface and will read uncompressed 17 /// data from an underlying stream and emit a stream of compressed data. 18 pub struct XzEncoder<R> { 19 obj: R, 20 data: Stream, 21 } 22 23 /// A xz decoder, or decompressor. 24 /// 25 /// This structure implements a `BufRead` interface and takes a stream of 26 /// compressed data as input, providing the decompressed data when read from. 27 pub struct XzDecoder<R> { 28 obj: R, 29 data: Stream, 30 } 31 32 impl<R: BufRead> XzEncoder<R> { 33 /// Creates a new encoder which will read uncompressed data from the given 34 /// stream and emit the compressed stream. 35 /// 36 /// The `level` argument here is typically 0-9 with 6 being a good default. new(r: R, level: u32) -> XzEncoder<R>37 pub fn new(r: R, level: u32) -> XzEncoder<R> { 38 let stream = Stream::new_easy_encoder(level, Check::Crc64).unwrap(); 39 XzEncoder::new_stream(r, stream) 40 } 41 42 /// Creates a new encoder with a custom `Stream`. 43 /// 44 /// The `Stream` can be pre-configured for multithreaded encoding, different 45 /// compression options/tuning, etc. new_stream(r: R, stream: Stream) -> XzEncoder<R>46 pub fn new_stream(r: R, stream: Stream) -> XzEncoder<R> { 47 XzEncoder { 48 obj: r, 49 data: stream, 50 } 51 } 52 } 53 54 impl<R> XzEncoder<R> { 55 /// Acquires a reference to the underlying stream get_ref(&self) -> &R56 pub fn get_ref(&self) -> &R { 57 &self.obj 58 } 59 60 /// Acquires a mutable reference to the underlying stream 61 /// 62 /// Note that mutation of the stream may result in surprising results if 63 /// this encoder is continued to be used. get_mut(&mut self) -> &mut R64 pub fn get_mut(&mut self) -> &mut R { 65 &mut self.obj 66 } 67 68 /// Consumes this encoder, returning the underlying reader. into_inner(self) -> R69 pub fn into_inner(self) -> R { 70 self.obj 71 } 72 73 /// Returns the number of bytes produced by the compressor 74 /// (e.g. the number of bytes read from this stream) 75 /// 76 /// Note that, due to buffering, this only bears any relation to 77 /// total_in() when the compressor chooses to flush its data 78 /// (unfortunately, this won't happen this won't happen in general 79 /// at the end of the stream, because the compressor doesn't know 80 /// if there's more data to come). At that point, 81 /// `total_out() / total_in()` would be the compression ratio. total_out(&self) -> u6482 pub fn total_out(&self) -> u64 { 83 self.data.total_out() 84 } 85 86 /// Returns the number of bytes consumed by the compressor 87 /// (e.g. the number of bytes read from the underlying stream) total_in(&self) -> u6488 pub fn total_in(&self) -> u64 { 89 self.data.total_in() 90 } 91 } 92 93 impl<R: BufRead> Read for XzEncoder<R> { read(&mut self, buf: &mut [u8]) -> io::Result<usize>94 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 95 loop { 96 let (read, consumed, eof, ret); 97 { 98 let input = self.obj.fill_buf()?; 99 eof = input.is_empty(); 100 let before_out = self.data.total_out(); 101 let before_in = self.data.total_in(); 102 let action = if eof {Action::Finish} else {Action::Run}; 103 ret = self.data.process(input, buf, action); 104 read = (self.data.total_out() - before_out) as usize; 105 consumed = (self.data.total_in() - before_in) as usize; 106 } 107 self.obj.consume(consumed); 108 109 ret.unwrap(); 110 111 // If we haven't ready any data and we haven't hit EOF yet, then we 112 // need to keep asking for more data because if we return that 0 113 // bytes of data have been read then it will be interpreted as EOF. 114 if read == 0 && !eof && buf.len() > 0 { 115 continue 116 } 117 return Ok(read) 118 } 119 } 120 } 121 122 #[cfg(feature = "tokio")] 123 impl<R: AsyncRead + BufRead> AsyncRead for XzEncoder<R> { 124 } 125 126 impl<W: Write> Write for XzEncoder<W> { write(&mut self, buf: &[u8]) -> io::Result<usize>127 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 128 self.get_mut().write(buf) 129 } 130 flush(&mut self) -> io::Result<()>131 fn flush(&mut self) -> io::Result<()> { 132 self.get_mut().flush() 133 } 134 } 135 136 #[cfg(feature = "tokio")] 137 impl<R: AsyncWrite> AsyncWrite for XzEncoder<R> { shutdown(&mut self) -> Poll<(), io::Error>138 fn shutdown(&mut self) -> Poll<(), io::Error> { 139 self.get_mut().shutdown() 140 } 141 } 142 143 impl<R: BufRead> XzDecoder<R> { 144 /// Creates a new decoder which will decompress data read from the given 145 /// stream. new(r: R) -> XzDecoder<R>146 pub fn new(r: R) -> XzDecoder<R> { 147 let stream = Stream::new_stream_decoder(u64::max_value(), 0).unwrap(); 148 XzDecoder::new_stream(r, stream) 149 } 150 151 /// Creates a new decoder which will decompress data read from the given 152 /// input. All the concatenated xz streams from input will be consumed. new_multi_decoder(r: R) -> XzDecoder<R>153 pub fn new_multi_decoder(r: R) -> XzDecoder<R> { 154 let stream = Stream::new_auto_decoder(u64::max_value(), lzma_sys::LZMA_CONCATENATED).unwrap(); 155 XzDecoder::new_stream(r, stream) 156 } 157 158 /// Creates a new decoder with a custom `Stream`. 159 /// 160 /// The `Stream` can be pre-configured for various checks, different 161 /// decompression options/tuning, etc. new_stream(r: R, stream: Stream) -> XzDecoder<R>162 pub fn new_stream(r: R, stream: Stream) -> XzDecoder<R> { 163 XzDecoder { obj: r, data: stream } 164 } 165 } 166 167 impl<R> XzDecoder<R> { 168 /// Acquires a reference to the underlying stream get_ref(&self) -> &R169 pub fn get_ref(&self) -> &R { 170 &self.obj 171 } 172 173 /// Acquires a mutable reference to the underlying stream 174 /// 175 /// Note that mutation of the stream may result in surprising results if 176 /// this encoder is continued to be used. get_mut(&mut self) -> &mut R177 pub fn get_mut(&mut self) -> &mut R { 178 &mut self.obj 179 } 180 181 /// Consumes this decoder, returning the underlying reader. into_inner(self) -> R182 pub fn into_inner(self) -> R { 183 self.obj 184 } 185 186 /// Returns the number of bytes that the decompressor has consumed. 187 /// 188 /// Note that this will likely be smaller than what the decompressor 189 /// actually read from the underlying stream due to buffering. total_in(&self) -> u64190 pub fn total_in(&self) -> u64 { 191 self.data.total_in() 192 } 193 194 /// Returns the number of bytes that the decompressor has produced. total_out(&self) -> u64195 pub fn total_out(&self) -> u64 { 196 self.data.total_out() 197 } 198 } 199 200 impl<R: BufRead> Read for XzDecoder<R> { read(&mut self, buf: &mut [u8]) -> io::Result<usize>201 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 202 loop { 203 let (read, consumed, eof, ret); 204 { 205 let input = self.obj.fill_buf()?; 206 eof = input.is_empty(); 207 let before_out = self.data.total_out(); 208 let before_in = self.data.total_in(); 209 ret = self.data.process(input, buf, if eof { Action::Finish } else { Action::Run }); 210 read = (self.data.total_out() - before_out) as usize; 211 consumed = (self.data.total_in() - before_in) as usize; 212 } 213 self.obj.consume(consumed); 214 215 let status = ret?; 216 if read > 0 || eof || buf.len() == 0 { 217 if read == 0 && status != Status::StreamEnd && buf.len() > 0 { 218 return Err(io::Error::new(io::ErrorKind::Other, 219 "premature eof")) 220 } 221 return Ok(read) 222 } 223 if consumed == 0 { 224 return Err(io::Error::new(io::ErrorKind::Other, 225 "corrupt xz stream")) 226 } 227 } 228 } 229 } 230 231 #[cfg(feature = "tokio")] 232 impl<R: AsyncRead + BufRead> AsyncRead for XzDecoder<R> { 233 } 234 235 impl<W: Write> Write for XzDecoder<W> { write(&mut self, buf: &[u8]) -> io::Result<usize>236 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 237 self.get_mut().write(buf) 238 } 239 flush(&mut self) -> io::Result<()>240 fn flush(&mut self) -> io::Result<()> { 241 self.get_mut().flush() 242 } 243 } 244 245 #[cfg(feature = "tokio")] 246 impl<R: AsyncWrite> AsyncWrite for XzDecoder<R> { shutdown(&mut self) -> Poll<(), io::Error>247 fn shutdown(&mut self) -> Poll<(), io::Error> { 248 self.get_mut().shutdown() 249 } 250 } 251 252 #[cfg(test)] 253 mod tests { 254 use bufread::{XzEncoder, XzDecoder}; 255 use std::io::Read; 256 257 #[test] compressed_and_trailing_data()258 fn compressed_and_trailing_data() { 259 // Make a vector with compressed data... 260 let mut to_compress : Vec<u8> = Vec::new(); 261 const COMPRESSED_ORIG_SIZE: usize = 1024; 262 for num in 0..COMPRESSED_ORIG_SIZE { 263 to_compress.push(num as u8) 264 } 265 let mut encoder = XzEncoder::new(&to_compress[..], 6); 266 267 let mut decoder_input = Vec::new(); 268 encoder.read_to_end(&mut decoder_input).unwrap(); 269 270 // ...plus additional unrelated trailing data 271 const ADDITIONAL_SIZE : usize = 123; 272 let mut additional_data = Vec::new(); 273 for num in 0..ADDITIONAL_SIZE { 274 additional_data.push(((25 + num) % 256) as u8) 275 } 276 decoder_input.extend(&additional_data); 277 278 // Decoder must be able to read the compressed xz stream, and keep the trailing data. 279 let mut decoder_reader = &decoder_input[..]; 280 { 281 let mut decoder = XzDecoder::new(&mut decoder_reader); 282 let mut decompressed_data = vec![0u8; to_compress.len()]; 283 284 assert_eq!(decoder.read(&mut decompressed_data).unwrap(), COMPRESSED_ORIG_SIZE); 285 assert_eq!(decompressed_data, &to_compress[..]); 286 } 287 288 let mut remaining_data = Vec::new(); 289 let nb_read = decoder_reader.read_to_end(&mut remaining_data).unwrap(); 290 assert_eq!(nb_read, ADDITIONAL_SIZE); 291 assert_eq!(remaining_data, &additional_data[..]); 292 } 293 } 294