1 //! Reader-based compression/decompression streams 2 3 use std::io::prelude::*; 4 use std::io::{self, BufReader}; 5 6 #[cfg(feature = "tokio")] 7 use futures::Poll; 8 #[cfg(feature = "tokio")] 9 use tokio_io::{AsyncRead, AsyncWrite}; 10 11 use bufread; 12 use Compression; 13 14 /// A compression stream which wraps an uncompressed stream of data. Compressed 15 /// data will be read from the stream. 16 pub struct BzEncoder<R> { 17 inner: bufread::BzEncoder<BufReader<R>>, 18 } 19 20 /// A decompression stream which wraps a compressed stream of data. Decompressed 21 /// data will be read from the stream. 22 pub struct BzDecoder<R> { 23 inner: bufread::BzDecoder<BufReader<R>>, 24 } 25 26 impl<R: Read> BzEncoder<R> { 27 /// Create a new compression stream which will compress at the given level 28 /// to read compress output to the give output stream. new(r: R, level: Compression) -> BzEncoder<R>29 pub fn new(r: R, level: Compression) -> BzEncoder<R> { 30 BzEncoder { 31 inner: bufread::BzEncoder::new(BufReader::new(r), level), 32 } 33 } 34 35 /// Acquires a reference to the underlying stream get_ref(&self) -> &R36 pub fn get_ref(&self) -> &R { 37 self.inner.get_ref().get_ref() 38 } 39 40 /// Acquires a mutable reference to the underlying stream 41 /// 42 /// Note that mutation of the stream may result in surprising results if 43 /// this encoder is continued to be used. get_mut(&mut self) -> &mut R44 pub fn get_mut(&mut self) -> &mut R { 45 self.inner.get_mut().get_mut() 46 } 47 48 /// Unwrap the underlying writer, finishing the compression stream. into_inner(self) -> R49 pub fn into_inner(self) -> R { 50 self.inner.into_inner().into_inner() 51 } 52 53 /// Returns the number of bytes produced by the compressor 54 /// (e.g. the number of bytes read from this stream) 55 /// 56 /// Note that, due to buffering, this only bears any relation to 57 /// total_in() when the compressor chooses to flush its data 58 /// (unfortunately, this won't happen in general 59 /// at the end of the stream, because the compressor doesn't know 60 /// if there's more data to come). At that point, 61 /// `total_out() / total_in()` would be the compression ratio. total_out(&self) -> u6462 pub fn total_out(&self) -> u64 { 63 self.inner.total_out() 64 } 65 66 /// Returns the number of bytes consumed by the compressor 67 /// (e.g. the number of bytes read from the underlying stream) total_in(&self) -> u6468 pub fn total_in(&self) -> u64 { 69 self.inner.total_in() 70 } 71 } 72 73 impl<R: Read> Read for BzEncoder<R> { read(&mut self, buf: &mut [u8]) -> io::Result<usize>74 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 75 self.inner.read(buf) 76 } 77 } 78 79 #[cfg(feature = "tokio")] 80 impl<R: AsyncRead> AsyncRead for BzEncoder<R> {} 81 82 impl<W: Write + Read> Write for BzEncoder<W> { write(&mut self, buf: &[u8]) -> io::Result<usize>83 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 84 self.get_mut().write(buf) 85 } 86 flush(&mut self) -> io::Result<()>87 fn flush(&mut self) -> io::Result<()> { 88 self.get_mut().flush() 89 } 90 } 91 92 #[cfg(feature = "tokio")] 93 impl<R: AsyncWrite + Read> AsyncWrite for BzEncoder<R> { shutdown(&mut self) -> Poll<(), io::Error>94 fn shutdown(&mut self) -> Poll<(), io::Error> { 95 self.get_mut().shutdown() 96 } 97 } 98 99 impl<R: Read> BzDecoder<R> { 100 /// Create a new decompression stream, which will read compressed 101 /// data from the given input stream and decompress it. new(r: R) -> BzDecoder<R>102 pub fn new(r: R) -> BzDecoder<R> { 103 BzDecoder { 104 inner: bufread::BzDecoder::new(BufReader::new(r)), 105 } 106 } 107 108 /// Acquires a reference to the underlying stream get_ref(&self) -> &R109 pub fn get_ref(&self) -> &R { 110 self.inner.get_ref().get_ref() 111 } 112 113 /// Acquires a mutable reference to the underlying stream 114 /// 115 /// Note that mutation of the stream may result in surprising results if 116 /// this encoder is continued to be used. get_mut(&mut self) -> &mut R117 pub fn get_mut(&mut self) -> &mut R { 118 self.inner.get_mut().get_mut() 119 } 120 121 /// Unwrap the underlying writer, finishing the compression stream. into_inner(self) -> R122 pub fn into_inner(self) -> R { 123 self.inner.into_inner().into_inner() 124 } 125 126 /// Returns the number of bytes produced by the decompressor 127 /// (e.g. the number of bytes read from this stream) 128 /// 129 /// Note that, due to buffering, this only bears any relation to 130 /// total_in() when the decompressor reaches a sync point 131 /// (e.g. where the original compressed stream was flushed). 132 /// At that point, `total_in() / total_out()` is the compression ratio. total_out(&self) -> u64133 pub fn total_out(&self) -> u64 { 134 self.inner.total_out() 135 } 136 137 /// Returns the number of bytes consumed by the decompressor 138 /// (e.g. the number of bytes read from the underlying stream) total_in(&self) -> u64139 pub fn total_in(&self) -> u64 { 140 self.inner.total_in() 141 } 142 } 143 144 impl<R: Read> Read for BzDecoder<R> { read(&mut self, into: &mut [u8]) -> io::Result<usize>145 fn read(&mut self, into: &mut [u8]) -> io::Result<usize> { 146 self.inner.read(into) 147 } 148 } 149 150 #[cfg(feature = "tokio")] 151 impl<R: AsyncRead + Read> AsyncRead for BzDecoder<R> {} 152 153 impl<W: Write + Read> Write for BzDecoder<W> { write(&mut self, buf: &[u8]) -> io::Result<usize>154 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 155 self.get_mut().write(buf) 156 } 157 flush(&mut self) -> io::Result<()>158 fn flush(&mut self) -> io::Result<()> { 159 self.get_mut().flush() 160 } 161 } 162 163 #[cfg(feature = "tokio")] 164 impl<R: AsyncWrite + Read> AsyncWrite for BzDecoder<R> { shutdown(&mut self) -> Poll<(), io::Error>165 fn shutdown(&mut self) -> Poll<(), io::Error> { 166 self.get_mut().shutdown() 167 } 168 } 169 170 /// A bzip2 streaming decoder that decodes all members of a multistream 171 /// 172 /// Wikipedia, particularly, uses bzip2 multistream for their dumps. 173 pub struct MultiBzDecoder<R> { 174 inner: bufread::MultiBzDecoder<BufReader<R>>, 175 } 176 177 impl<R: Read> MultiBzDecoder<R> { 178 /// Creates a new decoder from the given reader, immediately parsing the 179 /// (first) gzip header. If the gzip stream contains multiple members all will 180 /// be decoded. new(r: R) -> MultiBzDecoder<R>181 pub fn new(r: R) -> MultiBzDecoder<R> { 182 MultiBzDecoder { 183 inner: bufread::MultiBzDecoder::new(BufReader::new(r)), 184 } 185 } 186 } 187 188 impl<R> MultiBzDecoder<R> { 189 /// Acquires a reference to the underlying reader. get_ref(&self) -> &R190 pub fn get_ref(&self) -> &R { 191 self.inner.get_ref().get_ref() 192 } 193 194 /// Acquires a mutable reference to the underlying stream. 195 /// 196 /// Note that mutation of the stream may result in surprising results if 197 /// this encoder is continued to be used. get_mut(&mut self) -> &mut R198 pub fn get_mut(&mut self) -> &mut R { 199 self.inner.get_mut().get_mut() 200 } 201 202 /// Consumes this decoder, returning the underlying reader. into_inner(self) -> R203 pub fn into_inner(self) -> R { 204 self.inner.into_inner().into_inner() 205 } 206 } 207 208 impl<R: Read> Read for MultiBzDecoder<R> { read(&mut self, into: &mut [u8]) -> io::Result<usize>209 fn read(&mut self, into: &mut [u8]) -> io::Result<usize> { 210 self.inner.read(into) 211 } 212 } 213 214 #[cfg(feature = "tokio")] 215 impl<R: AsyncRead> AsyncRead for MultiBzDecoder<R> {} 216 217 impl<R: Read + Write> Write for MultiBzDecoder<R> { write(&mut self, buf: &[u8]) -> io::Result<usize>218 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 219 self.get_mut().write(buf) 220 } 221 flush(&mut self) -> io::Result<()>222 fn flush(&mut self) -> io::Result<()> { 223 self.get_mut().flush() 224 } 225 } 226 227 #[cfg(feature = "tokio")] 228 impl<R: AsyncWrite + AsyncRead> AsyncWrite for MultiBzDecoder<R> { shutdown(&mut self) -> Poll<(), io::Error>229 fn shutdown(&mut self) -> Poll<(), io::Error> { 230 self.get_mut().shutdown() 231 } 232 } 233 234 #[cfg(test)] 235 mod tests { 236 use partial_io::{GenInterrupted, PartialRead, PartialWithErrors}; 237 use rand::distributions::Standard; 238 use rand::{thread_rng, Rng}; 239 use read::{BzDecoder, BzEncoder, MultiBzDecoder}; 240 use std::io::prelude::*; 241 use Compression; 242 243 #[test] smoke()244 fn smoke() { 245 let m: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8]; 246 let mut c = BzEncoder::new(m, Compression::default()); 247 let mut data = vec![]; 248 c.read_to_end(&mut data).unwrap(); 249 let mut d = BzDecoder::new(&data[..]); 250 let mut data2 = Vec::new(); 251 d.read_to_end(&mut data2).unwrap(); 252 assert_eq!(data2, m); 253 } 254 255 #[test] smoke2()256 fn smoke2() { 257 let m: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8]; 258 let c = BzEncoder::new(m, Compression::default()); 259 let mut d = BzDecoder::new(c); 260 let mut data = vec![]; 261 d.read_to_end(&mut data).unwrap(); 262 assert_eq!(data, [1, 2, 3, 4, 5, 6, 7, 8]); 263 } 264 265 #[test] smoke3()266 fn smoke3() { 267 let m = vec![3u8; 128 * 1024 + 1]; 268 let c = BzEncoder::new(&m[..], Compression::default()); 269 let mut d = BzDecoder::new(c); 270 let mut data = vec![]; 271 d.read_to_end(&mut data).unwrap(); 272 assert!(data == &m[..]); 273 } 274 275 #[test] self_terminating()276 fn self_terminating() { 277 let m = vec![3u8; 128 * 1024 + 1]; 278 let mut c = BzEncoder::new(&m[..], Compression::default()); 279 280 let mut result = Vec::new(); 281 c.read_to_end(&mut result).unwrap(); 282 283 let v = thread_rng() 284 .sample_iter(&Standard) 285 .take(1024) 286 .collect::<Vec<_>>(); 287 for _ in 0..200 { 288 result.extend(v.iter().map(|x: &u8| *x)); 289 } 290 291 let mut d = BzDecoder::new(&result[..]); 292 let mut data = Vec::with_capacity(m.len()); 293 unsafe { 294 data.set_len(m.len()); 295 } 296 assert!(d.read(&mut data).unwrap() == m.len()); 297 assert!(data == &m[..]); 298 } 299 300 #[test] zero_length_read_at_eof()301 fn zero_length_read_at_eof() { 302 let m = Vec::new(); 303 let mut c = BzEncoder::new(&m[..], Compression::default()); 304 305 let mut result = Vec::new(); 306 c.read_to_end(&mut result).unwrap(); 307 308 let mut d = BzDecoder::new(&result[..]); 309 let mut data = Vec::new(); 310 assert!(d.read(&mut data).unwrap() == 0); 311 } 312 313 #[test] zero_length_read_with_data()314 fn zero_length_read_with_data() { 315 let m = vec![3u8; 128 * 1024 + 1]; 316 let mut c = BzEncoder::new(&m[..], Compression::default()); 317 318 let mut result = Vec::new(); 319 c.read_to_end(&mut result).unwrap(); 320 321 let mut d = BzDecoder::new(&result[..]); 322 let mut data = Vec::new(); 323 assert!(d.read(&mut data).unwrap() == 0); 324 } 325 326 #[test] multistream_read_till_eof()327 fn multistream_read_till_eof() { 328 let m = vec![3u8; 128 * 1024 + 1]; 329 let repeat = 3; 330 let mut result = Vec::new(); 331 332 for _i in 0..repeat { 333 let mut c = BzEncoder::new(&m[..], Compression::default()); 334 c.read_to_end(&mut result).unwrap(); 335 } 336 337 let mut d = MultiBzDecoder::new(&result[..]); 338 let mut data = Vec::new(); 339 340 let a = d.read_to_end(&mut data).unwrap(); 341 let b = m.len() * repeat; 342 assert!(a == b, "{} {}", a, b); 343 } 344 345 #[test] empty()346 fn empty() { 347 let r = BzEncoder::new(&[][..], Compression::default()); 348 let mut r = BzDecoder::new(r); 349 let mut v2 = Vec::new(); 350 r.read_to_end(&mut v2).unwrap(); 351 assert!(v2.len() == 0); 352 } 353 354 #[test] qc()355 fn qc() { 356 ::quickcheck::quickcheck(test as fn(_) -> _); 357 358 fn test(v: Vec<u8>) -> bool { 359 let r = BzEncoder::new(&v[..], Compression::default()); 360 let mut r = BzDecoder::new(r); 361 let mut v2 = Vec::new(); 362 r.read_to_end(&mut v2).unwrap(); 363 v == v2 364 } 365 } 366 367 #[test] qc_partial()368 fn qc_partial() { 369 quickcheck6::quickcheck(test as fn(_, _, _) -> _); 370 371 fn test( 372 v: Vec<u8>, 373 encode_ops: PartialWithErrors<GenInterrupted>, 374 decode_ops: PartialWithErrors<GenInterrupted>, 375 ) -> bool { 376 let r = BzEncoder::new(PartialRead::new(&v[..], encode_ops), Compression::default()); 377 let mut r = BzDecoder::new(PartialRead::new(r, decode_ops)); 378 let mut v2 = Vec::new(); 379 r.read_to_end(&mut v2).unwrap(); 380 v == v2 381 } 382 } 383 } 384