/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2012-2015, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"
#include "rdkafka.h"

/**
 * @brief Verify handling of compacted topics.
 *
 * General idea:
 *  - create a compacted topic with a low cleanup interval to promote quick
 *    compaction.
 *  - produce messages for 3 fixed keys and interleave with unkeyed messages,
 *    interleaving tombstones for k1 but not for k2 or k3.
 *  - consume before compaction - verify all messages in place
 *  - wait for compaction
 *  - consume after compaction - verify expected messages.
 */



/**
 * @brief Get the low watermark of the partition; we use this to see if
 *        compaction has kicked in.
 */
static int64_t get_low_wmark (rd_kafka_t *rk, const char *topic,
                              int32_t partition) {
        rd_kafka_resp_err_t err;
        int64_t low, high;

        err = rd_kafka_query_watermark_offsets(rk, topic, partition,
                                                &low, &high,
                                                tmout_multip(10000));

        TEST_ASSERT(!err, "query_watermark_offsets(%s, %d) failed: %s",
                    topic, (int)partition, rd_kafka_err2str(err));

        return low;
}


/**
 * @brief Wait for compaction by waiting for the partition's
 *        low watermark offset to increase. */
static void wait_compaction (rd_kafka_t *rk,
                             const char *topic, int32_t partition,
                             int64_t low_offset,
                             int timeout_ms) {
        int64_t low = -1;
        int64_t ts_start = test_clock();

        TEST_SAY("Waiting for compaction to kick in and increase the "
                 "Low watermark offset from %"PRId64" on %s [%"PRId32"]\n",
                 low_offset, topic, partition);

        while (1) {
                low = get_low_wmark(rk, topic, partition);

                TEST_SAY("Low watermark offset for %s [%"PRId32"] is "
                         "%"PRId64" (want > %"PRId64")\n",
                         topic, partition, low, low_offset);

                if (low > low_offset)
                        break;

                /* test_clock() is in microseconds */
                if (ts_start + (timeout_ms * 1000) < test_clock())
                        break;

                rd_sleep(5);
        }
}

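/**
 * @brief Produce \p msgcnt messages of \p msgsize bytes each, all sharing a
 *        single generated key, with the final message being a tombstone
 *        (NULL value) for that key, making the whole batch eligible for
 *        compaction.
 */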
static void produce_compactable_msgs (const char *topic, int32_t partition,
                                      uint64_t testid,
                                      int msgcnt, size_t msgsize) {
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        int i;
        char *val;
        char key[16];
        rd_kafka_resp_err_t err;
        int msgcounter = msgcnt;

        if (!testid)
                testid = test_id_generate();

        test_str_id_generate(key, sizeof(key));

        val = calloc(1, msgsize);

        TEST_SAY("Producing %d compactable messages "
                 "(total of %"PRIusz" bytes)\n", msgcnt, (size_t)msgcnt*msgsize);

        test_conf_init(&conf, NULL, 0);
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
        /* Make sure batch size does not exceed segment.bytes since that
         * will make the ProduceRequest fail. */
        test_conf_set(conf, "batch.num.messages", "1");

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        for (i = 0 ; i < msgcnt-1 ; i++) {
                err = rd_kafka_producev(rk,
                                        RD_KAFKA_V_TOPIC(topic),
                                        RD_KAFKA_V_PARTITION(partition),
                                        RD_KAFKA_V_KEY(key, sizeof(key)-1),
                                        RD_KAFKA_V_VALUE(val, msgsize),
                                        RD_KAFKA_V_OPAQUE(&msgcounter),
                                        RD_KAFKA_V_END);
                TEST_ASSERT(!err, "producev(): %s", rd_kafka_err2str(err));
        }

        /* Final message is the tombstone */
        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC(topic),
                                RD_KAFKA_V_PARTITION(partition),
                                RD_KAFKA_V_KEY(key, sizeof(key)-1),
                                RD_KAFKA_V_OPAQUE(&msgcounter),
                                RD_KAFKA_V_END);
        TEST_ASSERT(!err, "producev(): %s", rd_kafka_err2str(err));

        test_flush(rk, tmout_multip(10000));
        TEST_ASSERT(msgcounter == 0, "%d messages unaccounted for", msgcounter);

        rd_kafka_destroy(rk);

        free(val);
}



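/**
 * @brief Produce \p msgs_per_key messages for each of a fixed set of keys
 *        (interleaved with unique-key messages and tombstones) to a compacted
 *        topic, wait for log compaction to kick in, then consume the topic
 *        and verify that the expected messages remain.
 */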
static void do_test_compaction (int msgs_per_key, const char *compression) {
        const char *topic = test_mk_topic_name(__FILE__, 1);
#define _KEY_CNT 4
        const char *keys[_KEY_CNT] = { "k1", "k2", "k3", NULL/*generate unique*/ };
        int msgcnt = msgs_per_key * _KEY_CNT;
        rd_kafka_conf_t *conf;
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;
        uint64_t testid;
        int32_t partition = 0;
        int cnt = 0;
        test_msgver_t mv;
        test_msgver_t mv_correct;
        int msgcounter = 0;
        const int fillcnt = 20;

        testid = test_id_generate();

        TEST_SAY(_C_MAG "Test compaction on topic %s with %s compression "
                 "(%d messages)\n",
                 topic, compression ? compression : "no", msgcnt);

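        /* Create the compacted topic with small segments and a low
         * min.cleanable.dirty.ratio so that the log cleaner kicks in
         * quickly. */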
        test_kafka_topics("--create --topic \"%s\" "
                          "--partitions %d "
                          "--replication-factor 1 "
                          "--config cleanup.policy=compact "
                          "--config segment.ms=10000 "
                          "--config segment.bytes=10000 "
                          "--config min.cleanable.dirty.ratio=0.01 "
                          "--config delete.retention.ms=86400 "
                          "--config file.delete.delay.ms=10000",
                          topic, partition+1);

        test_conf_init(&conf, NULL, 120);
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
        if (compression)
                test_conf_set(conf, "compression.codec", compression);
        /* Limit the maximum batch size to below segment.bytes to avoid
         * messages accumulating into a batch that the broker would reject. */
        test_conf_set(conf, "message.max.bytes", "6000");
        test_conf_set(conf, "linger.ms", "10");
        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
        rkt = rd_kafka_topic_new(rk, topic, NULL);
        /* The low watermark is not updated on message deletion (compaction)
         * but on segment deletion, so fill up the first segment with
         * random messages eligible for quick compaction. */
        produce_compactable_msgs(topic, 0, partition, fillcnt, 1000);

        /* Populate a correct msgver for later comparison after compaction. */
        test_msgver_init(&mv_correct, testid);

        TEST_SAY("Producing %d messages for %d keys\n", msgcnt, _KEY_CNT);
        for (cnt = 0 ; cnt < msgcnt ; ) {
                int k;

                for (k = 0 ; k < _KEY_CNT ; k++) {
                        rd_kafka_resp_err_t err;
                        int is_last = cnt + _KEY_CNT >= msgcnt;
                        /* Let keys[0] have some tombstones */
                        int is_tombstone = (k == 0 && (is_last || !(cnt % 7)));
                        char *valp;
                        size_t valsize;
                        char rdk_msgid[256];
                        char unique_key[16];
                        const void *key;
                        size_t keysize;
                        int64_t offset = fillcnt + cnt;

                        test_msg_fmt(rdk_msgid, sizeof(rdk_msgid),
                                     testid, partition, cnt);

                        if (is_tombstone) {
                                valp = NULL;
                                valsize = 0;
                        } else {
                                valp = rdk_msgid;
                                valsize = strlen(valp);
                        }

                        if (!(key = keys[k])) {
                                rd_snprintf(unique_key, sizeof(unique_key),
                                            "%d", cnt);
                                key = unique_key;
                        }
                        keysize = strlen(key);

                        /* All unique-key messages should remain intact
                         * after compaction. */
                        if (!keys[k] || is_last) {
                                TEST_SAYL(4,
                                          "Add to correct msgver: "
                                          "msgid: %d: %s is_last=%d, "
                                          "is_tomb=%d\n",
                                          cnt, (const char *)key,
                                          is_last, is_tombstone);
                                test_msgver_add_msg00(__FUNCTION__, __LINE__,
                                                      rd_kafka_name(rk),
                                                      &mv_correct, testid,
                                                      topic, partition,
                                                      offset, -1, 0, cnt);
                        }


                        msgcounter++;
                        err = rd_kafka_producev(
                                rk,
                                RD_KAFKA_V_TOPIC(topic),
                                RD_KAFKA_V_PARTITION(0),
                                RD_KAFKA_V_KEY(key, keysize),
                                RD_KAFKA_V_VALUE(valp, valsize),
                                RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                                RD_KAFKA_V_HEADER("rdk_msgid", rdk_msgid, -1),
                                /* msgcounter as msg_opaque is used
                                 * by test delivery report callback to
                                 * count number of messages. */
                                RD_KAFKA_V_OPAQUE(&msgcounter),
                                RD_KAFKA_V_END);
                        TEST_ASSERT(!err, "producev(#%d) failed: %s",
                                    cnt, rd_kafka_err2str(err));

                        cnt++;
                }
        }

        TEST_ASSERT(cnt == msgcnt, "cnt %d != msgcnt %d", cnt, msgcnt);

        /* Wait for all delivery reports before continuing. */
        msgcounter = cnt;
        test_wait_delivery(rk, &msgcounter);
        /* Trigger compaction by filling up the segment with dummy messages;
         * do it in chunks so that compression doesn't shrink the data too
         * much to fill up the segments.
         * We can't reuse the existing producer instance since it
         * might be using compression, which makes it hard to know how
         * much data we need to produce to trigger compaction. */
        produce_compactable_msgs(topic, 0, partition, 20, 1024);

        /* Wait for compaction:
         * this doesn't really work because the low watermark offset
         * is not updated on compaction if the first segment is not deleted.
         * But it serves as a pause to let compaction kick in,
         * which is triggered by the dummy produce above. */
        wait_compaction(rk, topic, partition, 0, 20*1000);

        TEST_SAY(_C_YEL "Verify messages after compaction\n");
        /* After compaction we expect the following messages:
         * last message for each of k1, k2, k3, all messages for unkeyed. */
        test_msgver_init(&mv, testid);
        mv.msgid_hdr = "rdk_msgid";
        test_consume_msgs_easy_mv(NULL, topic, -1, testid, 1, -1, NULL, &mv);
        test_msgver_verify_compare("post-compaction", &mv, &mv_correct,
                                   TEST_MSGVER_BY_MSGID|TEST_MSGVER_BY_OFFSET);
        test_msgver_clear(&mv);

        test_msgver_clear(&mv_correct);

        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);

        TEST_SAY(_C_GRN "Compaction test with %s compression: PASS\n",
                 compression ? compression : "no");
}

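/**
 * @brief Test entry point: run a small uncompressed pass first, then larger
 *        and compressed variants unless quick mode is enabled.
 */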
int main_0077_compaction (int argc, char **argv) {

        if (!test_can_create_topics(1))
                return 0;

        do_test_compaction(10, NULL);

        if (test_quick) {
                TEST_SAY("Skipping further compaction tests "
                         "due to quick mode\n");
                return 0;
        }

        do_test_compaction(1000, NULL);
#if WITH_SNAPPY
        do_test_compaction(10, "snappy");
#endif
#if WITH_ZSTD
        do_test_compaction(10, "zstd");
#endif
#if WITH_ZLIB
        do_test_compaction(10000, "gzip");
#endif

        return 0;
}