1 //! Writer-based compression/decompression streams
2 
3 use std::io::prelude::*;
4 use std::io;
5 
6 use raw::{self, Decompress, DeStatus, Compress, CompressOp, CoStatus};
7 
8 use super::CompressParams;
9 
/// Chunk size (32 KiB) used when taking pending output from the
/// (de)compressor, and the initial capacity of the staging buffer.
const BUF_SIZE: usize = 32 << 10;
11 
/// A compression stream which will have uncompressed data written to it and
/// will write compressed data to an output stream.
pub struct BrotliEncoder<W: Write> {
    /// Compressor state; compressed bytes are pulled out of it with
    /// `take_output`.
    data: Compress,
    /// The wrapped writer, or `None` once `finish` has taken it back (which
    /// also tells `Drop` not to finish the stream again).
    obj: Option<W>,
    /// Compressed bytes already taken from `data` that the writer has not
    /// accepted yet.
    buf: Vec<u8>,
    /// Index in `buf` of the first byte still waiting to be written.
    cur: usize,
    /// First error reported by the compressor; once set, `write` keeps
    /// returning a clone of it.
    err: Option<raw::Error>,
}
21 
/// A decompression stream which will have compressed data written to it and
/// will write uncompressed data to an output stream.
pub struct BrotliDecoder<W: Write> {
    /// Decompressor state; decompressed bytes are pulled out of it with
    /// `take_output`.
    data: Decompress,
    /// The wrapped writer, or `None` once `finish` has taken it back (which
    /// also tells `Drop` not to finish the stream again).
    obj: Option<W>,
    /// Decompressed bytes already taken from `data` that the writer has not
    /// accepted yet.
    buf: Vec<u8>,
    /// Index in `buf` of the first byte still waiting to be written.
    cur: usize,
    /// First error reported by the decompressor; once set, `write` keeps
    /// returning a clone of it.
    err: Option<raw::Error>,
}
31 
32 impl<W: Write> BrotliEncoder<W> {
33     /// Create a new compression stream which will compress at the given level
34     /// to write compress output to the give output stream.
new(obj: W, level: u32) -> BrotliEncoder<W>35     pub fn new(obj: W, level: u32) -> BrotliEncoder<W> {
36         let mut data = Compress::new();
37         data.set_params(CompressParams::new().quality(level));
38         BrotliEncoder {
39             data: data,
40             obj: Some(obj),
41             buf: Vec::with_capacity(BUF_SIZE),
42             cur: 0,
43             err: None,
44         }
45     }
46 
47     /// Creates a new encoder with a custom `CompressParams`.
from_params(obj: W, params: &CompressParams) -> BrotliEncoder<W>48     pub fn from_params(obj: W, params: &CompressParams) -> BrotliEncoder<W> {
49         let mut data = Compress::new();
50         data.set_params(params);
51         BrotliEncoder {
52             data: data,
53             obj: Some(obj),
54             buf: Vec::with_capacity(BUF_SIZE),
55             cur: 0,
56             err: None,
57         }
58     }
59 
60     /// Acquires a reference to the underlying writer.
get_ref(&self) -> &W61     pub fn get_ref(&self) -> &W {
62         self.obj.as_ref().unwrap()
63     }
64 
65     /// Acquires a mutable reference to the underlying writer.
66     ///
67     /// Note that mutating the output/input state of the stream may corrupt this
68     /// object, so care must be taken when using this method.
get_mut(&mut self) -> &mut W69     pub fn get_mut(&mut self) -> &mut W {
70         self.obj.as_mut().unwrap()
71     }
72 
dump(&mut self) -> io::Result<()>73     fn dump(&mut self) -> io::Result<()> {
74         loop {
75             while !self.buf.is_empty() {
76                 let amt = try!(self.obj.as_mut().unwrap().write(&self.buf[self.cur..]));
77                 self.cur += amt;
78                 if self.cur == self.buf.len() {
79                     self.buf.clear();
80                     self.cur = 0
81                 }
82             }
83             // TODO: if we could peek, the buffer wouldn't be necessary
84             if let Some(data) = self.data.take_output(Some(BUF_SIZE)) {
85                 match self.obj.as_mut().unwrap().write(data) {
86                     Ok(n) => self.buf.extend_from_slice(&data[n..]),
87                     Err(e) => {
88                         self.buf.extend_from_slice(data);
89                         return Err(e)
90                     }
91                 }
92             } else {
93                 break
94             }
95         }
96         Ok(())
97     }
98 
99     // Flush or finish stream, also flushing underlying stream
do_flush_or_finish(&mut self, finish: bool) -> io::Result<()>100     fn do_flush_or_finish(&mut self, finish: bool) -> io::Result<()> {
101         try!(self.dump());
102         let op = if finish { CompressOp::Finish } else { CompressOp::Flush };
103         loop {
104             let status = match self.data.compress(op, &mut &[][..], &mut &mut [][..]) {
105                 Ok(s) => s,
106                 Err(err) => {
107                     self.err = Some(err.clone());
108                     return Err(err.into())
109                 },
110             };
111             let obj = self.obj.as_mut().unwrap();
112             while let Some(data) = self.data.take_output(None) {
113                 try!(obj.write_all(data))
114             }
115             match status {
116                 CoStatus::Finished => {
117                     try!(obj.flush());
118                     return Ok(())
119                 },
120                 CoStatus::Unfinished => (),
121             }
122         }
123     }
124 
125     /// Consumes this encoder, flushing the output stream.
126     ///
127     /// This will flush the underlying data stream and then return the contained
128     /// writer if the flush succeeded.
finish(mut self) -> io::Result<W>129     pub fn finish(mut self) -> io::Result<W> {
130         try!(self.do_flush_or_finish(true));
131         Ok(self.obj.take().unwrap())
132     }
133 }
134 
135 impl<W: Write> Write for BrotliEncoder<W> {
write(&mut self, mut data: &[u8]) -> io::Result<usize>136     fn write(&mut self, mut data: &[u8]) -> io::Result<usize> {
137         if data.is_empty() { return Ok(0) }
138         // If the decompressor has failed at some point, this is set.
139         // Unfortunately we have no idea what status is in the compressor
140         // was in when it failed so we can't do anything except bail again.
141         if let Some(ref err) = self.err {
142             return Err(err.clone().into())
143         }
144         try!(self.dump());
145         // Zero-length output buf to keep it all inside the compressor buffer
146         let avail_in = data.len();
147         if let Err(err) = self.data.compress(CompressOp::Process, &mut data, &mut &mut [][..]) {
148             self.err = Some(err.clone());
149             return Err(err.into())
150         }
151         assert!(avail_in != data.len());
152         Ok(avail_in - data.len())
153     }
154 
flush(&mut self) -> io::Result<()>155     fn flush(&mut self) -> io::Result<()> {
156         self.do_flush_or_finish(false)
157     }
158 }
159 
160 impl<W: Write> Drop for BrotliEncoder<W> {
drop(&mut self)161     fn drop(&mut self) {
162         if self.obj.is_some() {
163             let _ = self.do_flush_or_finish(true);
164         }
165     }
166 }
167 
168 impl<W: Write> BrotliDecoder<W> {
169     /// Creates a new decoding stream which will decode all input written to it
170     /// into `obj`.
new(obj: W) -> BrotliDecoder<W>171     pub fn new(obj: W) -> BrotliDecoder<W> {
172         BrotliDecoder {
173             data: Decompress::new(),
174             obj: Some(obj),
175             buf: Vec::with_capacity(BUF_SIZE),
176             cur: 0,
177             err: None,
178         }
179     }
180 
181     /// Acquires a reference to the underlying writer.
get_ref(&self) -> &W182     pub fn get_ref(&self) -> &W {
183         self.obj.as_ref().unwrap()
184     }
185 
186     /// Acquires a mutable reference to the underlying writer.
187     ///
188     /// Note that mutating the output/input state of the stream may corrupt this
189     /// object, so care must be taken when using this method.
get_mut(&mut self) -> &mut W190     pub fn get_mut(&mut self) -> &mut W {
191         self.obj.as_mut().unwrap()
192     }
193 
dump(&mut self) -> io::Result<()>194     fn dump(&mut self) -> io::Result<()> {
195         loop {
196             while !self.buf.is_empty() {
197                 let amt = try!(self.obj.as_mut().unwrap().write(&self.buf[self.cur..]));
198                 self.cur += amt;
199                 if self.cur == self.buf.len() {
200                     self.buf.clear();
201                     self.cur = 0
202                 }
203             }
204             // TODO: if we could peek, the buffer wouldn't be necessary
205             if let Some(data) = self.data.take_output(Some(BUF_SIZE)) {
206                 self.buf.extend_from_slice(data)
207             } else {
208                 break
209             }
210         }
211         Ok(())
212     }
213 
do_finish(&mut self) -> io::Result<()>214     fn do_finish(&mut self) -> io::Result<()> {
215         try!(self.dump());
216         loop {
217             let status = match self.data.decompress(&mut &[][..], &mut &mut [][..]) {
218                 Ok(s) => s,
219                 Err(err) => {
220                     self.err = Some(err.clone());
221                     return Err(err.into())
222                 },
223             };
224             let obj = self.obj.as_mut().unwrap();
225             while let Some(data) = self.data.take_output(None) {
226                 try!(obj.write_all(data))
227             }
228             match status {
229                 DeStatus::Finished => {
230                     try!(obj.flush());
231                     return Ok(())
232                 },
233                 // When decoding a truncated file, brotli returns DeStatus::NeedInput.
234                 // Since we're finishing, we cannot provide more data so this is an
235                 // error.
236                 DeStatus::NeedInput => {
237                     let msg = "brotli compressed stream is truncated or otherwise corrupt";
238                     return Err(io::Error::new(io::ErrorKind::UnexpectedEof, msg))
239                 },
240                 DeStatus::NeedOutput => (),
241             }
242         }
243     }
244 
245     /// Unwrap the underlying writer, finishing the compression stream.
finish(&mut self) -> io::Result<W>246     pub fn finish(&mut self) -> io::Result<W> {
247         try!(self.do_finish());
248         Ok(self.obj.take().unwrap())
249     }
250 }
251 
252 impl<W: Write> Write for BrotliDecoder<W> {
write(&mut self, mut data: &[u8]) -> io::Result<usize>253     fn write(&mut self, mut data: &[u8]) -> io::Result<usize> {
254         if data.is_empty() { return Ok(0) }
255         // If the decompressor has failed at some point, this is set.
256         // Unfortunately we have no idea what status is in the compressor
257         // was in when it failed so we can't do anything except bail again.
258         if let Some(ref err) = self.err {
259             return Err(err.clone().into())
260         }
261         try!(self.dump());
262         // Zero-length output buf to keep it all inside the decompressor buffer
263         let avail_in = data.len();
264         let status = match self.data.decompress(&mut data, &mut &mut [][..]) {
265             Ok(s) => s,
266             Err(err) => {
267                 self.err = Some(err.clone());
268                 return Err(err.into())
269             },
270         };
271         assert!(avail_in != data.len() || status == DeStatus::Finished);
272         Ok(avail_in - data.len())
273     }
274 
flush(&mut self) -> io::Result<()>275     fn flush(&mut self) -> io::Result<()> {
276         try!(self.dump());
277         self.obj.as_mut().unwrap().flush()
278     }
279 }
280 
281 impl<W: Write> Drop for BrotliDecoder<W> {
drop(&mut self)282     fn drop(&mut self) {
283         if self.obj.is_some() {
284             let _ = self.do_finish();
285         }
286     }
287 }
288 
#[cfg(test)]
mod tests {
    use std::io;
    use std::io::prelude::*;
    use std::iter::repeat;
    use super::{BrotliEncoder, BrotliDecoder};

    /// Round-trips a short prefix plus a long, highly repetitive payload
    /// through an encoder chained into a decoder.
    #[test]
    fn smoke() {
        let d = BrotliDecoder::new(Vec::new());
        let mut c = BrotliEncoder::new(d, 6);
        c.write_all(b"12834").unwrap();
        let s = repeat("12345").take(100000).collect::<String>();
        c.write_all(s.as_bytes()).unwrap();
        let data = c.finish().unwrap().finish().unwrap();
        assert_eq!(&data[0..5], b"12834");
        assert_eq!(data.len(), 500005);
        // `assert!` rather than `assert_eq!` so a failure doesn't dump 500 KB.
        assert!(format!("12834{}", s).as_bytes() == &*data);
    }

    /// An empty write followed by finishing both ends yields empty output.
    #[test]
    fn write_empty() {
        let d = BrotliDecoder::new(Vec::new());
        let mut c = BrotliEncoder::new(d, 6);
        c.write(b"").unwrap();
        let data = c.finish().unwrap().finish().unwrap();
        assert_eq!(&data[..], b"");
    }

    /// A compressed stream cut off before its end must make the decoder's
    /// `finish` fail with `UnexpectedEof` instead of returning partial output.
    #[test]
    fn truncated_stream() {
        let mut c = BrotliEncoder::new(Vec::new(), 6);
        c.write_all(b"hello hello hello hello hello").unwrap();
        let compressed = c.finish().unwrap();
        let mut d = BrotliDecoder::new(Vec::new());
        d.write_all(&compressed[..compressed.len() / 2]).unwrap();
        let err = d.finish().unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
    }

    /// Arbitrary byte vectors survive an encode/decode round trip.
    #[test]
    fn qc() {
        ::quickcheck::quickcheck(test as fn(_) -> _);

        fn test(v: Vec<u8>) -> bool {
            let w = BrotliDecoder::new(Vec::new());
            let mut w = BrotliEncoder::new(w, 6);
            w.write_all(&v).unwrap();
            v == w.finish().unwrap().finish().unwrap()
        }
    }
}
329