1 #ifndef SRL_COMPRESS_H_
2 #define SRL_COMPRESS_H_
3 
4 #include "srl_buffer.h"
5 #include "srl_inline.h"
6 #include "srl_protocol.h"
7 #include "srl_buffer_types.h"
8 
/* WARNING: The SRL_F_COMPRESS_* values below are encoder option flags. They
 *          are NOT the protocol encoding values (SRL_PROTOCOL_ENCODING_SNAPPY,
 *          SRL_PROTOCOL_ENCODING_ZLIB, ...) that end up in the header; they
 *          only record that we want to use Snappy, incremental Snappy, Zlib
 *          or Zstd. srl_get_compression_header_flag() translates them into
 *          the corresponding protocol encoding value.
 *
 * DO NOT CHANGE THIS WITHOUT REVIEWING THE BITS IN srl_encoder.h and related files.
 */

#define SRL_F_COMPRESS_SNAPPY                   0x00040UL /* traditional Snappy */
#define SRL_F_COMPRESS_SNAPPY_INCREMENTAL       0x00080UL /* Snappy with a compressed-length varint prefix */
#define SRL_F_COMPRESS_ZLIB                     0x00100UL /* Zlib (miniz) */
#define SRL_F_COMPRESS_ZSTD                     0x40000UL /* Zstandard */
/* WARNING: IF ADDING NEW COMPRESSION MAKE SURE THAT NEW CONSTANT DOES NOT
 *          COLLIDE WITH CONSTANTS IN srl_encoder.h! Remember to add it to
 *          SRL_F_COMPRESS_FLAGS_MASK below as well.
 */

/* All compression option bits combined. */
#define SRL_F_COMPRESS_FLAGS_MASK               (SRL_F_COMPRESS_SNAPPY | \
                                                 SRL_F_COMPRESS_SNAPPY_INCREMENTAL | \
                                                 SRL_F_COMPRESS_ZLIB | \
                                                 SRL_F_COMPRESS_ZSTD)
28 
29 #if defined(HAVE_CSNAPPY)
30 #include <csnappy.h>
31 #else
32 #include "snappy/csnappy_compress.c"
33 #endif
34 
35 #if defined(HAVE_MINIZ)
36 #include <miniz.h>
37 #else
38 #include "miniz.h"
39 #endif
40 
41 #if defined(HAVE_ZSTD)
42 #include <zstd.h>
43 #else
44 #include "zstd/zstd.h"
45 #endif
46 
47 /* Update a varint anywhere in the output stream with defined start and end
48  * positions. This can produce non-canonical varints and is useful for filling
49  * pre-allocated varints. */
50 SRL_STATIC_INLINE void
srl_update_varint_from_to(pTHX_ unsigned char * varint_start,unsigned char * varint_end,UV number)51 srl_update_varint_from_to(pTHX_ unsigned char *varint_start, unsigned char *varint_end, UV number)
52 {
53     while (number >= 0x80) {                      /* while we are larger than 7 bits long */
54         *varint_start++ = (number & 0x7f) | 0x80; /* write out the least significant 7 bits, set the high bit */
55         number = number >> 7;                     /* shift off the 7 least significant bits */
56     }
57 
58     /* if it is the same size we can use a canonical varint */
59     if ( varint_start == varint_end ) {
60         *varint_start = number;                   /* encode the last 7 bits without the high bit being set */
61     } else {
62         /* if not we produce a non-canonical varint, basically we stuff
63          * 0 bits (via 0x80) into the "tail" of the varint, until we can
64          * stick in a null to terminate the sequence. This means that the
65          * varint is effectively "self-padding", and we only need special
66          * logic in the encoder - a decoder will happily process a non-canonical
67          * varint with no problem */
68         *varint_start++ = (number & 0x7f) | 0x80;
69         while ( varint_start < varint_end )
70             *varint_start++ = 0x80;
71         *varint_start= 0;
72     }
73 }
74 
75 /* Lazy working buffer alloc */
76 SRL_STATIC_INLINE void
srl_init_snappy_workmem(pTHX_ void ** workmem)77 srl_init_snappy_workmem(pTHX_ void **workmem)
78 {
79     /* Lazy working buffer alloc */
80     if (expect_false(*workmem == NULL)) {
81         /* Cleaned up automatically by the cleanup handler */
82         Newx(*workmem, CSNAPPY_WORKMEM_BYTES, char);
83         if (*workmem == NULL)
84             croak("Out of memory!");
85     }
86 }
87 
88 /* Destroy working buffer */
89 SRL_STATIC_INLINE void
srl_destroy_snappy_workmem(pTHX_ void * workmem)90 srl_destroy_snappy_workmem(pTHX_ void *workmem)
91 {
92     Safefree(workmem);
93 }
94 
95 SRL_STATIC_INLINE U8
srl_get_compression_header_flag(const U32 compress_flags)96 srl_get_compression_header_flag(const U32 compress_flags)
97 {
98     if (compress_flags & SRL_F_COMPRESS_SNAPPY) {
99         return SRL_PROTOCOL_ENCODING_SNAPPY;
100     } else if (compress_flags & SRL_F_COMPRESS_SNAPPY_INCREMENTAL) {
101         return SRL_PROTOCOL_ENCODING_SNAPPY_INCREMENTAL;
102     } else if (compress_flags & SRL_F_COMPRESS_ZLIB) {
103         return SRL_PROTOCOL_ENCODING_ZLIB;
104     } else if (compress_flags & SRL_F_COMPRESS_ZSTD) {
105         return SRL_PROTOCOL_ENCODING_ZSTD;
106     } else {
107         return SRL_PROTOCOL_ENCODING_RAW;
108     }
109 }
110 
111 /* Sets the compression header flag */
112 SRL_STATIC_INLINE void
srl_set_compression_header_flag(srl_buffer_t * buf,const U32 compress_flags)113 srl_set_compression_header_flag(srl_buffer_t *buf, const U32 compress_flags)
114 {
115     /* sizeof(const char *) includes a count of \0 */
116     srl_buffer_char *flags_and_version_byte = buf->start + sizeof(SRL_MAGIC_STRING) - 1;
117     *flags_and_version_byte |= srl_get_compression_header_flag(compress_flags);
118 }
119 
120 /* Resets the compression header flag to OFF.
121  * Obviously requires that a Sereal header was already written to the
122  * encoder's output buffer. */
123 SRL_STATIC_INLINE void
srl_reset_compression_header_flag(srl_buffer_t * buf)124 srl_reset_compression_header_flag(srl_buffer_t *buf)
125 {
126     /* sizeof(const char *) includes a count of \0 */
127     srl_buffer_char *flags_and_version_byte = buf->start + sizeof(SRL_MAGIC_STRING) - 1;
128 
129     /* disable snappy flag in header */
130     *flags_and_version_byte = SRL_PROTOCOL_ENCODING_RAW |
131                               (*flags_and_version_byte & SRL_PROTOCOL_VERSION_MASK);
132 }
133 
134 /* Compress body with one of available compressors (zlib, snappy).
135  * The function sets/resets compression bits at version byte.
136  * The caller has to adjust buf->body_pos by calling SRL_UPDATE_BODY_POS
137  * right after exiting from srl_compress_body.
138  */
139 
140 SRL_STATIC_INLINE void
srl_compress_body(pTHX_ srl_buffer_t * buf,STRLEN sereal_header_length,const U32 compress_flags,const int compress_level,void ** workmem)141 srl_compress_body(pTHX_ srl_buffer_t *buf, STRLEN sereal_header_length,
142                   const U32 compress_flags, const int compress_level, void **workmem)
143 {
144     const int is_traditional_snappy = compress_flags & SRL_F_COMPRESS_SNAPPY;
145     const int is_incremental_snappy = compress_flags & SRL_F_COMPRESS_SNAPPY_INCREMENTAL;
146     const int is_zstd = compress_flags & SRL_F_COMPRESS_ZSTD;
147     const int is_zlib = !is_traditional_snappy && !is_incremental_snappy && !is_zstd;
148 
149     size_t uncompressed_body_length = BUF_POS_OFS(buf) - sereal_header_length;
150     size_t compressed_body_length;
151     srl_buffer_char *varint_start = NULL;
152     srl_buffer_char *varint_end = NULL;
153     srl_buffer_t old_buf;
154 
155     DEBUG_ASSERT_BUF_SANE(buf);
156 
157     /* Get estimated compressed payload length */
158     if (is_incremental_snappy) {
159         compressed_body_length = (size_t) csnappy_max_compressed_length(uncompressed_body_length);
160         compressed_body_length += SRL_MAX_VARINT_LENGTH; /* will have to embed compressed packet length as varint */
161     } else if (is_traditional_snappy) {
162         compressed_body_length = (size_t) csnappy_max_compressed_length(uncompressed_body_length);
163     } else if (is_zstd) {
164         compressed_body_length = ZSTD_compressBound(uncompressed_body_length);
165         compressed_body_length += SRL_MAX_VARINT_LENGTH; /* will have to embed compressed packet length as varint */
166     } else {
167         compressed_body_length = (size_t) mz_compressBound(uncompressed_body_length);
168         compressed_body_length += SRL_MAX_VARINT_LENGTH; /* will have to embed uncommpressed packet length as varint */
169         compressed_body_length += SRL_MAX_VARINT_LENGTH; /* will have to embed compressed packet length as varint */
170     }
171 
172     /* Back up old buffer and allocate new one with correct size */
173     srl_buf_copy_buffer(aTHX_ buf, &old_buf);
174     srl_buf_init_buffer(aTHX_ buf, sereal_header_length + compressed_body_length + 1);
175 
176     /* Copy Sereal header */
177     Copy(old_buf.start, buf->pos, sereal_header_length, char);
178     buf->pos += sereal_header_length;
179 
180     /* Embed uncompressed packet length if Zlib */
181     if (is_zlib) srl_buf_cat_varint_nocheck(aTHX_ buf, 0, uncompressed_body_length);
182 
183     /* Embed compressed packet length if incr. Snappy, Zlib or Zstd*/
184     if (is_incremental_snappy || is_zlib || is_zstd) {
185         varint_start = buf->pos;
186         srl_buf_cat_varint_nocheck(aTHX_ buf, 0, compressed_body_length);
187         varint_end = buf->pos - 1;
188     }
189 
190     if (is_incremental_snappy || is_traditional_snappy) {
191         uint32_t len = (uint32_t) compressed_body_length;
192         srl_init_snappy_workmem(aTHX_ workmem);
193 
194         csnappy_compress((char*) (old_buf.start + sereal_header_length), (uint32_t) uncompressed_body_length,
195                          (char*) buf->pos, &len, *workmem, CSNAPPY_WORKMEM_BYTES_POWER_OF_TWO);
196 
197         compressed_body_length = (size_t) len;
198     } else if (is_zstd) {
199         size_t code = ZSTD_compress((void*) buf->pos, compressed_body_length,
200                                     (void*) (old_buf.start + sereal_header_length), uncompressed_body_length,
201                                     compress_level);
202 
203         assert(ZSTD_isError(code) == 0);
204         compressed_body_length = code;
205     } else if (is_zlib) {
206         mz_ulong dl = (mz_ulong) compressed_body_length;
207         int status = mz_compress2(
208             buf->pos,
209             &dl,
210             old_buf.start + sereal_header_length,
211             (mz_ulong) uncompressed_body_length,
212             compress_level
213         );
214 
215         (void)status;
216         assert(status == Z_OK);
217         compressed_body_length = (size_t) dl;
218     }
219 
220     assert(compressed_body_length != 0);
221 
222     /* If compression didn't help, swap back to old, uncompressed buffer */
223     if (compressed_body_length >= uncompressed_body_length) {
224         /* swap in old, uncompressed buffer */
225         srl_buf_swap_buffer(aTHX_ buf, &old_buf);
226         /* disable compression flag */
227         srl_reset_compression_header_flag(buf);
228     } else { /* go ahead with Snappy and do final fixups */
229         /* overwrite the max size varint with the real size of the compressed data */
230         if (varint_start)
231             srl_update_varint_from_to(aTHX_ varint_start, varint_end, compressed_body_length);
232 
233         buf->pos += compressed_body_length;
234 
235         /* enable compression flag */
236         srl_set_compression_header_flag(buf, compress_flags);
237     }
238 
239     srl_buf_free_buffer(aTHX_ &old_buf);
240     DEBUG_ASSERT_BUF_SANE(buf);
241 }
242 
243 #endif
244