1 use super::v2_serializer::{V2SerializeError, V2Serializer}; 2 use super::{Serializer, V2_COMPRESSED_COOKIE}; 3 use crate::core::counter::Counter; 4 use crate::Histogram; 5 use byteorder::{BigEndian, WriteBytesExt}; 6 use flate2::write::ZlibEncoder; 7 use flate2::Compression; 8 use std; 9 use std::io::{ErrorKind, Write}; 10 11 /// Errors that occur during serialization. 12 #[derive(Debug, PartialEq, Eq, Clone, Copy)] 13 pub enum V2DeflateSerializeError { 14 /// The underlying serialization failed 15 InternalSerializationError(V2SerializeError), 16 /// An i/o operation failed. 17 IoError(ErrorKind), 18 } 19 20 impl std::convert::From<std::io::Error> for V2DeflateSerializeError { from(e: std::io::Error) -> Self21 fn from(e: std::io::Error) -> Self { 22 V2DeflateSerializeError::IoError(e.kind()) 23 } 24 } 25 26 /// Serializer for the V2 + DEFLATE binary format. 27 /// 28 /// It's called "deflate" to stay consistent with the naming used in the Java implementation, but 29 /// it actually uses zlib's wrapper format around plain DEFLATE. 30 pub struct V2DeflateSerializer { 31 uncompressed_buf: Vec<u8>, 32 compressed_buf: Vec<u8>, 33 v2_serializer: V2Serializer, 34 } 35 36 impl Default for V2DeflateSerializer { default() -> Self37 fn default() -> Self { 38 Self::new() 39 } 40 } 41 42 impl V2DeflateSerializer { 43 /// Create a new serializer. new() -> V2DeflateSerializer44 pub fn new() -> V2DeflateSerializer { 45 V2DeflateSerializer { 46 uncompressed_buf: Vec::new(), 47 compressed_buf: Vec::new(), 48 v2_serializer: V2Serializer::new(), 49 } 50 } 51 } 52 53 impl Serializer for V2DeflateSerializer { 54 type SerializeError = V2DeflateSerializeError; 55 serialize<T: Counter, W: Write>( &mut self, h: &Histogram<T>, writer: &mut W, ) -> Result<usize, V2DeflateSerializeError>56 fn serialize<T: Counter, W: Write>( 57 &mut self, 58 h: &Histogram<T>, 59 writer: &mut W, 60 ) -> Result<usize, V2DeflateSerializeError> { 61 // TODO benchmark serializing in chunks rather than all at once: each uncompressed v2 chunk 62 // could be compressed and written to the compressed buf, possibly using an approach like 63 // that of https://github.com/HdrHistogram/HdrHistogram_rust/issues/32#issuecomment-287583055. 64 // This would reduce the overall buffer size needed for plain v2 serialization, and be 65 // more cache friendly. 66 67 self.uncompressed_buf.clear(); 68 self.compressed_buf.clear(); 69 // TODO serialize directly into uncompressed_buf without the buffering inside v2_serializer 70 let uncompressed_len = self 71 .v2_serializer 72 .serialize(h, &mut self.uncompressed_buf) 73 .map_err(V2DeflateSerializeError::InternalSerializationError)?; 74 75 debug_assert_eq!(self.uncompressed_buf.len(), uncompressed_len); 76 // On randomized test histograms we get about 10% compression, but of course random data 77 // doesn't compress well. Real-world data may compress better, so let's assume a more 78 // optimistic 50% compression as a baseline to reserve. If we're overly optimistic that's 79 // still only one more allocation the first time it's needed. 80 self.compressed_buf.reserve(self.uncompressed_buf.len() / 2); 81 82 self.compressed_buf 83 .write_u32::<BigEndian>(V2_COMPRESSED_COOKIE)?; 84 // placeholder for length 85 self.compressed_buf.write_u32::<BigEndian>(0)?; 86 87 // TODO pluggable compressors? configurable compression levels? 88 // TODO benchmark https://github.com/sile/libflate 89 // TODO if uncompressed_len is near the limit of 16-bit usize, and compression grows the 90 // data instead of shrinking it (which we cannot really predict), writing to compressed_buf 91 // could panic as Vec overflows its internal `usize`. 92 93 { 94 // TODO reuse deflate buf, or switch to lower-level flate2::Compress 95 let mut compressor = ZlibEncoder::new(&mut self.compressed_buf, Compression::default()); 96 compressor.write_all(&self.uncompressed_buf[0..uncompressed_len])?; 97 let _ = compressor.finish()?; 98 } 99 100 // fill in length placeholder. Won't underflow since length is always at least 8, and won't 101 // overflow u32 as the largest array is about 6 million entries, so about 54MiB encoded (if 102 // counter is u64). 103 let total_compressed_len = self.compressed_buf.len(); 104 (&mut self.compressed_buf[4..8]) 105 .write_u32::<BigEndian>((total_compressed_len as u32) - 8)?; 106 107 writer.write_all(&self.compressed_buf)?; 108 109 Ok(total_compressed_len) 110 } 111 } 112