1 //! Writer-based compression/decompression streams 2 3 use std::io::prelude::*; 4 use std::io; 5 6 use raw::{self, Decompress, DeStatus, Compress, CompressOp, CoStatus}; 7 8 use super::CompressParams; 9 10 const BUF_SIZE: usize = 32 * 1024; 11 12 /// A compression stream which will have uncompressed data written to it and 13 /// will write compressed data to an output stream. 14 pub struct BrotliEncoder<W: Write> { 15 data: Compress, 16 obj: Option<W>, 17 buf: Vec<u8>, 18 cur: usize, 19 err: Option<raw::Error>, 20 } 21 22 /// A compression stream which will have compressed data written to it and 23 /// will write uncompressed data to an output stream. 24 pub struct BrotliDecoder<W: Write> { 25 data: Decompress, 26 obj: Option<W>, 27 buf: Vec<u8>, 28 cur: usize, 29 err: Option<raw::Error>, 30 } 31 32 impl<W: Write> BrotliEncoder<W> { 33 /// Create a new compression stream which will compress at the given level 34 /// to write compress output to the give output stream. new(obj: W, level: u32) -> BrotliEncoder<W>35 pub fn new(obj: W, level: u32) -> BrotliEncoder<W> { 36 let mut data = Compress::new(); 37 data.set_params(CompressParams::new().quality(level)); 38 BrotliEncoder { 39 data: data, 40 obj: Some(obj), 41 buf: Vec::with_capacity(BUF_SIZE), 42 cur: 0, 43 err: None, 44 } 45 } 46 47 /// Creates a new encoder with a custom `CompressParams`. from_params(obj: W, params: &CompressParams) -> BrotliEncoder<W>48 pub fn from_params(obj: W, params: &CompressParams) -> BrotliEncoder<W> { 49 let mut data = Compress::new(); 50 data.set_params(params); 51 BrotliEncoder { 52 data: data, 53 obj: Some(obj), 54 buf: Vec::with_capacity(BUF_SIZE), 55 cur: 0, 56 err: None, 57 } 58 } 59 60 /// Acquires a reference to the underlying writer. get_ref(&self) -> &W61 pub fn get_ref(&self) -> &W { 62 self.obj.as_ref().unwrap() 63 } 64 65 /// Acquires a mutable reference to the underlying writer. 66 /// 67 /// Note that mutating the output/input state of the stream may corrupt this 68 /// object, so care must be taken when using this method. get_mut(&mut self) -> &mut W69 pub fn get_mut(&mut self) -> &mut W { 70 self.obj.as_mut().unwrap() 71 } 72 dump(&mut self) -> io::Result<()>73 fn dump(&mut self) -> io::Result<()> { 74 loop { 75 while !self.buf.is_empty() { 76 let amt = try!(self.obj.as_mut().unwrap().write(&self.buf[self.cur..])); 77 self.cur += amt; 78 if self.cur == self.buf.len() { 79 self.buf.clear(); 80 self.cur = 0 81 } 82 } 83 // TODO: if we could peek, the buffer wouldn't be necessary 84 if let Some(data) = self.data.take_output(Some(BUF_SIZE)) { 85 match self.obj.as_mut().unwrap().write(data) { 86 Ok(n) => self.buf.extend_from_slice(&data[n..]), 87 Err(e) => { 88 self.buf.extend_from_slice(data); 89 return Err(e) 90 } 91 } 92 } else { 93 break 94 } 95 } 96 Ok(()) 97 } 98 99 // Flush or finish stream, also flushing underlying stream do_flush_or_finish(&mut self, finish: bool) -> io::Result<()>100 fn do_flush_or_finish(&mut self, finish: bool) -> io::Result<()> { 101 try!(self.dump()); 102 let op = if finish { CompressOp::Finish } else { CompressOp::Flush }; 103 loop { 104 let status = match self.data.compress(op, &mut &[][..], &mut &mut [][..]) { 105 Ok(s) => s, 106 Err(err) => { 107 self.err = Some(err.clone()); 108 return Err(err.into()) 109 }, 110 }; 111 let obj = self.obj.as_mut().unwrap(); 112 while let Some(data) = self.data.take_output(None) { 113 try!(obj.write_all(data)) 114 } 115 match status { 116 CoStatus::Finished => { 117 try!(obj.flush()); 118 return Ok(()) 119 }, 120 CoStatus::Unfinished => (), 121 } 122 } 123 } 124 125 /// Consumes this encoder, flushing the output stream. 126 /// 127 /// This will flush the underlying data stream and then return the contained 128 /// writer if the flush succeeded. finish(mut self) -> io::Result<W>129 pub fn finish(mut self) -> io::Result<W> { 130 try!(self.do_flush_or_finish(true)); 131 Ok(self.obj.take().unwrap()) 132 } 133 } 134 135 impl<W: Write> Write for BrotliEncoder<W> { write(&mut self, mut data: &[u8]) -> io::Result<usize>136 fn write(&mut self, mut data: &[u8]) -> io::Result<usize> { 137 if data.is_empty() { return Ok(0) } 138 // If the decompressor has failed at some point, this is set. 139 // Unfortunately we have no idea what status is in the compressor 140 // was in when it failed so we can't do anything except bail again. 141 if let Some(ref err) = self.err { 142 return Err(err.clone().into()) 143 } 144 try!(self.dump()); 145 // Zero-length output buf to keep it all inside the compressor buffer 146 let avail_in = data.len(); 147 if let Err(err) = self.data.compress(CompressOp::Process, &mut data, &mut &mut [][..]) { 148 self.err = Some(err.clone()); 149 return Err(err.into()) 150 } 151 assert!(avail_in != data.len()); 152 Ok(avail_in - data.len()) 153 } 154 flush(&mut self) -> io::Result<()>155 fn flush(&mut self) -> io::Result<()> { 156 self.do_flush_or_finish(false) 157 } 158 } 159 160 impl<W: Write> Drop for BrotliEncoder<W> { drop(&mut self)161 fn drop(&mut self) { 162 if self.obj.is_some() { 163 let _ = self.do_flush_or_finish(true); 164 } 165 } 166 } 167 168 impl<W: Write> BrotliDecoder<W> { 169 /// Creates a new decoding stream which will decode all input written to it 170 /// into `obj`. new(obj: W) -> BrotliDecoder<W>171 pub fn new(obj: W) -> BrotliDecoder<W> { 172 BrotliDecoder { 173 data: Decompress::new(), 174 obj: Some(obj), 175 buf: Vec::with_capacity(BUF_SIZE), 176 cur: 0, 177 err: None, 178 } 179 } 180 181 /// Acquires a reference to the underlying writer. get_ref(&self) -> &W182 pub fn get_ref(&self) -> &W { 183 self.obj.as_ref().unwrap() 184 } 185 186 /// Acquires a mutable reference to the underlying writer. 187 /// 188 /// Note that mutating the output/input state of the stream may corrupt this 189 /// object, so care must be taken when using this method. get_mut(&mut self) -> &mut W190 pub fn get_mut(&mut self) -> &mut W { 191 self.obj.as_mut().unwrap() 192 } 193 dump(&mut self) -> io::Result<()>194 fn dump(&mut self) -> io::Result<()> { 195 loop { 196 while !self.buf.is_empty() { 197 let amt = try!(self.obj.as_mut().unwrap().write(&self.buf[self.cur..])); 198 self.cur += amt; 199 if self.cur == self.buf.len() { 200 self.buf.clear(); 201 self.cur = 0 202 } 203 } 204 // TODO: if we could peek, the buffer wouldn't be necessary 205 if let Some(data) = self.data.take_output(Some(BUF_SIZE)) { 206 self.buf.extend_from_slice(data) 207 } else { 208 break 209 } 210 } 211 Ok(()) 212 } 213 do_finish(&mut self) -> io::Result<()>214 fn do_finish(&mut self) -> io::Result<()> { 215 try!(self.dump()); 216 loop { 217 let status = match self.data.decompress(&mut &[][..], &mut &mut [][..]) { 218 Ok(s) => s, 219 Err(err) => { 220 self.err = Some(err.clone()); 221 return Err(err.into()) 222 }, 223 }; 224 let obj = self.obj.as_mut().unwrap(); 225 while let Some(data) = self.data.take_output(None) { 226 try!(obj.write_all(data)) 227 } 228 match status { 229 DeStatus::Finished => { 230 try!(obj.flush()); 231 return Ok(()) 232 }, 233 // When decoding a truncated file, brotli returns DeStatus::NeedInput. 234 // Since we're finishing, we cannot provide more data so this is an 235 // error. 236 DeStatus::NeedInput => { 237 let msg = "brotli compressed stream is truncated or otherwise corrupt"; 238 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, msg)) 239 }, 240 DeStatus::NeedOutput => (), 241 } 242 } 243 } 244 245 /// Unwrap the underlying writer, finishing the compression stream. finish(&mut self) -> io::Result<W>246 pub fn finish(&mut self) -> io::Result<W> { 247 try!(self.do_finish()); 248 Ok(self.obj.take().unwrap()) 249 } 250 } 251 252 impl<W: Write> Write for BrotliDecoder<W> { write(&mut self, mut data: &[u8]) -> io::Result<usize>253 fn write(&mut self, mut data: &[u8]) -> io::Result<usize> { 254 if data.is_empty() { return Ok(0) } 255 // If the decompressor has failed at some point, this is set. 256 // Unfortunately we have no idea what status is in the compressor 257 // was in when it failed so we can't do anything except bail again. 258 if let Some(ref err) = self.err { 259 return Err(err.clone().into()) 260 } 261 try!(self.dump()); 262 // Zero-length output buf to keep it all inside the decompressor buffer 263 let avail_in = data.len(); 264 let status = match self.data.decompress(&mut data, &mut &mut [][..]) { 265 Ok(s) => s, 266 Err(err) => { 267 self.err = Some(err.clone()); 268 return Err(err.into()) 269 }, 270 }; 271 assert!(avail_in != data.len() || status == DeStatus::Finished); 272 Ok(avail_in - data.len()) 273 } 274 flush(&mut self) -> io::Result<()>275 fn flush(&mut self) -> io::Result<()> { 276 try!(self.dump()); 277 self.obj.as_mut().unwrap().flush() 278 } 279 } 280 281 impl<W: Write> Drop for BrotliDecoder<W> { drop(&mut self)282 fn drop(&mut self) { 283 if self.obj.is_some() { 284 let _ = self.do_finish(); 285 } 286 } 287 } 288 289 #[cfg(test)] 290 mod tests { 291 use std::io::prelude::*; 292 use std::iter::repeat; 293 use super::{BrotliEncoder, BrotliDecoder}; 294 295 #[test] smoke()296 fn smoke() { 297 let d = BrotliDecoder::new(Vec::new()); 298 let mut c = BrotliEncoder::new(d, 6); 299 c.write_all(b"12834").unwrap(); 300 let s = repeat("12345").take(100000).collect::<String>(); 301 c.write_all(s.as_bytes()).unwrap(); 302 let data = c.finish().unwrap().finish().unwrap(); 303 assert_eq!(&data[0..5], b"12834"); 304 assert_eq!(data.len(), 500005); 305 assert!(format!("12834{}", s).as_bytes() == &*data); 306 } 307 308 #[test] write_empty()309 fn write_empty() { 310 let d = BrotliDecoder::new(Vec::new()); 311 let mut c = BrotliEncoder::new(d, 6); 312 c.write(b"").unwrap(); 313 let data = c.finish().unwrap().finish().unwrap(); 314 assert_eq!(&data[..], b""); 315 } 316 317 #[test] qc()318 fn qc() { 319 ::quickcheck::quickcheck(test as fn(_) -> _); 320 321 fn test(v: Vec<u8>) -> bool { 322 let w = BrotliDecoder::new(Vec::new()); 323 let mut w = BrotliEncoder::new(w, 6); 324 w.write_all(&v).unwrap(); 325 v == w.finish().unwrap().finish().unwrap() 326 } 327 } 328 } 329