1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2020, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30
31 #include "../src/rdkafka_proto.h"
32
33
/**
 * @name Verify that the producer and consumer resume operation after
 *       a topic has been deleted and recreated.
 */
38
/**
 * The message value to produce, one of:
 *  "before" - before topic deletion
 *  "during" - during topic deletion
 *  "after"  - after topic has been re-created
 *  "end"    - stop producing
 *
 * The main test thread updates \c value, and the producer thread reads it,
 * with \c value_mtx held.
 */
static mtx_t value_mtx;
static char *value;

static const int msg_rate = 10;  /**< Messages produced per second */

static struct test *this_test;   /**< Exposes current test struct (in TLS) to
                                  *   producer thread. */
53
54
55 /**
56 * @brief Treat all error_cb as non-test-fatal.
57 */
is_error_fatal(rd_kafka_t * rk,rd_kafka_resp_err_t err,const char * reason)58 static int is_error_fatal (rd_kafka_t *rk, rd_kafka_resp_err_t err,
59 const char *reason) {
60 return rd_false;
61 }
62
63 /**
64 * @brief Producing thread
65 */
run_producer(void * arg)66 static int run_producer (void *arg) {
67 const char *topic = arg;
68 rd_kafka_t *producer = test_create_producer();
69 int ret = 0;
70
71 test_curr = this_test;
72
73 /* Don't check message status */
74 test_curr->exp_dr_status = (rd_kafka_msg_status_t)-1;
75
76 while (1) {
77 rd_kafka_resp_err_t err;
78
79 mtx_lock(&value_mtx);
80 if (!strcmp(value, "end")) {
81 mtx_unlock(&value_mtx);
82 break;
83 } else if (strcmp(value, "before")) {
84 /* Ignore Delivery report errors after topic
85 * has been deleted and eventually re-created,
86 * we rely on the consumer to verify that
87 * messages are produced. */
88 test_curr->ignore_dr_err = rd_true;
89 }
90
91 err = rd_kafka_producev(
92 producer,
93 RD_KAFKA_V_TOPIC(topic),
94 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
95 RD_KAFKA_V_VALUE(value, strlen(value)),
96 RD_KAFKA_V_END);
97
98 if (err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART ||
99 err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
100 TEST_SAY("Produce failed (expectedly): %s\n",
101 rd_kafka_err2name(err));
102 else
103 TEST_ASSERT(!err, "producev() failed: %s",
104 rd_kafka_err2name(err));
105
106 mtx_unlock(&value_mtx);
107
108 rd_usleep(1000000 / msg_rate, NULL);
109
110 rd_kafka_poll(producer, 0);
111 }
112
113 if (rd_kafka_flush(producer, 5000)) {
114 TEST_WARN("Failed to flush all message(s), %d remain\n",
115 rd_kafka_outq_len(producer));
116 /* Purge the messages to see which partition they were for */
117 rd_kafka_purge(producer,
118 RD_KAFKA_PURGE_F_QUEUE|
119 RD_KAFKA_PURGE_F_INFLIGHT);
120 rd_kafka_flush(producer, 5000);
121 TEST_SAY("%d message(s) in queue after purge\n",
122 rd_kafka_outq_len(producer));
123
124 ret = 1; /* Fail test from main thread */
125 }
126
127 rd_kafka_destroy(producer);
128
129 return ret;
130 }
131
132
133 /**
134 * @brief Expect at least \p cnt messages with value matching \p exp_value,
135 * else fail the current test.
136 */
expect_messages(rd_kafka_t * consumer,int cnt,const char * exp_value)137 static void expect_messages (rd_kafka_t *consumer, int cnt,
138 const char *exp_value) {
139 int match_cnt = 0, other_cnt = 0, err_cnt = 0;
140 size_t exp_len = strlen(exp_value);
141
142 TEST_SAY("Expecting >= %d messages with value \"%s\"...\n",
143 cnt, exp_value);
144
145 while (match_cnt < cnt) {
146 rd_kafka_message_t *rkmessage;
147
148 rkmessage = rd_kafka_consumer_poll(consumer, 1000);
149 if (!rkmessage)
150 continue;
151
152 if (rkmessage->err) {
153 TEST_SAY("Consume error: %s\n",
154 rd_kafka_message_errstr(rkmessage));
155 err_cnt++;
156 } else if (rkmessage->len == exp_len &&
157 !memcmp(rkmessage->payload, exp_value, exp_len)) {
158 match_cnt++;
159 } else {
160 TEST_SAYL(3, "Received \"%.*s\", expected \"%s\": "
161 "ignored\n",
162 (int)rkmessage->len,
163 (const char *)rkmessage->payload,
164 exp_value);
165 other_cnt++;
166 }
167
168 rd_kafka_message_destroy(rkmessage);
169 }
170
171 TEST_SAY("Consumed %d messages matching \"%s\", "
172 "ignored %d others, saw %d error(s)\n",
173 match_cnt, exp_value, other_cnt, err_cnt);
174 }
175
176
177 /**
178 * @brief Test topic create + delete + create with first topic having
179 * \p part_cnt_1 partitions and second topic having \p part_cnt_2 .
180 */
do_test_create_delete_create(int part_cnt_1,int part_cnt_2)181 static void do_test_create_delete_create (int part_cnt_1, int part_cnt_2) {
182 rd_kafka_t *consumer;
183 thrd_t producer_thread;
184 const char *topic = test_mk_topic_name(__FUNCTION__, 1);
185 int ret = 0;
186
187 TEST_SAY(_C_MAG
188 "[ Test topic create(%d parts)+delete+create(%d parts) ]\n",
189 part_cnt_1, part_cnt_2);
190
191 consumer = test_create_consumer(topic, NULL, NULL, NULL);
192
193 /* Create topic */
194 test_create_topic(consumer, topic, part_cnt_1, 3);
195
196 /* Start consumer */
197 test_consumer_subscribe(consumer, topic);
198 test_consumer_wait_assignment(consumer);
199
200 mtx_lock(&value_mtx);
201 value = "before";
202 mtx_unlock(&value_mtx);
203
204 /* Create producer thread */
205 if (thrd_create(&producer_thread, run_producer,
206 (void *)topic) != thrd_success)
207 TEST_FAIL("thrd_create failed");
208
209 /* Consume messages for 5s */
210 expect_messages(consumer, msg_rate * 5, value);
211
212 /* Delete topic */
213 mtx_lock(&value_mtx);
214 value = "during";
215 mtx_unlock(&value_mtx);
216
217 test_delete_topic(consumer, topic);
218 rd_sleep(5);
219
220 /* Re-create topic */
221 test_create_topic(consumer, topic, part_cnt_2, 3);
222
223 mtx_lock(&value_mtx);
224 value = "after";
225 mtx_unlock(&value_mtx);
226
227 /* Consume for 5 more seconds, should see new messages */
228 expect_messages(consumer, msg_rate * 5, value);
229
230 rd_kafka_destroy(consumer);
231
232 /* Wait for producer to exit */
233 mtx_lock(&value_mtx);
234 value = "end";
235 mtx_unlock(&value_mtx);
236
237 if (thrd_join(producer_thread, &ret) != thrd_success || ret != 0)
238 TEST_FAIL("Producer failed: see previous errors");
239
240 TEST_SAY(_C_GRN
241 "[ Test topic create(%d parts)+delete+create(%d parts): "
242 "PASS ]\n",
243 part_cnt_1, part_cnt_2);
244 }
245
246
main_0107_topic_recreate(int argc,char ** argv)247 int main_0107_topic_recreate (int argc, char **argv) {
248 this_test = test_curr; /* Need to expose current test struct (in TLS)
249 * to producer thread. */
250
251 this_test->is_fatal_cb = is_error_fatal;
252
253 mtx_init(&value_mtx, mtx_plain);
254
255 test_conf_init(NULL, NULL, 60);
256
257 do_test_create_delete_create(10, 3);
258 do_test_create_delete_create(3, 6);
259
260 return 0;
261 }
262