1 //! Reader-based compression/decompression streams 2 3 use std::io::prelude::*; 4 use std::io::{self, BufReader}; 5 6 #[cfg(feature = "tokio")] 7 use futures::Poll; 8 #[cfg(feature = "tokio")] 9 use tokio_io::{AsyncRead, AsyncWrite}; 10 11 use bufread; 12 use stream::Stream; 13 14 /// A compression stream which wraps an uncompressed stream of data. Compressed 15 /// data will be read from the stream. 16 pub struct XzEncoder<R: Read> { 17 inner: bufread::XzEncoder<BufReader<R>>, 18 } 19 20 /// A decompression stream which wraps a compressed stream of data. Decompressed 21 /// data will be read from the stream. 22 pub struct XzDecoder<R: Read> { 23 inner: bufread::XzDecoder<BufReader<R>>, 24 } 25 26 impl<R: Read> XzEncoder<R> { 27 /// Create a new compression stream which will compress at the given level 28 /// to read compress output to the give output stream. 29 /// 30 /// The `level` argument here is typically 0-9 with 6 being a good default. new(r: R, level: u32) -> XzEncoder<R>31 pub fn new(r: R, level: u32) -> XzEncoder<R> { 32 XzEncoder { 33 inner: bufread::XzEncoder::new(BufReader::new(r), level), 34 } 35 } 36 37 /// Creates a new encoder with a custom `Stream`. 38 /// 39 /// The `Stream` can be pre-configured for multithreaded encoding, different 40 /// compression options/tuning, etc. new_stream(r: R, stream: Stream) -> XzEncoder<R>41 pub fn new_stream(r: R, stream: Stream) -> XzEncoder<R> { 42 XzEncoder { 43 inner: bufread::XzEncoder::new_stream(BufReader::new(r), stream), 44 } 45 } 46 47 /// Acquires a reference to the underlying stream get_ref(&self) -> &R48 pub fn get_ref(&self) -> &R { 49 self.inner.get_ref().get_ref() 50 } 51 52 /// Acquires a mutable reference to the underlying stream 53 /// 54 /// Note that mutation of the stream may result in surprising results if 55 /// this encoder is continued to be used. get_mut(&mut self) -> &mut R56 pub fn get_mut(&mut self) -> &mut R { 57 self.inner.get_mut().get_mut() 58 } 59 60 /// Unwrap the underlying writer, finishing the compression stream. into_inner(self) -> R61 pub fn into_inner(self) -> R { 62 self.inner.into_inner().into_inner() 63 } 64 65 /// Returns the number of bytes produced by the compressor 66 /// (e.g. the number of bytes read from this stream) 67 /// 68 /// Note that, due to buffering, this only bears any relation to 69 /// total_in() when the compressor chooses to flush its data 70 /// (unfortunately, this won't happen this won't happen in general 71 /// at the end of the stream, because the compressor doesn't know 72 /// if there's more data to come). At that point, 73 /// `total_out() / total_in()` would be the compression ratio. total_out(&self) -> u6474 pub fn total_out(&self) -> u64 { 75 self.inner.total_out() 76 } 77 78 /// Returns the number of bytes consumed by the compressor 79 /// (e.g. the number of bytes read from the underlying stream) total_in(&self) -> u6480 pub fn total_in(&self) -> u64 { 81 self.inner.total_in() 82 } 83 } 84 85 impl<R: Read> Read for XzEncoder<R> { read(&mut self, buf: &mut [u8]) -> io::Result<usize>86 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 87 self.inner.read(buf) 88 } 89 } 90 91 #[cfg(feature = "tokio")] 92 impl<R: AsyncRead> AsyncRead for XzEncoder<R> { 93 } 94 95 impl<W: Write + Read> Write for XzEncoder<W> { write(&mut self, buf: &[u8]) -> io::Result<usize>96 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 97 self.get_mut().write(buf) 98 } 99 flush(&mut self) -> io::Result<()>100 fn flush(&mut self) -> io::Result<()> { 101 self.get_mut().flush() 102 } 103 } 104 105 #[cfg(feature = "tokio")] 106 impl<R: AsyncWrite + Read> AsyncWrite for XzEncoder<R> { shutdown(&mut self) -> Poll<(), io::Error>107 fn shutdown(&mut self) -> Poll<(), io::Error> { 108 self.get_mut().shutdown() 109 } 110 } 111 112 impl<R: Read> XzDecoder<R> { 113 /// Create a new decompression stream, which will read compressed 114 /// data from the given input stream, and decompress one xz stream. 115 /// It may also consume input data that follows the xz stream. 116 /// Use [`xz::bufread::XzDecoder`] instead to process a mix of xz and non-xz data. new(r: R) -> XzDecoder<R>117 pub fn new(r: R) -> XzDecoder<R> { 118 XzDecoder { 119 inner: bufread::XzDecoder::new(BufReader::new(r)), 120 } 121 } 122 123 /// Create a new decompression stream, which will read compressed 124 /// data from the given input and decompress all the xz stream it contains. new_multi_decoder(r: R) -> XzDecoder<R>125 pub fn new_multi_decoder(r: R) -> XzDecoder<R> { 126 XzDecoder { 127 inner: bufread::XzDecoder::new_multi_decoder(BufReader::new(r)), 128 } 129 } 130 131 /// Creates a new decoder with a custom `Stream`. 132 /// 133 /// The `Stream` can be pre-configured for various checks, different 134 /// decompression options/tuning, etc. new_stream(r: R, stream: Stream) -> XzDecoder<R>135 pub fn new_stream(r: R, stream: Stream) -> XzDecoder<R> { 136 XzDecoder { 137 inner: bufread::XzDecoder::new_stream(BufReader::new(r), stream), 138 } 139 } 140 141 /// Acquires a reference to the underlying stream get_ref(&self) -> &R142 pub fn get_ref(&self) -> &R { 143 self.inner.get_ref().get_ref() 144 } 145 146 /// Acquires a mutable reference to the underlying stream 147 /// 148 /// Note that mutation of the stream may result in surprising results if 149 /// this encoder is continued to be used. get_mut(&mut self) -> &mut R150 pub fn get_mut(&mut self) -> &mut R { 151 self.inner.get_mut().get_mut() 152 } 153 154 /// Unwrap the underlying writer, finishing the compression stream. into_inner(self) -> R155 pub fn into_inner(self) -> R { 156 self.inner.into_inner().into_inner() 157 } 158 159 /// Returns the number of bytes produced by the decompressor 160 /// (e.g. the number of bytes read from this stream) 161 /// 162 /// Note that, due to buffering, this only bears any relation to 163 /// total_in() when the decompressor reaches a sync point 164 /// (e.g. where the original compressed stream was flushed). 165 /// At that point, `total_in() / total_out()` is the compression ratio. total_out(&self) -> u64166 pub fn total_out(&self) -> u64 { 167 self.inner.total_out() 168 } 169 170 /// Returns the number of bytes consumed by the decompressor 171 /// (e.g. the number of bytes read from the underlying stream) total_in(&self) -> u64172 pub fn total_in(&self) -> u64 { 173 self.inner.total_in() 174 } 175 } 176 177 impl<R: Read> Read for XzDecoder<R> { read(&mut self, buf: &mut [u8]) -> io::Result<usize>178 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 179 self.inner.read(buf) 180 } 181 } 182 183 #[cfg(feature = "tokio")] 184 impl<R: AsyncRead + Read> AsyncRead for XzDecoder<R> { 185 } 186 187 impl<W: Write + Read> Write for XzDecoder<W> { write(&mut self, buf: &[u8]) -> io::Result<usize>188 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 189 self.get_mut().write(buf) 190 } 191 flush(&mut self) -> io::Result<()>192 fn flush(&mut self) -> io::Result<()> { 193 self.get_mut().flush() 194 } 195 } 196 197 #[cfg(feature = "tokio")] 198 impl<R: AsyncWrite + Read> AsyncWrite for XzDecoder<R> { shutdown(&mut self) -> Poll<(), io::Error>199 fn shutdown(&mut self) -> Poll<(), io::Error> { 200 self.get_mut().shutdown() 201 } 202 } 203 204 #[cfg(test)] 205 mod tests { 206 use std::io::prelude::*; 207 use read::{XzEncoder, XzDecoder}; 208 use rand::{thread_rng, Rng}; 209 210 #[test] smoke()211 fn smoke() { 212 let m: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8]; 213 let mut c = XzEncoder::new(m, 6); 214 let mut data = vec![]; 215 c.read_to_end(&mut data).unwrap(); 216 let mut d = XzDecoder::new(&data[..]); 217 let mut data2 = Vec::new(); 218 d.read_to_end(&mut data2).unwrap(); 219 assert_eq!(data2, m); 220 } 221 222 #[test] smoke2()223 fn smoke2() { 224 let m: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8]; 225 let c = XzEncoder::new(m, 6); 226 let mut d = XzDecoder::new(c); 227 let mut data = vec![]; 228 d.read_to_end(&mut data).unwrap(); 229 assert_eq!(data, [1, 2, 3, 4, 5, 6, 7, 8]); 230 } 231 232 #[test] smoke3()233 fn smoke3() { 234 let m = vec![3u8; 128 * 1024 + 1]; 235 let c = XzEncoder::new(&m[..], 6); 236 let mut d = XzDecoder::new(c); 237 let mut data = vec![]; 238 d.read_to_end(&mut data).unwrap(); 239 assert!(data == &m[..]); 240 } 241 242 #[test] self_terminating()243 fn self_terminating() { 244 let m = vec![3u8; 128 * 1024 + 1]; 245 let mut c = XzEncoder::new(&m[..], 6); 246 247 let mut result = Vec::new(); 248 c.read_to_end(&mut result).unwrap(); 249 250 let v = thread_rng().gen_iter::<u8>().take(1024).collect::<Vec<_>>(); 251 for _ in 0..200 { 252 result.extend(v.iter().map(|x| *x)); 253 } 254 255 let mut d = XzDecoder::new(&result[..]); 256 let mut data = Vec::with_capacity(m.len()); 257 unsafe { data.set_len(m.len()); } 258 assert!(d.read(&mut data).unwrap() == m.len()); 259 assert!(data == &m[..]); 260 } 261 262 #[test] zero_length_read_at_eof()263 fn zero_length_read_at_eof() { 264 let m = Vec::new(); 265 let mut c = XzEncoder::new(&m[..], 6); 266 267 let mut result = Vec::new(); 268 c.read_to_end(&mut result).unwrap(); 269 270 let mut d = XzDecoder::new(&result[..]); 271 let mut data = Vec::new(); 272 assert!(d.read(&mut data).unwrap() == 0); 273 } 274 275 #[test] zero_length_read_with_data()276 fn zero_length_read_with_data() { 277 let m = vec![3u8; 128 * 1024 + 1]; 278 let mut c = XzEncoder::new(&m[..], 6); 279 280 let mut result = Vec::new(); 281 c.read_to_end(&mut result).unwrap(); 282 283 let mut d = XzDecoder::new(&result[..]); 284 let mut data = Vec::new(); 285 assert!(d.read(&mut data).unwrap() == 0); 286 } 287 288 #[test] qc()289 fn qc() { 290 ::quickcheck::quickcheck(test as fn(_) -> _); 291 292 fn test(v: Vec<u8>) -> bool { 293 let r = XzEncoder::new(&v[..], 6); 294 let mut r = XzDecoder::new(r); 295 let mut v2 = Vec::new(); 296 r.read_to_end(&mut v2).unwrap(); 297 v == v2 298 } 299 } 300 301 #[test] two_streams()302 fn two_streams() { 303 let mut input_stream1: Vec<u8> = Vec::new(); 304 let mut input_stream2: Vec<u8> = Vec::new(); 305 let mut all_input : Vec<u8> = Vec::new(); 306 307 // Generate input data. 308 const STREAM1_SIZE: usize = 1024; 309 for num in 0..STREAM1_SIZE { 310 input_stream1.push(num as u8) 311 } 312 const STREAM2_SIZE: usize = 532; 313 for num in 0..STREAM2_SIZE { 314 input_stream2.push((num + 32) as u8) 315 } 316 all_input.extend(&input_stream1); 317 all_input.extend(&input_stream2); 318 319 // Make a vector with compressed data 320 let mut decoder_input = Vec::new(); 321 { 322 let mut encoder = XzEncoder::new(&input_stream1[..], 6); 323 encoder.read_to_end(&mut decoder_input).unwrap(); 324 } 325 { 326 let mut encoder = XzEncoder::new(&input_stream2[..], 6); 327 encoder.read_to_end(&mut decoder_input).unwrap(); 328 } 329 330 // Decoder must be able to read the 2 concatenated xz streams and get the same data as input. 331 let mut decoder_reader = &decoder_input[..]; 332 { 333 // using `XzDecoder::new` here would fail because only 1 xz stream would be processed. 334 let mut decoder = XzDecoder::new_multi_decoder(&mut decoder_reader); 335 let mut decompressed_data = vec![0u8; all_input.len()]; 336 337 assert_eq!(decoder.read(&mut decompressed_data).unwrap(), all_input.len()); 338 assert_eq!(decompressed_data, &all_input[..]); 339 } 340 } 341 } 342