1 //! I/O streams for wrapping `BufRead` types as encoders/decoders 2 3 use std::io::prelude::*; 4 use std::io; 5 6 use super::CompressParams; 7 use raw::{self, Decompress, DeStatus, Compress, CompressOp, CoStatus}; 8 9 #[derive(Clone, Copy, Eq, PartialEq)] 10 enum DoneStatus { 11 Processing, 12 Finishing, 13 Done, 14 } 15 16 /// A brotli encoder, or compressor. 17 /// 18 /// This structure implements a `BufRead` interface and will read uncompressed 19 /// data from an underlying stream and emit a stream of compressed data. 20 pub struct BrotliEncoder<R: BufRead> { 21 obj: R, 22 data: Compress, 23 done: DoneStatus, 24 err: Option<raw::Error>, 25 } 26 27 /// A brotli decoder, or decompressor. 28 /// 29 /// This structure implements a `BufRead` interface and takes a stream of 30 /// compressed data as input, providing the decompressed data when read from. 31 pub struct BrotliDecoder<R: BufRead> { 32 obj: R, 33 data: Decompress, 34 err: Option<raw::Error>, 35 } 36 37 impl<R: BufRead> BrotliEncoder<R> { 38 /// Creates a new encoder which will read uncompressed data from the given 39 /// stream and emit the compressed stream. 40 /// 41 /// The `level` argument here is typically 0-11. new(r: R, level: u32) -> BrotliEncoder<R>42 pub fn new(r: R, level: u32) -> BrotliEncoder<R> { 43 let mut data = Compress::new(); 44 data.set_params(CompressParams::new().quality(level)); 45 BrotliEncoder { 46 obj: r, 47 data: data, 48 done: DoneStatus::Processing, 49 err: None, 50 } 51 } 52 53 /// Creates a new encoder with a custom `CompressParams`. from_params(r: R, params: &CompressParams) -> BrotliEncoder<R>54 pub fn from_params(r: R, params: &CompressParams) -> BrotliEncoder<R> { 55 let mut data = Compress::new(); 56 data.set_params(params); 57 BrotliEncoder { 58 obj: r, 59 data: data, 60 done: DoneStatus::Processing, 61 err: None, 62 } 63 } 64 65 /// Acquires a reference to the underlying stream get_ref(&self) -> &R66 pub fn get_ref(&self) -> &R { 67 &self.obj 68 } 69 70 /// Acquires a mutable reference to the underlying stream 71 /// 72 /// Note that mutation of the stream may result in surprising results if 73 /// this encoder is continued to be used. get_mut(&mut self) -> &mut R74 pub fn get_mut(&mut self) -> &mut R { 75 &mut self.obj 76 } 77 78 /// Consumes this encoder, returning the underlying reader. into_inner(self) -> R79 pub fn into_inner(self) -> R { 80 self.obj 81 } 82 } 83 84 impl<R: BufRead> Read for BrotliEncoder<R> { read(&mut self, mut buf: &mut [u8]) -> io::Result<usize>85 fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> { 86 if buf.is_empty() { return Ok(0) } 87 // If the compressor has failed at some point, this is set. 88 // Unfortunately we have no idea what status is in the compressor 89 // was in when it failed so we can't do anything except bail again. 90 if let Some(ref err) = self.err { 91 return Err(err.clone().into()) 92 } 93 94 if let Some(data) = self.data.take_output(Some(buf.len())) { 95 buf[..data.len()].copy_from_slice(data); 96 return Ok(data.len()) 97 } 98 99 match self.done { 100 DoneStatus::Done => return Ok(0), 101 DoneStatus::Finishing => return tryfinish(self, buf), 102 DoneStatus::Processing => (), 103 } 104 105 loop { 106 let amt_in; 107 let amt_out; 108 { 109 let input = &mut try!(self.obj.fill_buf()); 110 let avail_in = input.len(); 111 if avail_in == 0 { 112 break 113 } 114 let output = &mut buf; 115 let avail_out = output.len(); 116 if let Err(err) = self.data.compress(CompressOp::Process, input, output) { 117 self.err = Some(err.clone().into()); 118 return Err(err.into()) 119 } 120 amt_in = avail_in - input.len(); 121 amt_out = avail_out - output.len(); 122 } 123 self.obj.consume(amt_in); 124 125 if amt_out == 0 { 126 assert!(amt_in != 0); 127 continue 128 } 129 return Ok(amt_out) 130 } 131 self.done = DoneStatus::Finishing; 132 return tryfinish(self, buf); 133 134 fn tryfinish<R: BufRead>(enc: &mut BrotliEncoder<R>, mut buf: &mut [u8]) 135 -> io::Result<usize> { 136 let output = &mut buf; 137 let avail_out = output.len(); 138 let iscomplete = match enc.data.compress(CompressOp::Finish, &mut &[][..], output) { 139 Ok(c) => c, 140 Err(err) => { 141 enc.err = err.clone().into(); 142 return Err(err.into()) 143 }, 144 }; 145 let written = avail_out - output.len(); 146 assert!(written != 0 || iscomplete == CoStatus::Finished); 147 if iscomplete == CoStatus::Finished { 148 enc.done = DoneStatus::Done 149 } 150 Ok(written) 151 } 152 } 153 } 154 155 impl<R: BufRead> BrotliDecoder<R> { 156 /// Creates a new decoder which will decompress data read from the given 157 /// stream. new(r: R) -> BrotliDecoder<R>158 pub fn new(r: R) -> BrotliDecoder<R> { 159 BrotliDecoder { 160 data: Decompress::new(), 161 obj: r, 162 err: None, 163 } 164 } 165 166 /// Acquires a reference to the underlying stream get_ref(&self) -> &R167 pub fn get_ref(&self) -> &R { 168 &self.obj 169 } 170 171 /// Acquires a mutable reference to the underlying stream 172 /// 173 /// Note that mutation of the stream may result in surprising results if 174 /// this encoder is continued to be used. get_mut(&mut self) -> &mut R175 pub fn get_mut(&mut self) -> &mut R { 176 &mut self.obj 177 } 178 179 /// Consumes this decoder, returning the underlying reader. into_inner(self) -> R180 pub fn into_inner(self) -> R { 181 self.obj 182 } 183 } 184 185 impl<R: BufRead> Read for BrotliDecoder<R> { read(&mut self, mut buf: &mut [u8]) -> io::Result<usize>186 fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> { 187 if buf.is_empty() { return Ok(0) } 188 // If the decompressor has failed at some point, this is set. 189 // Unfortunately we have no idea what status is in the compressor 190 // was in when it failed so we can't do anything except bail again. 191 if let Some(ref err) = self.err { 192 return Err(err.clone().into()) 193 } 194 195 loop { 196 let amt_in; 197 let amt_out; 198 let status; 199 { 200 let mut input = try!(self.obj.fill_buf()); 201 let avail_in = input.len(); 202 let avail_out = buf.len(); 203 status = match self.data.decompress(&mut input, &mut buf) { 204 Ok(s) => s, 205 Err(err) => { 206 self.err = Some(err.clone().into()); 207 return Err(err.into()) 208 }, 209 }; 210 amt_in = avail_in - input.len(); 211 amt_out = avail_out - buf.len() 212 } 213 self.obj.consume(amt_in); 214 215 if amt_in == 0 && status == DeStatus::NeedInput { 216 return Err(io::Error::new(io::ErrorKind::Other, 217 "corrupted brotli stream")) 218 } 219 if amt_out == 0 && status != DeStatus::Finished { 220 assert!(amt_in != 0); 221 continue 222 } 223 224 return Ok(amt_out) 225 } 226 } 227 } 228 229 230