1 //! I/O streams for wrapping `BufRead` types as encoders/decoders
2 
3 use std::io::prelude::*;
4 use std::io;
5 
6 use super::CompressParams;
7 use raw::{self, Decompress, DeStatus, Compress, CompressOp, CoStatus};
8 
9 #[derive(Clone, Copy, Eq, PartialEq)]
10 enum DoneStatus {
11     Processing,
12     Finishing,
13     Done,
14 }
15 
16 /// A brotli encoder, or compressor.
17 ///
18 /// This structure implements a `BufRead` interface and will read uncompressed
19 /// data from an underlying stream and emit a stream of compressed data.
20 pub struct BrotliEncoder<R: BufRead> {
21     obj: R,
22     data: Compress,
23     done: DoneStatus,
24     err: Option<raw::Error>,
25 }
26 
27 /// A brotli decoder, or decompressor.
28 ///
29 /// This structure implements a `BufRead` interface and takes a stream of
30 /// compressed data as input, providing the decompressed data when read from.
31 pub struct BrotliDecoder<R: BufRead> {
32     obj: R,
33     data: Decompress,
34     err: Option<raw::Error>,
35 }
36 
37 impl<R: BufRead> BrotliEncoder<R> {
38     /// Creates a new encoder which will read uncompressed data from the given
39     /// stream and emit the compressed stream.
40     ///
41     /// The `level` argument here is typically 0-11.
new(r: R, level: u32) -> BrotliEncoder<R>42     pub fn new(r: R, level: u32) -> BrotliEncoder<R> {
43         let mut data = Compress::new();
44         data.set_params(CompressParams::new().quality(level));
45         BrotliEncoder {
46             obj: r,
47             data: data,
48             done: DoneStatus::Processing,
49             err: None,
50         }
51     }
52 
53     /// Creates a new encoder with a custom `CompressParams`.
from_params(r: R, params: &CompressParams) -> BrotliEncoder<R>54     pub fn from_params(r: R, params: &CompressParams) -> BrotliEncoder<R> {
55         let mut data = Compress::new();
56         data.set_params(params);
57         BrotliEncoder {
58             obj: r,
59             data: data,
60             done: DoneStatus::Processing,
61             err: None,
62         }
63     }
64 
65     /// Acquires a reference to the underlying stream
get_ref(&self) -> &R66     pub fn get_ref(&self) -> &R {
67         &self.obj
68     }
69 
70     /// Acquires a mutable reference to the underlying stream
71     ///
72     /// Note that mutation of the stream may result in surprising results if
73     /// this encoder is continued to be used.
get_mut(&mut self) -> &mut R74     pub fn get_mut(&mut self) -> &mut R {
75         &mut self.obj
76     }
77 
78     /// Consumes this encoder, returning the underlying reader.
into_inner(self) -> R79     pub fn into_inner(self) -> R {
80         self.obj
81     }
82 }
83 
84 impl<R: BufRead> Read for BrotliEncoder<R> {
read(&mut self, mut buf: &mut [u8]) -> io::Result<usize>85     fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
86         if buf.is_empty() { return Ok(0) }
87         // If the compressor has failed at some point, this is set.
88         // Unfortunately we have no idea what status is in the compressor
89         // was in when it failed so we can't do anything except bail again.
90         if let Some(ref err) = self.err {
91             return Err(err.clone().into())
92         }
93 
94         if let Some(data) = self.data.take_output(Some(buf.len())) {
95             buf[..data.len()].copy_from_slice(data);
96             return Ok(data.len())
97         }
98 
99         match self.done {
100             DoneStatus::Done => return Ok(0),
101             DoneStatus::Finishing => return tryfinish(self, buf),
102             DoneStatus::Processing => (),
103         }
104 
105         loop {
106             let amt_in;
107             let amt_out;
108             {
109                 let input = &mut try!(self.obj.fill_buf());
110                 let avail_in = input.len();
111                 if avail_in == 0 {
112                     break
113                 }
114                 let output = &mut buf;
115                 let avail_out = output.len();
116                 if let Err(err) = self.data.compress(CompressOp::Process, input, output) {
117                     self.err = Some(err.clone().into());
118                     return Err(err.into())
119                 }
120                 amt_in = avail_in - input.len();
121                 amt_out = avail_out - output.len();
122             }
123             self.obj.consume(amt_in);
124 
125             if amt_out == 0 {
126                 assert!(amt_in != 0);
127                 continue
128             }
129             return Ok(amt_out)
130         }
131         self.done = DoneStatus::Finishing;
132         return tryfinish(self, buf);
133 
134         fn tryfinish<R: BufRead>(enc: &mut BrotliEncoder<R>, mut buf: &mut [u8])
135                 -> io::Result<usize> {
136             let output = &mut buf;
137             let avail_out = output.len();
138             let iscomplete = match enc.data.compress(CompressOp::Finish, &mut &[][..], output) {
139                 Ok(c) => c,
140                 Err(err) => {
141                     enc.err = err.clone().into();
142                     return Err(err.into())
143                 },
144             };
145             let written = avail_out - output.len();
146             assert!(written != 0 || iscomplete == CoStatus::Finished);
147             if iscomplete == CoStatus::Finished {
148                 enc.done = DoneStatus::Done
149             }
150             Ok(written)
151         }
152     }
153 }
154 
155 impl<R: BufRead> BrotliDecoder<R> {
156     /// Creates a new decoder which will decompress data read from the given
157     /// stream.
new(r: R) -> BrotliDecoder<R>158     pub fn new(r: R) -> BrotliDecoder<R> {
159         BrotliDecoder {
160             data: Decompress::new(),
161             obj: r,
162             err: None,
163         }
164     }
165 
166     /// Acquires a reference to the underlying stream
get_ref(&self) -> &R167     pub fn get_ref(&self) -> &R {
168         &self.obj
169     }
170 
171     /// Acquires a mutable reference to the underlying stream
172     ///
173     /// Note that mutation of the stream may result in surprising results if
174     /// this encoder is continued to be used.
get_mut(&mut self) -> &mut R175     pub fn get_mut(&mut self) -> &mut R {
176         &mut self.obj
177     }
178 
179     /// Consumes this decoder, returning the underlying reader.
into_inner(self) -> R180     pub fn into_inner(self) -> R {
181         self.obj
182     }
183 }
184 
185 impl<R: BufRead> Read for BrotliDecoder<R> {
read(&mut self, mut buf: &mut [u8]) -> io::Result<usize>186     fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
187         if buf.is_empty() { return Ok(0) }
188         // If the decompressor has failed at some point, this is set.
189         // Unfortunately we have no idea what status is in the compressor
190         // was in when it failed so we can't do anything except bail again.
191         if let Some(ref err) = self.err {
192             return Err(err.clone().into())
193         }
194 
195         loop {
196             let amt_in;
197             let amt_out;
198             let status;
199             {
200                 let mut input = try!(self.obj.fill_buf());
201                 let avail_in = input.len();
202                 let avail_out = buf.len();
203                 status = match self.data.decompress(&mut input, &mut buf) {
204                     Ok(s) => s,
205                     Err(err) => {
206                         self.err = Some(err.clone().into());
207                         return Err(err.into())
208                     },
209                 };
210                 amt_in = avail_in - input.len();
211                 amt_out = avail_out - buf.len()
212             }
213             self.obj.consume(amt_in);
214 
215             if amt_in == 0 && status == DeStatus::NeedInput {
216                 return Err(io::Error::new(io::ErrorKind::Other,
217                                           "corrupted brotli stream"))
218             }
219             if amt_out == 0 && status != DeStatus::Finished {
220                 assert!(amt_in != 0);
221                 continue
222             }
223 
224             return Ok(amt_out)
225         }
226     }
227 }
228 
229 
230