1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2020, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 #include "../src/rdkafka_proto.h"
32 
33 
34 /**
35  * @name Verify that producer and consumer resumes operation after
36  *       a topic has been deleted and recreated.
37  */
38 
39 /**
40  * The message value to produce, one of:
41  *   "before"  - before topic deletion
42  *   "during"  - during topic deletion
43  *   "after"   - after topic has been re-created
44  *   "end"     - stop producing
45  */
46 static mtx_t value_mtx;
47 static char *value;
48 
49 static const int msg_rate = 10;  /**< Messages produced per second */
50 
51 static struct test *this_test;   /**< Exposes current test struct (in TLS) to
52                                   *   producer thread. */
53 
54 
55 /**
56  * @brief Treat all error_cb as non-test-fatal.
57  */
is_error_fatal(rd_kafka_t * rk,rd_kafka_resp_err_t err,const char * reason)58 static int is_error_fatal (rd_kafka_t *rk, rd_kafka_resp_err_t err,
59                            const char *reason) {
60         return rd_false;
61 }
62 
63 /**
64  * @brief Producing thread
65  */
run_producer(void * arg)66 static int run_producer (void *arg) {
67         const char *topic = arg;
68         rd_kafka_t *producer = test_create_producer();
69         int ret = 0;
70 
71         test_curr = this_test;
72 
73         /* Don't check message status */
74         test_curr->exp_dr_status = (rd_kafka_msg_status_t)-1;
75 
76         while (1) {
77                 rd_kafka_resp_err_t err;
78 
79                 mtx_lock(&value_mtx);
80                 if (!strcmp(value, "end")) {
81                         mtx_unlock(&value_mtx);
82                         break;
83                 } else if (strcmp(value, "before")) {
84                         /* Ignore Delivery report errors after topic
85                          * has been deleted and eventually re-created,
86                          * we rely on the consumer to verify that
87                          * messages are produced. */
88                         test_curr->ignore_dr_err = rd_true;
89                 }
90 
91                 err = rd_kafka_producev(
92                         producer,
93                         RD_KAFKA_V_TOPIC(topic),
94                         RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
95                         RD_KAFKA_V_VALUE(value, strlen(value)),
96                         RD_KAFKA_V_END);
97 
98                 if (err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART ||
99                     err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
100                         TEST_SAY("Produce failed (expectedly): %s\n",
101                                  rd_kafka_err2name(err));
102                 else
103                         TEST_ASSERT(!err, "producev() failed: %s",
104                                     rd_kafka_err2name(err));
105 
106                 mtx_unlock(&value_mtx);
107 
108                 rd_usleep(1000000 / msg_rate, NULL);
109 
110                 rd_kafka_poll(producer, 0);
111         }
112 
113         if (rd_kafka_flush(producer, 5000)) {
114                 TEST_WARN("Failed to flush all message(s), %d remain\n",
115                           rd_kafka_outq_len(producer));
116                 /* Purge the messages to see which partition they were for */
117                 rd_kafka_purge(producer,
118                                RD_KAFKA_PURGE_F_QUEUE|
119                                RD_KAFKA_PURGE_F_INFLIGHT);
120                 rd_kafka_flush(producer, 5000);
121                 TEST_SAY("%d message(s) in queue after purge\n",
122                          rd_kafka_outq_len(producer));
123 
124                 ret = 1; /* Fail test from main thread */
125         }
126 
127         rd_kafka_destroy(producer);
128 
129         return ret;
130 }
131 
132 
133 /**
134  * @brief Expect at least \p cnt messages with value matching \p exp_value,
135  *        else fail the current test.
136  */
expect_messages(rd_kafka_t * consumer,int cnt,const char * exp_value)137 static void expect_messages (rd_kafka_t *consumer, int cnt,
138                              const char *exp_value) {
139         int match_cnt = 0, other_cnt = 0, err_cnt = 0;
140         size_t exp_len = strlen(exp_value);
141 
142         TEST_SAY("Expecting >= %d messages with value \"%s\"...\n",
143                  cnt, exp_value);
144 
145         while (match_cnt < cnt) {
146                 rd_kafka_message_t *rkmessage;
147 
148                 rkmessage = rd_kafka_consumer_poll(consumer, 1000);
149                 if (!rkmessage)
150                         continue;
151 
152                 if (rkmessage->err) {
153                         TEST_SAY("Consume error: %s\n",
154                                  rd_kafka_message_errstr(rkmessage));
155                         err_cnt++;
156                 } else if (rkmessage->len == exp_len &&
157                            !memcmp(rkmessage->payload, exp_value, exp_len)) {
158                         match_cnt++;
159                 } else {
160                         TEST_SAYL(3, "Received \"%.*s\", expected \"%s\": "
161                                   "ignored\n",
162                                   (int)rkmessage->len,
163                                   (const char *)rkmessage->payload,
164                                   exp_value);
165                         other_cnt++;
166                 }
167 
168                 rd_kafka_message_destroy(rkmessage);
169         }
170 
171         TEST_SAY("Consumed %d messages matching \"%s\", "
172                  "ignored %d others, saw %d error(s)\n",
173                  match_cnt, exp_value, other_cnt, err_cnt);
174 }
175 
176 
177 /**
178  * @brief Test topic create + delete + create with first topic having
179  *        \p part_cnt_1 partitions and second topic having \p part_cnt_2 .
180  */
do_test_create_delete_create(int part_cnt_1,int part_cnt_2)181 static void do_test_create_delete_create (int part_cnt_1, int part_cnt_2) {
182         rd_kafka_t *consumer;
183         thrd_t producer_thread;
184         const char *topic = test_mk_topic_name(__FUNCTION__, 1);
185         int ret = 0;
186 
187         TEST_SAY(_C_MAG
188                  "[ Test topic create(%d parts)+delete+create(%d parts) ]\n",
189                  part_cnt_1, part_cnt_2);
190 
191         consumer = test_create_consumer(topic, NULL, NULL, NULL);
192 
193         /* Create topic */
194         test_create_topic(consumer, topic, part_cnt_1, 3);
195 
196         /* Start consumer */
197         test_consumer_subscribe(consumer, topic);
198         test_consumer_wait_assignment(consumer);
199 
200         mtx_lock(&value_mtx);
201         value = "before";
202         mtx_unlock(&value_mtx);
203 
204         /* Create producer thread */
205         if (thrd_create(&producer_thread, run_producer,
206                         (void *)topic) != thrd_success)
207                 TEST_FAIL("thrd_create failed");
208 
209         /* Consume messages for 5s */
210         expect_messages(consumer, msg_rate * 5, value);
211 
212         /* Delete topic */
213         mtx_lock(&value_mtx);
214         value = "during";
215         mtx_unlock(&value_mtx);
216 
217         test_delete_topic(consumer, topic);
218         rd_sleep(5);
219 
220         /* Re-create topic */
221         test_create_topic(consumer, topic, part_cnt_2, 3);
222 
223         mtx_lock(&value_mtx);
224         value = "after";
225         mtx_unlock(&value_mtx);
226 
227         /* Consume for 5 more seconds, should see new messages */
228         expect_messages(consumer, msg_rate * 5, value);
229 
230         rd_kafka_destroy(consumer);
231 
232         /* Wait for producer to exit */
233         mtx_lock(&value_mtx);
234         value = "end";
235         mtx_unlock(&value_mtx);
236 
237         if (thrd_join(producer_thread, &ret) != thrd_success || ret != 0)
238                 TEST_FAIL("Producer failed: see previous errors");
239 
240         TEST_SAY(_C_GRN
241                  "[ Test topic create(%d parts)+delete+create(%d parts): "
242                  "PASS ]\n",
243                  part_cnt_1, part_cnt_2);
244 }
245 
246 
main_0107_topic_recreate(int argc,char ** argv)247 int main_0107_topic_recreate (int argc, char **argv) {
248         this_test = test_curr; /* Need to expose current test struct (in TLS)
249                                 * to producer thread. */
250 
251         this_test->is_fatal_cb = is_error_fatal;
252 
253         mtx_init(&value_mtx, mtx_plain);
254 
255         test_conf_init(NULL, NULL, 60);
256 
257         do_test_create_delete_create(10, 3);
258         do_test_create_delete_create(3, 6);
259 
260         return 0;
261 }
262