/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2015, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"
#include "rdkafka.h"

/**
 * @brief Verify handling of compacted topics.
 *
 * General idea:
 *  - create a compacted topic with a low cleanup interval to promote quick
 *    compaction.
 *  - produce messages for 3 fixed keys, interleaved with unkeyed messages,
 *    and interleave tombstones for k1, but not for the other keys.
 *  - produce filler messages to fill up the segments and trigger compaction.
 *  - wait for compaction to kick in.
 *  - consume after compaction and verify that only the expected messages
 *    remain.
 */
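
/*
 * Sketch of the produced stream (one message per key per round; "uN" is a
 * per-round unique key, "tomb" a NULL-value tombstone; k1 gets occasional
 * tombstones and its final message is always one):
 *
 *   k1 k2 k3 u0 | k1 k2 k3 u1 | ... | k1=tomb k2 k3 uN
 *
 * After compaction only the latest record per key should remain: the last
 * message for each of k1, k2 and k3, plus every unique-key message.
 */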


/**
 * @brief Get the low watermark for a partition; we use this to see if
 *        compaction has kicked in.
 */
static int64_t get_low_wmark (rd_kafka_t *rk, const char *topic,
                              int32_t partition) {
        rd_kafka_resp_err_t err;
        int64_t low, high;

        err = rd_kafka_query_watermark_offsets(rk, topic, partition,
                                               &low, &high,
                                               tmout_multip(10000));

        TEST_ASSERT(!err, "query_watermark_offsets(%s, %d) failed: %s",
                    topic, (int)partition, rd_kafka_err2str(err));

        return low;
}


/**
 * @brief Wait for compaction, detected as an increase of the
 *        partition's low watermark offset. */
static void wait_compaction (rd_kafka_t *rk,
                             const char *topic, int32_t partition,
                             int64_t low_offset,
                             int timeout_ms) {
        int64_t low = -1;
        int64_t ts_start = test_clock();

        TEST_SAY("Waiting for compaction to kick in and increase the "
                 "Low watermark offset from %"PRId64" on %s [%"PRId32"]\n",
                 low_offset, topic, partition);

        while (1) {
                low = get_low_wmark(rk, topic, partition);

                TEST_SAY("Low watermark offset for %s [%"PRId32"] is "
                         "%"PRId64" (want > %"PRId64")\n",
                         topic, partition, low, low_offset);

                if (low > low_offset)
                        break;

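                /* test_clock() is in microseconds, hence the ms-to-us
                 * scaling of timeout_ms below. */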
                if (ts_start + (timeout_ms * 1000) < test_clock())
                        break;

                rd_sleep(5);
        }
}

static void produce_compactable_msgs (const char *topic, uint64_t testid,
                                      int32_t partition,
                                      int msgcnt, size_t msgsize) {
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        int i;
        char *val;
        char key[16];
        rd_kafka_resp_err_t err;
        int msgcounter = msgcnt;  /* decremented by the DR callback */

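        /* A testid of 0 means "don't care": generate a new one. */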
        if (!testid)
                testid = test_id_generate();

        test_str_id_generate(key, sizeof(key));

        val = calloc(1, msgsize);

        TEST_SAY("Producing %d messages (total of %"PRIusz" bytes) of "
                 "compactable messages\n", msgcnt, (size_t)msgcnt*msgsize);

        test_conf_init(&conf, NULL, 0);
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
        /* Make sure batch size does not exceed segment.bytes since that
         * will make the ProduceRequest fail. */
        test_conf_set(conf, "batch.num.messages", "1");

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        for (i = 0 ; i < msgcnt-1 ; i++) {
                err = rd_kafka_producev(rk,
                                        RD_KAFKA_V_TOPIC(topic),
                                        RD_KAFKA_V_PARTITION(partition),
                                        RD_KAFKA_V_KEY(key, sizeof(key)-1),
                                        RD_KAFKA_V_VALUE(val, msgsize),
                                        RD_KAFKA_V_OPAQUE(&msgcounter),
                                        RD_KAFKA_V_END);
                TEST_ASSERT(!err, "producev(): %s", rd_kafka_err2str(err));
        }

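        /* A keyed message with a NULL value (RD_KAFKA_V_VALUE omitted)
         * acts as a delete marker ("tombstone") for its key. */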
        /* Final message is the tombstone */
        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC(topic),
                                RD_KAFKA_V_PARTITION(partition),
                                RD_KAFKA_V_KEY(key, sizeof(key)-1),
                                RD_KAFKA_V_OPAQUE(&msgcounter),
                                RD_KAFKA_V_END);
        TEST_ASSERT(!err, "producev(): %s", rd_kafka_err2str(err));

        test_flush(rk, tmout_multip(10000));
        TEST_ASSERT(msgcounter == 0, "%d messages unaccounted for", msgcounter);

        rd_kafka_destroy(rk);

        free(val);
}


static void do_test_compaction (int msgs_per_key, const char *compression) {
        const char *topic = test_mk_topic_name(__FILE__, 1);
#define _KEY_CNT 4
        const char *keys[_KEY_CNT] = { "k1", "k2", "k3", NULL/*generate unique*/ };
        int msgcnt = msgs_per_key * _KEY_CNT;
        rd_kafka_conf_t *conf;
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;
        uint64_t testid;
        int32_t partition = 0;
        int cnt = 0;
        test_msgver_t mv;
        test_msgver_t mv_correct;
        int msgcounter = 0;
        const int fillcnt = 20;

        testid = test_id_generate();

        TEST_SAY(_C_MAG "Test compaction on topic %s with %s compression (%d messages)\n",
                 topic, compression ? compression : "no", msgcnt);

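        /* Topic configuration notes: the active segment is never compacted,
         * so small segment.ms/segment.bytes force frequent segment rolls,
         * and min.cleanable.dirty.ratio=0.01 makes rolled segments eligible
         * for the log cleaner almost immediately. */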
        test_kafka_topics("--create --topic \"%s\" "
                          "--partitions %d "
                          "--replication-factor 1 "
                          "--config cleanup.policy=compact "
                          "--config segment.ms=10000 "
                          "--config segment.bytes=10000 "
                          "--config min.cleanable.dirty.ratio=0.01 "
                          "--config delete.retention.ms=86400 "
                          "--config file.delete.delay.ms=10000",
                          topic, partition+1);

        test_conf_init(&conf, NULL, 120);
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
        if (compression)
                test_conf_set(conf, "compression.codec", compression);
        /* Limit the maximum batch size to below segment.bytes (10000) so
         * that messages don't accumulate into a batch that the broker
         * would reject. */
        test_conf_set(conf, "message.max.bytes", "6000");
        test_conf_set(conf, "linger.ms", "10");
        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
        rkt = rd_kafka_topic_new(rk, topic, NULL);

        /* The low watermark is not updated on message deletion (compaction)
         * but on segment deletion, so fill up the first segment with
         * filler messages that are eligible for quick compaction. */
        produce_compactable_msgs(topic, 0, partition, fillcnt, 1000);

        /* Populate a correct msgver for later comparison after compaction. */
        test_msgver_init(&mv_correct, testid);

        TEST_SAY("Producing %d messages for %d keys\n", msgcnt, _KEY_CNT);
        for (cnt = 0 ; cnt < msgcnt ; ) {
                int k;

                for (k = 0 ; k < _KEY_CNT ; k++) {
                        rd_kafka_resp_err_t err;
                        int is_last = cnt + _KEY_CNT >= msgcnt;
                        /* Let keys[0] have some tombstones */
                        int is_tombstone = (k == 0 && (is_last || !(cnt % 7)));
                        char *valp;
                        size_t valsize;
                        char rdk_msgid[256];
                        char unique_key[16];
                        const void *key;
                        size_t keysize;
                        int64_t offset = fillcnt + cnt;

                        test_msg_fmt(rdk_msgid, sizeof(rdk_msgid),
                                     testid, partition, cnt);

                        if (is_tombstone) {
                                valp = NULL;
                                valsize = 0;
                        } else {
                                valp = rdk_msgid;
                                valsize = strlen(valp);
                        }

                        if (!(key = keys[k])) {
                                rd_snprintf(unique_key, sizeof(unique_key),
                                            "%d", cnt);
                                key = unique_key;
                        }
                        keysize = strlen(key);

                        /* All unique-key messages, as well as the final
                         * message for each fixed key, should remain intact
                         * after compaction. */
                        if (!keys[k] || is_last) {
                                TEST_SAYL(4,
                                          "Add to correct msgver: "
                                          "msgid: %d: %s is_last=%d, "
                                          "is_tomb=%d\n",
                                          cnt, (const char *)key,
                                          is_last, is_tombstone);
                                test_msgver_add_msg00(__FUNCTION__, __LINE__,
                                                      rd_kafka_name(rk),
                                                      &mv_correct, testid,
                                                      topic, partition,
                                                      offset, -1, 0, cnt);
                        }

                        msgcounter++;
                        err = rd_kafka_producev(
                                rk,
                                RD_KAFKA_V_TOPIC(topic),
                                RD_KAFKA_V_PARTITION(partition),
                                RD_KAFKA_V_KEY(key, keysize),
                                RD_KAFKA_V_VALUE(valp, valsize),
                                RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                                RD_KAFKA_V_HEADER("rdk_msgid", rdk_msgid, -1),
                                /* msgcounter as msg_opaque is used
                                 * by the test delivery report callback to
                                 * count the number of messages. */
                                RD_KAFKA_V_OPAQUE(&msgcounter),
                                RD_KAFKA_V_END);
                        TEST_ASSERT(!err, "producev(#%d) failed: %s",
                                    cnt, rd_kafka_err2str(err));

                        cnt++;
                }
        }

        TEST_ASSERT(cnt == msgcnt, "cnt %d != msgcnt %d", cnt, msgcnt);

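        /* test_wait_delivery() polls the producer until the delivery
         * report callback has decremented msgcounter to zero. */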
        msgcounter = cnt;
        test_wait_delivery(rk, &msgcounter);

        /* Trigger compaction by filling up the segment with filler messages.
         * Do it in chunks to avoid overly effective compression, which
         * would otherwise keep the segments from filling up.
         * We can't reuse the existing producer instance since it
         * might be using compression, which makes it hard to know how
         * much data we need to produce to trigger compaction. */
        produce_compactable_msgs(topic, 0, partition, 20, 1024);

        /* Wait for compaction:
         * this doesn't really work because the low watermark offset
         * is not updated on compaction if the first segment is not deleted.
         * But it serves as a pause to let compaction kick in,
         * which is triggered by the filler produce above. */
        wait_compaction(rk, topic, partition, 0, 20*1000);

        TEST_SAY(_C_YEL "Verify messages after compaction\n");
        /* After compaction we expect the following messages: the last
         * message for each of k1, k2, k3, and all unkeyed messages. */
        test_msgver_init(&mv, testid);
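        /* Match messages by the "rdk_msgid" header rather than by payload,
         * since tombstones have no payload to carry a msgid. */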
        mv.msgid_hdr = "rdk_msgid";
        test_consume_msgs_easy_mv(NULL, topic, -1, testid, 1, -1, NULL, &mv);
        test_msgver_verify_compare("post-compaction", &mv, &mv_correct,
                                   TEST_MSGVER_BY_MSGID|TEST_MSGVER_BY_OFFSET);
        test_msgver_clear(&mv);

        test_msgver_clear(&mv_correct);

        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);

        TEST_SAY(_C_GRN "Compaction test with %s compression: PASS\n",
                 compression ? compression : "no");
}

int main_0077_compaction (int argc, char **argv) {

        if (!test_can_create_topics(1))
                return 0;

        do_test_compaction(10, NULL);

        if (test_quick) {
                TEST_SAY("Skipping further compaction tests "
                         "due to quick mode\n");
                return 0;
        }

        do_test_compaction(1000, NULL);
#if WITH_SNAPPY
        do_test_compaction(10, "snappy");
#endif
#if WITH_ZSTD
        do_test_compaction(10, "zstd");
#endif
#if WITH_ZLIB
        do_test_compaction(10000, "gzip");
#endif

        return 0;
}