1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2017 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
19  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25  * POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include "rdkafka_int.h"
29 #include "rdkafka_lz4.h"
30 
31 #if WITH_LZ4_EXT
32 #include <lz4frame.h>
33 #else
34 #include "lz4frame.h"
35 #endif
36 #include "rdxxhash.h"
37 
38 #include "rdbuf.h"
39 
40 /**
41  * Fix-up bad LZ4 framing caused by buggy Kafka client / broker.
42  * The LZ4F framing format is described in detail here:
43  * https://github.com/lz4/lz4/blob/master/doc/lz4_Frame_format.md
44  *
45  * NOTE: This modifies 'inbuf'.
46  *
47  * Returns an error on failure to fix (nothing modified), else NO_ERROR.
48  */
49 static rd_kafka_resp_err_t
50 rd_kafka_lz4_decompress_fixup_bad_framing (rd_kafka_broker_t *rkb,
51                                            char *inbuf, size_t inlen) {
52         static const char magic[4] = { 0x04, 0x22, 0x4d, 0x18 };
53         uint8_t FLG, HC, correct_HC;
54         size_t of = 4;
55 
56         /* Format is:
57          *    int32_t magic;
58          *    int8_t_ FLG;
59          *    int8_t  BD;
60          *  [ int64_t contentSize; ]
61          *    int8_t  HC;
62          */
63         if (inlen < 4+3 || memcmp(inbuf, magic, 4)) {
64                 rd_rkb_dbg(rkb, BROKER,  "LZ4FIXUP",
65                            "Unable to fix-up legacy LZ4 framing "
66                            "(%"PRIusz" bytes): invalid length or magic value",
67                            inlen);
68                 return RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
69         }
70 
71         of = 4; /* past magic */
72         FLG = inbuf[of++];
73         of++; /* BD */
74 
75         if ((FLG >> 3) & 1) /* contentSize */
76                 of += 8;
77 
78         if (of >= inlen) {
79                 rd_rkb_dbg(rkb, BROKER,  "LZ4FIXUP",
80                            "Unable to fix-up legacy LZ4 framing "
81                            "(%"PRIusz" bytes): requires %"PRIusz" bytes",
82                            inlen, of);
83                 return RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
84         }
85 
86         /* Header hash code */
87         HC = inbuf[of];
88 
89         /* Calculate correct header hash code */
90         correct_HC = (XXH32(inbuf+4, of-4, 0) >> 8) & 0xff;
91 
92         if (HC != correct_HC)
93                 inbuf[of] = correct_HC;
94 
95         return RD_KAFKA_RESP_ERR_NO_ERROR;
96 }
97 
98 
99 /**
100  * Reverse of fix-up: break LZ4 framing caused to be compatbile with with
101  * buggy Kafka client / broker.
102  *
103  * NOTE: This modifies 'outbuf'.
104  *
105  * Returns an error on failure to recognize format (nothing modified),
106  * else NO_ERROR.
107  */
108 static rd_kafka_resp_err_t
109 rd_kafka_lz4_compress_break_framing (rd_kafka_broker_t *rkb,
110                                      char *outbuf, size_t outlen) {
111         static const char magic[4] = { 0x04, 0x22, 0x4d, 0x18 };
pg_atomic_test_set_flag_impl(volatile pg_atomic_flag * ptr)112         uint8_t FLG, HC, bad_HC;
113         size_t of = 4;
114 
115         /* Format is:
116          *    int32_t magic;
117          *    int8_t_ FLG;
118          *    int8_t  BD;
119          *  [ int64_t contentSize; ]
120          *    int8_t  HC;
121          */
122         if (outlen < 4+3 || memcmp(outbuf, magic, 4)) {
123                 rd_rkb_dbg(rkb, BROKER,  "LZ4FIXDOWN",
124                            "Unable to break legacy LZ4 framing "
pg_atomic_unlocked_test_flag_impl(volatile pg_atomic_flag * ptr)125                            "(%"PRIusz" bytes): invalid length or magic value",
126                            outlen);
127                 return RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
128         }
129 
130         of = 4; /* past magic */
131         FLG = outbuf[of++];
132         of++; /* BD */
133 
pg_atomic_clear_flag_impl(volatile pg_atomic_flag * ptr)134         if ((FLG >> 3) & 1) /* contentSize */
135                 of += 8;
136 
137         if (of >= outlen) {
138                 rd_rkb_dbg(rkb, BROKER,  "LZ4FIXUP",
139                            "Unable to break legacy LZ4 framing "
140                            "(%"PRIusz" bytes): requires %"PRIusz" bytes",
141                            outlen, of);
142                 return RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
pg_atomic_init_flag_impl(volatile pg_atomic_flag * ptr)143         }
144 
145         /* Header hash code */
146         HC = outbuf[of];
147 
148         /* Calculate bad header hash code (include magic) */
149         bad_HC = (XXH32(outbuf, of, 0) >> 8) & 0xff;
150 
151         if (HC != bad_HC)
152                 outbuf[of] = bad_HC;
153 
154         return RD_KAFKA_RESP_ERR_NO_ERROR;
pg_atomic_compare_exchange_u32_impl(volatile pg_atomic_uint32 * ptr,uint32 * expected,uint32 newval)155 }
156 
157 
158 
159 /**
160  * @brief Decompress LZ4F (framed) data.
161  *        Kafka broker versions <0.10.0.0 (MsgVersion 0) breaks LZ4 framing
162  *        checksum, if \p proper_hc we assume the checksum is okay
163  *        (broker version >=0.10.0, MsgVersion >= 1) else we fix it up.
164  *
165  * @remark May modify \p inbuf (if not \p proper_hc)
166  */
167 rd_kafka_resp_err_t
168 rd_kafka_lz4_decompress (rd_kafka_broker_t *rkb, int proper_hc, int64_t Offset,
169                          char *inbuf, size_t inlen,
170                          void **outbuf, size_t *outlenp) {
171         LZ4F_errorCode_t code;
172         LZ4F_decompressionContext_t dctx;
173         LZ4F_frameInfo_t fi;
174         size_t in_sz, out_sz;
175         size_t in_of, out_of;
176         size_t r;
177         size_t estimated_uncompressed_size;
178         size_t outlen;
179         rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
180         char *out = NULL;
181 
182         *outbuf = NULL;
183 
184         code = LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION);
185         if (LZ4F_isError(code)) {
186                 rd_rkb_dbg(rkb, BROKER, "LZ4DECOMPR",
187                            "Unable to create LZ4 decompression context: %s",
188                            LZ4F_getErrorName(code));
189                 return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
190         }
191 
192         if (!proper_hc) {
193                 /* The original/legacy LZ4 framing in Kafka was buggy and
194                  * calculated the LZ4 framing header hash code (HC) incorrectly.
195                  * We do a fix-up of it here. */
196                 if ((err = rd_kafka_lz4_decompress_fixup_bad_framing(rkb,
197                                                                      inbuf,
198                                                                      inlen)))
199                         goto done;
200         }
201 
202         in_sz = inlen;
203         r = LZ4F_getFrameInfo(dctx, &fi, (const void *)inbuf, &in_sz);
204         if (LZ4F_isError(r)) {
205                 rd_rkb_dbg(rkb, BROKER, "LZ4DECOMPR",
206                            "Failed to gather LZ4 frame info: %s",
207                            LZ4F_getErrorName(r));
208                 err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
209                 goto done;
210         }
211 
212         /* If uncompressed size is unknown or out of bounds, use a sane
213          * default (4x compression) and reallocate if needed
214          * More info on max size: http://stackoverflow.com/a/25751871/1821055
215          * More info on lz4 compression ratios seen for different data sets:
216          * http://dev.ti.com/tirex/content/simplelink_msp432p4_sdk_1_50_00_12/docs/lz4/users_guide/docguide.llQpgm/benchmarking.html
217          */
218         if (fi.contentSize == 0 || fi.contentSize > inlen * 255) {
219                 estimated_uncompressed_size = RD_MIN(
220                         inlen * 4,
221                         (size_t)(rkb->rkb_rk->rk_conf.max_msg_size));
222         } else {
223                 estimated_uncompressed_size = (size_t)fi.contentSize;
224         }
225 
226         /* Allocate output buffer, we increase this later if needed,
227          * but hopefully not. */
228         out = rd_malloc(estimated_uncompressed_size);
229         if (!out) {
230                 rd_rkb_log(rkb, LOG_WARNING, "LZ4DEC",
231                            "Unable to allocate decompression "
232                            "buffer of %"PRIusz" bytes: %s",
233                            estimated_uncompressed_size, rd_strerror(errno));
234                 err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
235                 goto done;
236         }
237 
238 
239         /* Decompress input buffer to output buffer until input is exhausted. */
240         outlen = estimated_uncompressed_size;
241         in_of = in_sz;
242         out_of = 0;
243         while (in_of < inlen) {
244                 out_sz = outlen - out_of;
245                 in_sz = inlen - in_of;
246                 r = LZ4F_decompress(dctx, out+out_of, &out_sz,
247                                     inbuf+in_of, &in_sz, NULL);
248                 if (unlikely(LZ4F_isError(r))) {
249                         rd_rkb_dbg(rkb, MSG, "LZ4DEC",
250                                    "Failed to LZ4 (%s HC) decompress message "
251                                    "(offset %"PRId64") at "
252                                    "payload offset %"PRIusz"/%"PRIusz": %s",
253                                    proper_hc ? "proper":"legacy",
254                                    Offset, in_of, inlen,  LZ4F_getErrorName(r));
255                         err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
256                         goto done;
257                 }
258 
259                 rd_kafka_assert(NULL, out_of + out_sz <= outlen &&
260                                 in_of + in_sz <= inlen);
261                 out_of += out_sz;
262                 in_of += in_sz;
263                 if (r == 0)
264                         break;
265 
266                 /* Need to grow output buffer, this shouldn't happen if
267                  * contentSize was properly set. */
268                 if (unlikely(out_of == outlen)) {
269                         char *tmp;
270                         /* Grow exponentially with some factor > 1 (using 1.75)
271                          * for amortized O(1) copying */
272                         size_t extra = RD_MAX(outlen * 3 / 4, 1024);
273 
274                         rd_atomic64_add(&rkb->rkb_c.zbuf_grow, 1);
275 
276                         if (!(tmp = rd_realloc(out, outlen + extra))) {
277                                 rd_rkb_log(rkb, LOG_WARNING, "LZ4DEC",
278                                            "Unable to grow decompression "
279                                            "buffer to %"PRIusz"+%"PRIusz" bytes: %s",
280                                            outlen, extra,rd_strerror(errno));
281                                 err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
282                                 goto done;
283                         }
284                         out = tmp;
285                         outlen += extra;
286                 }
287         }
288 
289 
290         if (in_of < inlen) {
291                 rd_rkb_dbg(rkb, MSG, "LZ4DEC",
292                            "Failed to LZ4 (%s HC) decompress message "
293                            "(offset %"PRId64"): "
294                            "%"PRIusz" (out of %"PRIusz") bytes remaining",
295                            proper_hc ? "proper":"legacy",
296                            Offset, inlen-in_of, inlen);
297                 err = RD_KAFKA_RESP_ERR__BAD_MSG;
298                 goto done;
299         }
300 
301         *outbuf = out;
302         *outlenp = out_of;
303 
304  done:
305         code = LZ4F_freeDecompressionContext(dctx);
306         if (LZ4F_isError(code)) {
307                 rd_rkb_dbg(rkb, BROKER, "LZ4DECOMPR",
308                            "Failed to close LZ4 compression context: %s",
309                            LZ4F_getErrorName(code));
310                 err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
311         }
312 
313         if (err && out)
314                 rd_free(out);
315 
316         return err;
317 }
318 
319 
320 /**
321  * Allocate space for \p *outbuf and compress all \p iovlen buffers in \p iov.
322  * @param proper_hc generate a proper HC (checksum) (kafka >=0.10.0.0, MsgVersion >= 1)
323  * @param MessageSetSize indicates (at least) full uncompressed data size,
324  *                       possibly including MessageSet fields that will not
325  *                       be compressed.
326  *
327  * @returns allocated buffer in \p *outbuf, length in \p *outlenp.
328  */
329 rd_kafka_resp_err_t
330 rd_kafka_lz4_compress (rd_kafka_broker_t *rkb, int proper_hc, int comp_level,
331                        rd_slice_t *slice, void **outbuf, size_t *outlenp) {
332         LZ4F_compressionContext_t cctx;
333         LZ4F_errorCode_t r;
334         rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
335         size_t len = rd_slice_remains(slice);
336         size_t out_sz;
337         size_t out_of = 0;
338         char *out;
339         const void *p;
340         size_t rlen;
341 
342         /* Required by Kafka */
343         const LZ4F_preferences_t prefs =
344                 {
345                         .frameInfo = { .blockMode = LZ4F_blockIndependent },
346                         .compressionLevel = comp_level
347                 };
348 
349         *outbuf = NULL;
350 
351         out_sz = LZ4F_compressBound(len, NULL) + 1000;
352         if (LZ4F_isError(out_sz)) {
353                 rd_rkb_dbg(rkb, MSG, "LZ4COMPR",
354                            "Unable to query LZ4 compressed size "
355                            "(for %"PRIusz" uncompressed bytes): %s",
356                            len, LZ4F_getErrorName(out_sz));
357                 return RD_KAFKA_RESP_ERR__BAD_MSG;
358         }
359 
360         out = rd_malloc(out_sz);
361         if (!out) {
362                 rd_rkb_dbg(rkb, MSG, "LZ4COMPR",
363                            "Unable to allocate output buffer "
364                            "(%"PRIusz" bytes): %s",
365                            out_sz, rd_strerror(errno));
366                 return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
367         }
368 
369         r = LZ4F_createCompressionContext(&cctx, LZ4F_VERSION);
370         if (LZ4F_isError(r)) {
371                 rd_rkb_dbg(rkb, MSG, "LZ4COMPR",
372                            "Unable to create LZ4 compression context: %s",
373                            LZ4F_getErrorName(r));
374                 rd_free(out);
375                 return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
376         }
377 
378         r = LZ4F_compressBegin(cctx, out, out_sz, &prefs);
379         if (LZ4F_isError(r)) {
380                 rd_rkb_dbg(rkb, MSG, "LZ4COMPR",
381                            "Unable to begin LZ4 compression "
382                            "(out buffer is %"PRIusz" bytes): %s",
383                            out_sz, LZ4F_getErrorName(r));
384                 err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
385                 goto done;
386         }
387 
388         out_of += r;
389 
390         while ((rlen = rd_slice_reader(slice, &p))) {
391                 rd_assert(out_of < out_sz);
392                 r = LZ4F_compressUpdate(cctx, out+out_of, out_sz-out_of,
393                                         p, rlen, NULL);
394                 if (unlikely(LZ4F_isError(r))) {
395                         rd_rkb_dbg(rkb, MSG, "LZ4COMPR",
396                                    "LZ4 compression failed "
397                                    "(at of %"PRIusz" bytes, with "
398                                    "%"PRIusz" bytes remaining in out buffer): "
399                                    "%s",
400                                    rlen, out_sz - out_of,
401                                    LZ4F_getErrorName(r));
402                         err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
403                         goto done;
404                 }
405 
406                 out_of += r;
407         }
408 
409         rd_assert(rd_slice_remains(slice) == 0);
410 
411         r = LZ4F_compressEnd(cctx, out+out_of, out_sz-out_of, NULL);
412         if (unlikely(LZ4F_isError(r))) {
413                 rd_rkb_dbg(rkb, MSG, "LZ4COMPR",
414                            "Failed to finalize LZ4 compression "
415                            "of %"PRIusz" bytes: %s",
416                            len, LZ4F_getErrorName(r));
417                 err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
418                 goto done;
419         }
420 
421         out_of += r;
422 
423         /* For the broken legacy framing we need to mess up the header checksum
424          * so that the Kafka client / broker code accepts it. */
425         if (!proper_hc)
426                 if ((err = rd_kafka_lz4_compress_break_framing(rkb,
427                                                                out, out_of)))
428                         goto done;
429 
430 
431         *outbuf  = out;
432         *outlenp = out_of;
433 
434  done:
435         LZ4F_freeCompressionContext(cctx);
436 
437         if (err)
438                 rd_free(out);
439 
440         return err;
441 
442 }
443