1 use super::v2_serializer::{V2SerializeError, V2Serializer};
2 use super::{Serializer, V2_COMPRESSED_COOKIE};
3 use crate::core::counter::Counter;
4 use crate::Histogram;
5 use byteorder::{BigEndian, WriteBytesExt};
6 use flate2::write::ZlibEncoder;
7 use flate2::Compression;
8 use std;
9 use std::io::{ErrorKind, Write};
10 
11 /// Errors that occur during serialization.
12 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
13 pub enum V2DeflateSerializeError {
14     /// The underlying serialization failed
15     InternalSerializationError(V2SerializeError),
16     /// An i/o operation failed.
17     IoError(ErrorKind),
18 }
19 
20 impl std::convert::From<std::io::Error> for V2DeflateSerializeError {
from(e: std::io::Error) -> Self21     fn from(e: std::io::Error) -> Self {
22         V2DeflateSerializeError::IoError(e.kind())
23     }
24 }
25 
26 /// Serializer for the V2 + DEFLATE binary format.
27 ///
28 /// It's called "deflate" to stay consistent with the naming used in the Java implementation, but
29 /// it actually uses zlib's wrapper format around plain DEFLATE.
30 pub struct V2DeflateSerializer {
31     uncompressed_buf: Vec<u8>,
32     compressed_buf: Vec<u8>,
33     v2_serializer: V2Serializer,
34 }
35 
36 impl Default for V2DeflateSerializer {
default() -> Self37     fn default() -> Self {
38         Self::new()
39     }
40 }
41 
42 impl V2DeflateSerializer {
43     /// Create a new serializer.
new() -> V2DeflateSerializer44     pub fn new() -> V2DeflateSerializer {
45         V2DeflateSerializer {
46             uncompressed_buf: Vec::new(),
47             compressed_buf: Vec::new(),
48             v2_serializer: V2Serializer::new(),
49         }
50     }
51 }
52 
53 impl Serializer for V2DeflateSerializer {
54     type SerializeError = V2DeflateSerializeError;
55 
serialize<T: Counter, W: Write>( &mut self, h: &Histogram<T>, writer: &mut W, ) -> Result<usize, V2DeflateSerializeError>56     fn serialize<T: Counter, W: Write>(
57         &mut self,
58         h: &Histogram<T>,
59         writer: &mut W,
60     ) -> Result<usize, V2DeflateSerializeError> {
61         // TODO benchmark serializing in chunks rather than all at once: each uncompressed v2 chunk
62         // could be compressed and written to the compressed buf, possibly using an approach like
63         // that of https://github.com/HdrHistogram/HdrHistogram_rust/issues/32#issuecomment-287583055.
64         // This would reduce the overall buffer size needed for plain v2 serialization, and be
65         // more cache friendly.
66 
67         self.uncompressed_buf.clear();
68         self.compressed_buf.clear();
69         // TODO serialize directly into uncompressed_buf without the buffering inside v2_serializer
70         let uncompressed_len = self
71             .v2_serializer
72             .serialize(h, &mut self.uncompressed_buf)
73             .map_err(V2DeflateSerializeError::InternalSerializationError)?;
74 
75         debug_assert_eq!(self.uncompressed_buf.len(), uncompressed_len);
76         // On randomized test histograms we get about 10% compression, but of course random data
77         // doesn't compress well. Real-world data may compress better, so let's assume a more
78         // optimistic 50% compression as a baseline to reserve. If we're overly optimistic that's
79         // still only one more allocation the first time it's needed.
80         self.compressed_buf.reserve(self.uncompressed_buf.len() / 2);
81 
82         self.compressed_buf
83             .write_u32::<BigEndian>(V2_COMPRESSED_COOKIE)?;
84         // placeholder for length
85         self.compressed_buf.write_u32::<BigEndian>(0)?;
86 
87         // TODO pluggable compressors? configurable compression levels?
88         // TODO benchmark https://github.com/sile/libflate
89         // TODO if uncompressed_len is near the limit of 16-bit usize, and compression grows the
90         // data instead of shrinking it (which we cannot really predict), writing to compressed_buf
91         // could panic as Vec overflows its internal `usize`.
92 
93         {
94             // TODO reuse deflate buf, or switch to lower-level flate2::Compress
95             let mut compressor = ZlibEncoder::new(&mut self.compressed_buf, Compression::default());
96             compressor.write_all(&self.uncompressed_buf[0..uncompressed_len])?;
97             let _ = compressor.finish()?;
98         }
99 
100         // fill in length placeholder. Won't underflow since length is always at least 8, and won't
101         // overflow u32 as the largest array is about 6 million entries, so about 54MiB encoded (if
102         // counter is u64).
103         let total_compressed_len = self.compressed_buf.len();
104         (&mut self.compressed_buf[4..8])
105             .write_u32::<BigEndian>((total_compressed_len as u32) - 8)?;
106 
107         writer.write_all(&self.compressed_buf)?;
108 
109         Ok(total_compressed_len)
110     }
111 }
112