1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2019, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30
31 #include "rdkafka.h"
32
33 /**
34 * @name Producer transaction tests
35 *
36 */
37
38
39 /**
40 * @brief Produce messages using batch interface.
41 */
do_produce_batch(rd_kafka_t * rk,const char * topic,uint64_t testid,int32_t partition,int msg_base,int cnt)42 void do_produce_batch (rd_kafka_t *rk, const char *topic, uint64_t testid,
43 int32_t partition, int msg_base, int cnt) {
44 rd_kafka_message_t *messages;
45 rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, NULL);
46 int i;
47 int ret;
48 int remains = cnt;
49
50 TEST_SAY("Batch-producing %d messages to partition %"PRId32"\n",
51 cnt, partition);
52
53 messages = rd_calloc(sizeof(*messages), cnt);
54 for (i = 0 ; i < cnt ; i++) {
55 char key[128];
56 char value[128];
57
58 test_prepare_msg(testid, partition, msg_base + i,
59 value, sizeof(value),
60 key, sizeof(key));
61 messages[i].key = rd_strdup(key);
62 messages[i].key_len = strlen(key);
63 messages[i].payload = rd_strdup(value);
64 messages[i].len = strlen(value);
65 messages[i]._private = &remains;
66 }
67
68 ret = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_COPY,
69 messages, cnt);
70
71 rd_kafka_topic_destroy(rkt);
72
73 TEST_ASSERT(ret == cnt,
74 "Failed to batch-produce: %d/%d messages produced",
75 ret, cnt);
76
77 for (i = 0 ; i < cnt ; i++) {
78 TEST_ASSERT(!messages[i].err,
79 "Failed to produce message: %s",
80 rd_kafka_err2str(messages[i].err));
81 rd_free(messages[i].key);
82 rd_free(messages[i].payload);
83 }
84 rd_free(messages);
85
86 /* Wait for deliveries */
87 test_wait_delivery(rk, &remains);
88 }
89
90
91
92 /**
93 * @brief Basic producer transaction testing without consumed input
94 * (only consumed output for verification).
95 * e.g., no consumer offsets to commit with transaction.
96 */
do_test_basic_producer_txn(rd_bool_t enable_compression)97 static void do_test_basic_producer_txn (rd_bool_t enable_compression) {
98 const char *topic = test_mk_topic_name("0103_transactions", 1);
99 const int partition_cnt = 4;
100 #define _TXNCNT 6
101 struct {
102 const char *desc;
103 uint64_t testid;
104 int msgcnt;
105 rd_bool_t abort;
106 rd_bool_t sync;
107 rd_bool_t batch;
108 rd_bool_t batch_any;
109 } txn[_TXNCNT] = {
110 { "Commit transaction, sync producing",
111 0, 100, rd_false, rd_true },
112 { "Commit transaction, async producing",
113 0, 1000, rd_false, rd_false },
114 { "Commit transaction, sync batch producing to any partition",
115 0, 100, rd_false, rd_true, rd_true, rd_true },
116 { "Abort transaction, sync producing",
117 0, 500, rd_true, rd_true },
118 { "Abort transaction, async producing",
119 0, 5000, rd_true, rd_false },
120 { "Abort transaction, sync batch producing to one partition",
121 0, 500, rd_true, rd_true, rd_true, rd_false },
122
123 };
124 rd_kafka_t *p, *c;
125 rd_kafka_conf_t *conf, *p_conf, *c_conf;
126 int i;
127
128 /* Mark one of run modes as quick so we don't run both when
129 * in a hurry.*/
130 SUB_TEST0(enable_compression /* quick */,
131 "with%s compression", enable_compression ? "" : "out");
132
133 test_conf_init(&conf, NULL, 30);
134
135 /* Create producer */
136 p_conf = rd_kafka_conf_dup(conf);
137 rd_kafka_conf_set_dr_msg_cb(p_conf, test_dr_msg_cb);
138 test_conf_set(p_conf, "transactional.id", topic);
139 if (enable_compression)
140 test_conf_set(p_conf, "compression.type", "lz4");
141 p = test_create_handle(RD_KAFKA_PRODUCER, p_conf);
142
143 // FIXME: add testing were the txn id is reused (and thus fails)
144
145 /* Create topic */
146 test_create_topic(p, topic, partition_cnt, 3);
147
148 /* Create consumer */
149 c_conf = conf;
150 test_conf_set(conf, "auto.offset.reset", "earliest");
151 /* Make sure default isolation.level is transaction aware */
152 TEST_ASSERT(!strcmp(test_conf_get(c_conf, "isolation.level"),
153 "read_committed"),
154 "expected isolation.level=read_committed, not %s",
155 test_conf_get(c_conf, "isolation.level"));
156
157 c = test_create_consumer(topic, NULL, c_conf, NULL);
158
159 /* Wait for topic to propagate to avoid test flakyness */
160 test_wait_topic_exists(c, topic, tmout_multip(5000));
161
162 /* Subscribe to topic */
163 test_consumer_subscribe(c, topic);
164
165 /* Wait for assignment to make sure consumer is fetching messages
166 * below, so we can use the poll_no_msgs() timeout to
167 * determine that messages were indeed aborted. */
168 test_consumer_wait_assignment(c, rd_true);
169
170 /* Init transactions */
171 TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30*1000));
172
173 for (i = 0 ; i < _TXNCNT ; i++) {
174 int wait_msgcnt = 0;
175
176 TEST_SAY(_C_BLU "txn[%d]: Begin transaction: %s\n" _C_CLR,
177 i, txn[i].desc);
178
179 /* Begin a transaction */
180 TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));
181
182 /* If the transaction is aborted it is okay if
183 * messages fail producing, since they'll be
184 * purged from queues. */
185 test_curr->ignore_dr_err = txn[i].abort;
186
187 /* Produce messages */
188 txn[i].testid = test_id_generate();
189 TEST_SAY("txn[%d]: Produce %d messages %ssynchronously "
190 "with testid %"PRIu64"\n",
191 i, txn[i].msgcnt,
192 txn[i].sync ? "" : "a",
193 txn[i].testid);
194
195 if (!txn[i].batch) {
196 if (txn[i].sync)
197 test_produce_msgs2(p, topic, txn[i].testid,
198 RD_KAFKA_PARTITION_UA, 0,
199 txn[i].msgcnt, NULL, 0);
200 else
201 test_produce_msgs2_nowait(p, topic,
202 txn[i].testid,
203 RD_KAFKA_PARTITION_UA,
204 0,
205 txn[i].msgcnt,
206 NULL, 0,
207 &wait_msgcnt);
208 } else if (txn[i].batch_any) {
209 /* Batch: use any partition */
210 do_produce_batch(p, topic, txn[i].testid,
211 RD_KAFKA_PARTITION_UA,
212 0, txn[i].msgcnt);
213 } else {
214 /* Batch: specific partition */
215 do_produce_batch(p, topic, txn[i].testid,
216 1 /* partition */,
217 0, txn[i].msgcnt);
218 }
219
220
221 /* Abort or commit transaction */
222 TEST_SAY("txn[%d]: %s" _C_CLR " transaction\n",
223 i, txn[i].abort ? _C_RED "Abort" : _C_GRN "Commit");
224 if (txn[i].abort) {
225 test_curr->ignore_dr_err = rd_true;
226 TEST_CALL_ERROR__(rd_kafka_abort_transaction(p,
227 30*1000));
228 } else {
229 test_curr->ignore_dr_err = rd_false;
230 TEST_CALL_ERROR__(rd_kafka_commit_transaction(p,
231 30*1000));
232 }
233
234 if (!txn[i].sync)
235 /* Wait for delivery reports */
236 test_wait_delivery(p, &wait_msgcnt);
237
238 /* Consume messages */
239 if (txn[i].abort)
240 test_consumer_poll_no_msgs(txn[i].desc, c,
241 txn[i].testid, 3000);
242 else
243 test_consumer_poll(txn[i].desc, c,
244 txn[i].testid, partition_cnt, 0,
245 txn[i].msgcnt, NULL);
246
247 TEST_SAY(_C_GRN "txn[%d]: Finished successfully: %s\n" _C_CLR,
248 i, txn[i].desc);
249 }
250
251 rd_kafka_destroy(p);
252
253 test_consumer_close(c);
254 rd_kafka_destroy(c);
255
256 SUB_TEST_PASS();
257 }
258
259
260 /**
261 * @brief Consumes \p cnt messages and returns them in the provided array
262 * which must be pre-allocated.
263 */
consume_messages(rd_kafka_t * c,rd_kafka_message_t ** msgs,int msgcnt)264 static void consume_messages (rd_kafka_t *c,
265 rd_kafka_message_t **msgs, int msgcnt) {
266 int i = 0;
267 while (i < msgcnt) {
268 msgs[i] = rd_kafka_consumer_poll(c, 1000);
269 if (!msgs[i])
270 continue;
271
272 if (msgs[i]->err) {
273 TEST_SAY("%s consumer error: %s\n",
274 rd_kafka_name(c),
275 rd_kafka_message_errstr(msgs[i]));
276 rd_kafka_message_destroy(msgs[i]);
277 continue;
278 }
279
280 TEST_SAYL(3, "%s: consumed message %s [%d] @ %"PRId64"\n",
281 rd_kafka_name(c),
282 rd_kafka_topic_name(msgs[i]->rkt),
283 msgs[i]->partition, msgs[i]->offset);
284
285
286 i++;
287 }
288 }
289
/**
 * @brief Destroys the first \p msgcnt messages in \p msgs,
 *        last message first. The array itself is not freed.
 */
static void destroy_messages (rd_kafka_message_t **msgs, int msgcnt) {
        int idx;

        for (idx = msgcnt - 1 ; idx >= 0 ; idx--)
                rd_kafka_message_destroy(msgs[idx]);
}
294
295
296 /**
297 * @brief Test a transactional consumer + transactional producer combo,
298 * mimicing a streams job.
299 *
300 * One input topic produced to by transactional producer 1,
301 * consumed by transactional consumer 1, which forwards messages
302 * to transactional producer 2 that writes messages to output topic,
303 * which is consumed and verified by transactional consumer 2.
304 *
305 * Every 3rd transaction is aborted.
306 */
do_test_consumer_producer_txn(void)307 void do_test_consumer_producer_txn (void) {
308 char *input_topic =
309 rd_strdup(test_mk_topic_name("0103-transactions-input", 1));
310 char *output_topic =
311 rd_strdup(test_mk_topic_name("0103-transactions-output", 1));
312 const char *c1_groupid = input_topic;
313 const char *c2_groupid = output_topic;
314 rd_kafka_t *p1, *p2, *c1, *c2;
315 rd_kafka_conf_t *conf, *tmpconf, *c1_conf;
316 uint64_t testid;
317 #define _MSGCNT (10 * 30)
318 const int txncnt = 10;
319 const int msgcnt = _MSGCNT;
320 int txn;
321 int committed_msgcnt = 0;
322 test_msgver_t expect_mv, actual_mv;
323
324 SUB_TEST_QUICK("transactional test with %d transactions", txncnt);
325
326 test_conf_init(&conf, NULL, 30);
327
328 testid = test_id_generate();
329
330 /*
331 *
332 * Producer 1
333 * |
334 * v
335 * input topic
336 * |
337 * v
338 * Consumer 1 }
339 * | } transactional streams job
340 * v }
341 * Producer 2 }
342 * |
343 * v
344 * output tpic
345 * |
346 * v
347 * Consumer 2
348 */
349
350
351 /* Create Producer 1 and seed input topic */
352 tmpconf = rd_kafka_conf_dup(conf);
353 test_conf_set(tmpconf, "transactional.id", input_topic);
354 rd_kafka_conf_set_dr_msg_cb(tmpconf, test_dr_msg_cb);
355 p1 = test_create_handle(RD_KAFKA_PRODUCER, tmpconf);
356
357 /* Create input and output topics */
358 test_create_topic(p1, input_topic, 4, 3);
359 test_create_topic(p1, output_topic, 4, 3);
360
361 /* Seed input topic with messages */
362 TEST_CALL_ERROR__(rd_kafka_init_transactions(p1, 30*1000));
363 TEST_CALL_ERROR__(rd_kafka_begin_transaction(p1));
364 test_produce_msgs2(p1, input_topic, testid, RD_KAFKA_PARTITION_UA,
365 0, msgcnt, NULL, 0);
366 TEST_CALL_ERROR__(rd_kafka_commit_transaction(p1, 30*1000));
367
368 rd_kafka_destroy(p1);
369
370 /* Create Consumer 1: reading msgs from input_topic (Producer 1) */
371 tmpconf = rd_kafka_conf_dup(conf);
372 test_conf_set(tmpconf, "isolation.level", "read_committed");
373 test_conf_set(tmpconf, "auto.offset.reset", "earliest");
374 test_conf_set(tmpconf, "enable.auto.commit", "false");
375 c1_conf = rd_kafka_conf_dup(tmpconf);
376 c1 = test_create_consumer(c1_groupid, NULL, tmpconf, NULL);
377 test_consumer_subscribe(c1, input_topic);
378
379 /* Create Producer 2 */
380 tmpconf = rd_kafka_conf_dup(conf);
381 test_conf_set(tmpconf, "transactional.id", output_topic);
382 rd_kafka_conf_set_dr_msg_cb(tmpconf, test_dr_msg_cb);
383 p2 = test_create_handle(RD_KAFKA_PRODUCER, tmpconf);
384 TEST_CALL_ERROR__(rd_kafka_init_transactions(p2, 30*1000));
385
386 /* Create Consumer 2: reading msgs from output_topic (Producer 2) */
387 tmpconf = rd_kafka_conf_dup(conf);
388 test_conf_set(tmpconf, "isolation.level", "read_committed");
389 test_conf_set(tmpconf, "auto.offset.reset", "earliest");
390 c2 = test_create_consumer(c2_groupid, NULL, tmpconf, NULL);
391 test_consumer_subscribe(c2, output_topic);
392
393 rd_kafka_conf_destroy(conf);
394
395 /* Keep track of what messages to expect on the output topic */
396 test_msgver_init(&expect_mv, testid);
397
398 for (txn = 0 ; txn < txncnt ; txn++) {
399 int msgcnt2 = 10 * (1 + (txn % 3));
400 rd_kafka_message_t *msgs[_MSGCNT];
401 int i;
402 rd_bool_t do_abort = !(txn % 3);
403 rd_bool_t recreate_consumer = do_abort && txn == 3;
404 rd_kafka_topic_partition_list_t *offsets;
405 rd_kafka_resp_err_t err;
406 rd_kafka_consumer_group_metadata_t *c1_cgmetadata;
407 int remains = msgcnt2;
408
409 TEST_SAY(_C_BLU "Begin transaction #%d/%d "
410 "(msgcnt=%d, do_abort=%s, recreate_consumer=%s)\n",
411 txn, txncnt, msgcnt2,
412 do_abort ? "true":"false",
413 recreate_consumer ? "true":"false");
414
415 consume_messages(c1, msgs, msgcnt2);
416
417 TEST_CALL_ERROR__(rd_kafka_begin_transaction(p2));
418
419 for (i = 0 ; i < msgcnt2 ; i++) {
420 rd_kafka_message_t *msg = msgs[i];
421
422 if (!do_abort) {
423 /* The expected msgver based on the input topic
424 * will be compared to the actual msgver based
425 * on the output topic, so we need to
426 * override the topic name to match
427 * the actual msgver's output topic. */
428 test_msgver_add_msg0(__FUNCTION__, __LINE__,
429 rd_kafka_name(p2),
430 &expect_mv, msg,
431 output_topic);
432 committed_msgcnt++;
433 }
434
435 err = rd_kafka_producev(p2,
436 RD_KAFKA_V_TOPIC(output_topic),
437 RD_KAFKA_V_KEY(msg->key,
438 msg->key_len),
439 RD_KAFKA_V_VALUE(msg->payload,
440 msg->len),
441 RD_KAFKA_V_MSGFLAGS(
442 RD_KAFKA_MSG_F_COPY),
443 RD_KAFKA_V_OPAQUE(&remains),
444 RD_KAFKA_V_END);
445 TEST_ASSERT(!err, "produce failed: %s",
446 rd_kafka_err2str(err));
447
448 rd_kafka_poll(p2, 0);
449 }
450
451 destroy_messages(msgs, msgcnt2);
452
453 err = rd_kafka_assignment(c1, &offsets);
454 TEST_ASSERT(!err, "failed to get consumer assignment: %s",
455 rd_kafka_err2str(err));
456
457 err = rd_kafka_position(c1, offsets);
458 TEST_ASSERT(!err, "failed to get consumer position: %s",
459 rd_kafka_err2str(err));
460
461 c1_cgmetadata = rd_kafka_consumer_group_metadata(c1);
462 TEST_ASSERT(c1_cgmetadata != NULL,
463 "failed to get consumer group metadata");
464
465 TEST_CALL_ERROR__(
466 rd_kafka_send_offsets_to_transaction(
467 p2, offsets, c1_cgmetadata, -1));
468
469
470 rd_kafka_consumer_group_metadata_destroy(c1_cgmetadata);
471
472 rd_kafka_topic_partition_list_destroy(offsets);
473
474
475 if (do_abort) {
476 test_curr->ignore_dr_err = rd_true;
477 TEST_CALL_ERROR__(rd_kafka_abort_transaction(
478 p2, 30*1000));
479 } else {
480 test_curr->ignore_dr_err = rd_false;
481 TEST_CALL_ERROR__(rd_kafka_commit_transaction(
482 p2, 30*1000));
483 }
484
485 TEST_ASSERT(remains == 0,
486 "expected no remaining messages "
487 "in-flight/in-queue, got %d", remains);
488
489
490 if (recreate_consumer) {
491 /* Recreate the consumer to pick up
492 * on the committed offset. */
493 TEST_SAY("Recreating consumer 1\n");
494 rd_kafka_consumer_close(c1);
495 rd_kafka_destroy(c1);
496
497 c1 = test_create_consumer(c1_groupid, NULL, c1_conf,
498 NULL);
499 test_consumer_subscribe(c1, input_topic);
500 }
501 }
502
503 test_msgver_init(&actual_mv, testid);
504
505 test_consumer_poll("Verify output topic", c2, testid,
506 -1, 0, committed_msgcnt, &actual_mv);
507
508 test_msgver_verify_compare("Verify output topic",
509 &actual_mv, &expect_mv,
510 TEST_MSGVER_ALL);
511
512 test_msgver_clear(&actual_mv);
513 test_msgver_clear(&expect_mv);
514
515 rd_kafka_consumer_close(c1);
516 rd_kafka_consumer_close(c2);
517 rd_kafka_destroy(c1);
518 rd_kafka_destroy(c2);
519 rd_kafka_destroy(p2);
520
521 rd_free(input_topic);
522 rd_free(output_topic);
523
524 SUB_TEST_PASS();
525 }
526
527
528 /**
529 * @brief Testing misuse of the transaction API.
530 */
do_test_misuse_txn(void)531 static void do_test_misuse_txn (void) {
532 const char *topic = test_mk_topic_name("0103-test_misuse_txn", 1);
533 rd_kafka_t *p;
534 rd_kafka_conf_t *conf;
535 rd_kafka_error_t *error;
536 rd_kafka_resp_err_t fatal_err;
537 char errstr[512];
538 int i;
539
540 /*
541 * transaction.timeout.ms out of range (from broker's point of view)
542 */
543 SUB_TEST_QUICK();
544
545 test_conf_init(&conf, NULL, 10);
546
547 test_conf_set(conf, "transactional.id", topic);
548 test_conf_set(conf, "transaction.timeout.ms", "2147483647");
549
550 p = test_create_handle(RD_KAFKA_PRODUCER, conf);
551
552 error = rd_kafka_init_transactions(p, 10*1000);
553 TEST_ASSERT(error, "Expected init_transactions() to fail");
554 TEST_ASSERT(rd_kafka_error_code(error) ==
555 RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
556 "Expected error ERR_INVALID_TRANSACTION_TIMEOUT, "
557 "not %s: %s",
558 rd_kafka_error_name(error),
559 error ? rd_kafka_error_string(error) : "");
560 TEST_ASSERT(rd_kafka_error_is_fatal(error),
561 "Expected error to have is_fatal() set");
562 rd_kafka_error_destroy(error);
563 /* Check that a fatal error is raised */
564 fatal_err = rd_kafka_fatal_error(p, errstr, sizeof(errstr));
565 TEST_ASSERT(fatal_err == RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
566 "Expected fatal error ERR_INVALID_TRANSACTION_TIMEOUT, "
567 "not %s: %s",
568 rd_kafka_err2name(fatal_err),
569 fatal_err ? errstr : "");
570
571 rd_kafka_destroy(p);
572
573
574 /*
575 * Multiple calls to init_transactions(): finish on first.
576 */
577 TEST_SAY("[ Test multiple init_transactions(): finish on first ]\n");
578 test_conf_init(&conf, NULL, 10);
579
580 test_conf_set(conf, "transactional.id", topic);
581
582 p = test_create_handle(RD_KAFKA_PRODUCER, conf);
583
584 TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30*1000));
585
586 error = rd_kafka_init_transactions(p, 1);
587 TEST_ASSERT(error, "Expected init_transactions() to fail");
588 TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__STATE,
589 "Expected ERR__STATE error, not %s",
590 rd_kafka_error_name(error));
591 rd_kafka_error_destroy(error);
592
593 TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));
594
595 error = rd_kafka_init_transactions(p, 3*1000);
596 TEST_ASSERT(error, "Expected init_transactions() to fail");
597 TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__STATE,
598 "Expected ERR__STATE error, not %s",
599 rd_kafka_error_name(error));
600 rd_kafka_error_destroy(error);
601
602 rd_kafka_destroy(p);
603
604
605 /*
606 * Multiple calls to init_transactions(): timeout on first.
607 */
608 TEST_SAY("[ Test multiple init_transactions(): timeout on first ]\n");
609 test_conf_init(&conf, NULL, 10);
610
611 test_conf_set(conf, "transactional.id", topic);
612
613 p = test_create_handle(RD_KAFKA_PRODUCER, conf);
614
615 error = rd_kafka_init_transactions(p, 1);
616 TEST_ASSERT(error, "Expected init_transactions() to fail");
617 TEST_SAY("error: %s, %d\n", rd_kafka_error_string(error),
618 rd_kafka_error_is_retriable(error));
619 TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
620 "Expected ERR__TIMED_OUT, not %s: %s",
621 rd_kafka_error_name(error),
622 rd_kafka_error_string(error));
623 TEST_ASSERT(rd_kafka_error_is_retriable(error),
624 "Expected error to be retriable");
625 rd_kafka_error_destroy(error);
626
627 TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30*1000));
628
629 rd_kafka_destroy(p);
630
631
632 /*
633 * Multiple calls to init_transactions(): hysterical amounts
634 */
635 TEST_SAY("[ Test multiple init_transactions(): hysterical amounts ]\n");
636 test_conf_init(&conf, NULL, 10);
637
638 test_conf_set(conf, "transactional.id", topic);
639
640 p = test_create_handle(RD_KAFKA_PRODUCER, conf);
641
642 /* Call until init succeeds */
643 for (i = 0 ; i < 5000 ; i++) {
644 if (!(error = rd_kafka_init_transactions(p, 1)))
645 break;
646
647 TEST_ASSERT(rd_kafka_error_is_retriable(error),
648 "Expected error to be retriable");
649 rd_kafka_error_destroy(error);
650
651 error = rd_kafka_begin_transaction(p);
652 TEST_ASSERT(error, "Expected begin_transactions() to fail");
653 TEST_ASSERT(rd_kafka_error_code(error) ==
654 RD_KAFKA_RESP_ERR__STATE,
655 "Expected begin_transactions() to fail "
656 "with STATE, not %s",
657 rd_kafka_error_name(error));
658
659 rd_kafka_error_destroy(error);
660 }
661
662 TEST_SAY("init_transactions() succeeded after %d call(s)\n", i+1);
663
664 /* Make sure a sub-sequent init call fails. */
665 error = rd_kafka_init_transactions(p, 5*1000);
666 TEST_ASSERT(error, "Expected init_transactions() to fail");
667 TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__STATE,
668 "Expected init_transactions() to fail with STATE, not %s",
669 rd_kafka_error_name(error));
670 rd_kafka_error_destroy(error);
671
672 /* But begin.. should work now */
673 TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));
674
675 rd_kafka_destroy(p);
676
677 SUB_TEST_PASS();
678 }
679
680
681 /**
682 * @brief is_fatal_cb for fenced_txn test.
683 */
fenced_txn_is_fatal_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,const char * reason)684 static int fenced_txn_is_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
685 const char *reason) {
686 TEST_SAY("is_fatal?: %s: %s\n", rd_kafka_err2str(err), reason);
687 if (err == RD_KAFKA_RESP_ERR__FENCED) {
688 TEST_SAY("Saw the expected fatal error\n");
689 return 0;
690 }
691 return 1;
692 }
693
694
695 /**
696 * @brief Check that transaction fencing is handled correctly.
697 */
do_test_fenced_txn(rd_bool_t produce_after_fence)698 static void do_test_fenced_txn (rd_bool_t produce_after_fence) {
699 const char *topic = test_mk_topic_name("0103_fenced_txn", 1);
700 rd_kafka_conf_t *conf;
701 rd_kafka_t *p1, *p2;
702 rd_kafka_error_t *error;
703 uint64_t testid;
704
705 SUB_TEST_QUICK("%sproduce after fence",
706 produce_after_fence ? "" : "do not ");
707
708 if (produce_after_fence)
709 test_curr->is_fatal_cb = fenced_txn_is_fatal_cb;
710
711 test_curr->ignore_dr_err = rd_false;
712
713 testid = test_id_generate();
714
715 test_conf_init(&conf, NULL, 30);
716
717 test_conf_set(conf, "transactional.id", topic);
718 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
719
720 p1 = test_create_handle(RD_KAFKA_PRODUCER, rd_kafka_conf_dup(conf));
721 p2 = test_create_handle(RD_KAFKA_PRODUCER, rd_kafka_conf_dup(conf));
722 rd_kafka_conf_destroy(conf);
723
724 TEST_CALL_ERROR__(rd_kafka_init_transactions(p1, 30*1000));
725
726 /* Begin a transaction */
727 TEST_CALL_ERROR__(rd_kafka_begin_transaction(p1));
728
729 /* Produce some messages */
730 test_produce_msgs2(p1, topic, testid, RD_KAFKA_PARTITION_UA,
731 0, 10, NULL, 0);
732
733 /* Initialize transactions on producer 2, this should
734 * fence off producer 1. */
735 TEST_CALL_ERROR__(rd_kafka_init_transactions(p2, 30*1000));
736
737 if (produce_after_fence) {
738 /* This will fail hard since the epoch was bumped. */
739 TEST_SAY("Producing after producing fencing\n");
740 test_curr->ignore_dr_err = rd_true;
741 test_produce_msgs2(p1, topic, testid, RD_KAFKA_PARTITION_UA,
742 0, 10, NULL, 0);
743 }
744
745
746 error = rd_kafka_commit_transaction(p1, 30*1000);
747
748 TEST_ASSERT(error, "Expected commit to fail");
749 TEST_ASSERT(rd_kafka_fatal_error(p1, NULL, 0),
750 "Expected a fatal error to have been raised");
751 TEST_ASSERT(error, "Expected commit_transaction() to fail");
752 TEST_ASSERT(rd_kafka_error_is_fatal(error),
753 "Expected commit_transaction() to return a "
754 "fatal error");
755 TEST_ASSERT(!rd_kafka_error_txn_requires_abort(error),
756 "Expected commit_transaction() not to return an "
757 "abortable error");
758 TEST_ASSERT(!rd_kafka_error_is_retriable(error),
759 "Expected commit_transaction() not to return a "
760 "retriable error");
761 TEST_ASSERT(rd_kafka_error_code(error) ==
762 RD_KAFKA_RESP_ERR__FENCED,
763 "Expected commit_transaction() to return %s, "
764 "not %s: %s",
765 rd_kafka_err2name(RD_KAFKA_RESP_ERR__FENCED),
766 rd_kafka_error_name(error),
767 rd_kafka_error_string(error));
768 rd_kafka_error_destroy(error);
769
770 rd_kafka_destroy(p1);
771 rd_kafka_destroy(p2);
772
773 /* Make sure no messages were committed. */
774 test_consume_txn_msgs_easy(topic, topic, testid,
775 test_get_partition_count(NULL, topic,
776 10*1000),
777 0, NULL);
778
779 SUB_TEST_PASS();
780 }
781
782
783
784 /**
785 * @brief Check that fatal idempotent producer errors are also fatal
786 * transactional errors when KIP-360 is not supported.
787 */
do_test_fatal_idempo_error_without_kip360(void)788 static void do_test_fatal_idempo_error_without_kip360 (void) {
789 const char *topic = test_mk_topic_name("0103_fatal_idempo", 1);
790 const int32_t partition = 0;
791 rd_kafka_conf_t *conf, *c_conf;
792 rd_kafka_t *p, *c;
793 rd_kafka_error_t *error;
794 uint64_t testid;
795 const int msgcnt[3] = { 6, 4, 1 };
796 rd_kafka_topic_partition_list_t *records;
797 test_msgver_t expect_mv, actual_mv;
798 /* This test triggers UNKNOWN_PRODUCER_ID on AK <2.4 and >2.4, but
799 * not on AK 2.4.
800 * On AK <2.5 (pre KIP-360) these errors are unrecoverable,
801 * on AK >2.5 (with KIP-360) we can recover.
802 * Since 2.4 is not behaving as the other releases we skip it here. */
803 rd_bool_t expect_fail = test_broker_version < TEST_BRKVER(2,5,0,0);
804
805 SUB_TEST_QUICK("%s",
806 expect_fail ?
807 "expecting failure since broker is < 2.5" :
808 "not expecting failure since broker is >= 2.5");
809
810 if (test_broker_version >= TEST_BRKVER(2,4,0,0) &&
811 test_broker_version < TEST_BRKVER(2,5,0,0))
812 SUB_TEST_SKIP("can't trigger UNKNOWN_PRODUCER_ID on AK 2.4");
813
814 if (expect_fail)
815 test_curr->is_fatal_cb = test_error_is_not_fatal_cb;
816 test_curr->ignore_dr_err = expect_fail;
817
818 testid = test_id_generate();
819
820 /* Keep track of what messages to expect on the output topic */
821 test_msgver_init(&expect_mv, testid);
822
823 test_conf_init(&conf, NULL, 30);
824
825 test_conf_set(conf, "transactional.id", topic);
826 test_conf_set(conf, "batch.num.messages", "1");
827 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
828
829 p = test_create_handle(RD_KAFKA_PRODUCER, conf);
830
831 test_create_topic(p, topic, 1, 3);
832
833
834 TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30*1000));
835
836 /*
837 * 3 transactions:
838 * 1. Produce some messages, commit.
839 * 2. Produce some messages, then delete the messages from txn 1 and
840 * then produce some more messages: UNKNOWN_PRODUCER_ID should be
841 * raised as a fatal error.
842 * 3. Start a new transaction, produce and commit some new messages.
843 * (this step is only performed when expect_fail is false).
844 */
845
846 /*
847 * Transaction 1
848 */
849 TEST_SAY(_C_BLU "Transaction 1: %d msgs\n", msgcnt[0]);
850 TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));
851 test_produce_msgs2(p, topic, testid, partition, 0,
852 msgcnt[0], NULL, 0);
853 TEST_CALL_ERROR__(rd_kafka_commit_transaction(p, -1));
854
855
856 /*
857 * Transaction 2
858 */
859 TEST_SAY(_C_BLU "Transaction 2: %d msgs\n", msgcnt[1]);
860 TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));
861
862 /* Now delete the messages from txn1 */
863 TEST_SAY("Deleting records < %s [%"PRId32"] offset %d+1\n",
864 topic, partition, msgcnt[0]);
865 records = rd_kafka_topic_partition_list_new(1);
866 rd_kafka_topic_partition_list_add(records, topic, partition)->offset =
867 msgcnt[0]; /* include the control message too */
868
869 TEST_CALL_ERR__(test_DeleteRecords_simple(p,
870 NULL,
871 records,
872 NULL));
873 rd_kafka_topic_partition_list_destroy(records);
874
875 /* Wait for deletes to propagate */
876 rd_sleep(2);
877
878 if (!expect_fail)
879 test_curr->dr_mv = &expect_mv;
880
881 /* Produce more messages, should now fail */
882 test_produce_msgs2(p, topic, testid, partition, 0,
883 msgcnt[1], NULL, 0);
884
885 error = rd_kafka_commit_transaction(p, -1);
886
887 TEST_SAY_ERROR(error, "commit_transaction() returned: ");
888
889 if (expect_fail) {
890 TEST_ASSERT(error != NULL,
891 "Expected transaction to fail");
892 TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
893 "Expected abortable error");
894 rd_kafka_error_destroy(error);
895
896 /* Now abort transaction, which should raise the fatal error
897 * since it is the abort that performs the PID reinitialization.
898 */
899 error = rd_kafka_abort_transaction(p, -1);
900 TEST_SAY_ERROR(error, "abort_transaction() returned: ");
901 TEST_ASSERT(error != NULL,
902 "Expected abort to fail");
903 TEST_ASSERT(rd_kafka_error_is_fatal(error),
904 "Expecting fatal error");
905 TEST_ASSERT(!rd_kafka_error_is_retriable(error),
906 "Did not expect retriable error");
907 TEST_ASSERT(!rd_kafka_error_txn_requires_abort(error),
908 "Did not expect abortable error");
909
910 rd_kafka_error_destroy(error);
911
912 } else {
913 TEST_ASSERT(!error, "Did not expect commit to fail: %s",
914 rd_kafka_error_string(error));
915 }
916
917
918 if (!expect_fail) {
919 /*
920 * Transaction 3
921 */
922 TEST_SAY(_C_BLU "Transaction 3: %d msgs\n", msgcnt[2]);
923 test_curr->dr_mv = &expect_mv;
924 TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));
925 test_produce_msgs2(p, topic, testid, partition, 0,
926 msgcnt[2], NULL, 0);
927 TEST_CALL_ERROR__(rd_kafka_commit_transaction(p, -1));
928 }
929
930 rd_kafka_destroy(p);
931
932 /* Consume messages.
933 * On AK<2.5 (expect_fail=true) we do not expect to see any messages
934 * since the producer will have failed with a fatal error.
935 * On AK>=2.5 (expect_fail=false) we should only see messages from
936 * txn 3 which are sent after the producer has recovered.
937 */
938
939 test_conf_init(&c_conf, NULL, 0);
940 test_conf_set(c_conf, "enable.partition.eof", "true");
941 c = test_create_consumer(topic, NULL, c_conf, NULL);
942 test_consumer_assign_partition("consume",
943 c, topic, partition,
944 RD_KAFKA_OFFSET_BEGINNING);
945
946 test_msgver_init(&actual_mv, testid);
947 test_msgver_ignore_eof(&actual_mv);
948
949 test_consumer_poll("Verify output topic", c, testid,
950 1, 0, -1, &actual_mv);
951
952 test_msgver_verify_compare("Verify output topic",
953 &actual_mv, &expect_mv,
954 TEST_MSGVER_ALL);
955
956 test_msgver_clear(&actual_mv);
957 test_msgver_clear(&expect_mv);
958
959 rd_kafka_destroy(c);
960
961 SUB_TEST_PASS();
962 }
963
964
965 /**
966 * @brief Check that empty transactions, with no messages produced, work
967 * as expected.
968 */
do_test_empty_txn(rd_bool_t send_offsets,rd_bool_t do_commit)969 static void do_test_empty_txn (rd_bool_t send_offsets, rd_bool_t do_commit) {
970 const char *topic = test_mk_topic_name("0103_empty_txn", 1);
971 rd_kafka_conf_t *conf, *c_conf;
972 rd_kafka_t *p, *c;
973 uint64_t testid;
974 const int msgcnt = 10;
975 rd_kafka_topic_partition_list_t *committed;
976 int64_t offset;
977
978 SUB_TEST_QUICK("%ssend offsets, %s",
979 send_offsets ? "" : "don't ",
980 do_commit ? "commit" : "abort");
981
982 testid = test_id_generate();
983
984 test_conf_init(&conf, NULL, 30);
985 c_conf = rd_kafka_conf_dup(conf);
986
987 test_conf_set(conf, "transactional.id", topic);
988 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
989 p = test_create_handle(RD_KAFKA_PRODUCER, conf);
990
991 test_create_topic(p, topic, 1, 3);
992
993 /* Produce some non-txnn messages for the consumer to read and commit */
994 test_produce_msgs_easy(topic, testid, 0, msgcnt);
995
996 /* Create consumer and subscribe to the topic */
997 test_conf_set(c_conf, "auto.offset.reset", "earliest");
998 test_conf_set(c_conf, "enable.auto.commit", "false");
999 c = test_create_consumer(topic, NULL, c_conf, NULL);
1000 test_consumer_subscribe(c, topic);
1001 test_consumer_wait_assignment(c, rd_false);
1002
1003 TEST_CALL_ERROR__(rd_kafka_init_transactions(p, -1));
1004
1005 TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));
1006
1007 /* send_offsets? Consume messages and send those offsets to the txn */
1008 if (send_offsets) {
1009 rd_kafka_topic_partition_list_t *offsets;
1010 rd_kafka_consumer_group_metadata_t *cgmetadata;
1011
1012 test_consumer_poll("consume", c, testid, -1, 0, msgcnt, NULL);
1013
1014 TEST_CALL_ERR__(rd_kafka_assignment(c, &offsets));
1015 TEST_CALL_ERR__(rd_kafka_position(c, offsets));
1016
1017 cgmetadata = rd_kafka_consumer_group_metadata(c);
1018 TEST_ASSERT(cgmetadata != NULL,
1019 "failed to get consumer group metadata");
1020
1021 TEST_CALL_ERROR__(
1022 rd_kafka_send_offsets_to_transaction(
1023 p, offsets, cgmetadata, -1));
1024
1025 rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1026
1027 rd_kafka_topic_partition_list_destroy(offsets);
1028 }
1029
1030
1031 if (do_commit)
1032 TEST_CALL_ERROR__(rd_kafka_commit_transaction(p, -1));
1033 else
1034 TEST_CALL_ERROR__(rd_kafka_abort_transaction(p, -1));
1035
1036 /* Get the committed offsets */
1037 TEST_CALL_ERR__(rd_kafka_assignment(c, &committed));
1038 TEST_CALL_ERR__(rd_kafka_committed(c, committed, 10*1000));
1039
1040 TEST_ASSERT(committed->cnt == 1,
1041 "expected one committed offset, not %d",
1042 committed->cnt);
1043 offset = committed->elems[0].offset;
1044 TEST_SAY("Committed offset is %"PRId64"\n", offset);
1045
1046 if (do_commit && send_offsets)
1047 TEST_ASSERT(offset >= msgcnt,
1048 "expected committed offset >= %d, got %"PRId64,
1049 msgcnt, offset);
1050 else
1051 TEST_ASSERT(offset < 0,
1052 "expected no committed offset, got %"PRId64,
1053 offset);
1054
1055 rd_kafka_topic_partition_list_destroy(committed);
1056
1057 rd_kafka_destroy(c);
1058 rd_kafka_destroy(p);
1059
1060 SUB_TEST_PASS();
1061 }
1062
1063 /**
1064 * @returns the high watermark for the given partition.
1065 */
query_hi_wmark0(int line,rd_kafka_t * c,const char * topic,int32_t partition)1066 int64_t query_hi_wmark0 (int line,
1067 rd_kafka_t *c, const char *topic, int32_t partition) {
1068 rd_kafka_resp_err_t err;
1069 int64_t lo = -1, hi = -1;
1070
1071 err = rd_kafka_query_watermark_offsets(c, topic, partition, &lo, &hi,
1072 tmout_multip(5*1000));
1073 TEST_ASSERT(!err,
1074 "%d: query_watermark_offsets(%s) failed: %s",
1075 line, topic, rd_kafka_err2str(err));
1076
1077 return hi;
1078 }
1079 #define query_hi_wmark(c,topic,part) query_hi_wmark0(__LINE__,c,topic,part)
1080
1081 /**
1082 * @brief Check that isolation.level works as expected for query_watermark..().
1083 */
do_test_wmark_isolation_level(void)1084 static void do_test_wmark_isolation_level (void) {
1085 const char *topic = test_mk_topic_name("0103_wmark_isol", 1);
1086 rd_kafka_conf_t *conf, *c_conf;
1087 rd_kafka_t *p, *c1, *c2;
1088 uint64_t testid;
1089 int64_t hw_uncommitted, hw_committed;
1090
1091 SUB_TEST_QUICK();
1092
1093 testid = test_id_generate();
1094
1095 test_conf_init(&conf, NULL, 30);
1096 c_conf = rd_kafka_conf_dup(conf);
1097
1098 test_conf_set(conf, "transactional.id", topic);
1099 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
1100 p = test_create_handle(RD_KAFKA_PRODUCER, rd_kafka_conf_dup(conf));
1101
1102 test_create_topic(p, topic, 1, 3);
1103
1104 /* Produce some non-txn messages to avoid 0 as the committed hwmark */
1105 test_produce_msgs_easy(topic, testid, 0, 100);
1106
1107 /* Create consumer and subscribe to the topic */
1108 test_conf_set(c_conf, "isolation.level", "read_committed");
1109 c1 = test_create_consumer(topic, NULL, rd_kafka_conf_dup(c_conf), NULL);
1110 test_conf_set(c_conf, "isolation.level", "read_uncommitted");
1111 c2 = test_create_consumer(topic, NULL, c_conf, NULL);
1112
1113 TEST_CALL_ERROR__(rd_kafka_init_transactions(p, -1));
1114
1115 TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));
1116
1117 /* Produce some txn messages */
1118 test_produce_msgs2(p, topic, testid, 0, 0, 100, NULL, 0);
1119
1120 test_flush(p, 10*1000);
1121
1122 hw_committed = query_hi_wmark(c1, topic, 0);
1123 hw_uncommitted = query_hi_wmark(c2, topic, 0);
1124
1125 TEST_SAY("Pre-commit hwmarks: committed %"PRId64
1126 ", uncommitted %"PRId64"\n",
1127 hw_committed, hw_uncommitted);
1128
1129 TEST_ASSERT(hw_committed > 0 && hw_committed < hw_uncommitted,
1130 "Committed hwmark %"PRId64" should be lower than "
1131 "uncommitted hwmark %"PRId64" for %s [0]",
1132 hw_committed, hw_uncommitted, topic);
1133
1134 TEST_CALL_ERROR__(rd_kafka_commit_transaction(p, -1));
1135
1136 /* Re-create the producer and re-init transactions to make
1137 * sure the transaction is fully committed in the cluster. */
1138 rd_kafka_destroy(p);
1139 p = test_create_handle(RD_KAFKA_PRODUCER, conf);
1140 TEST_CALL_ERROR__(rd_kafka_init_transactions(p, -1));
1141 rd_kafka_destroy(p);
1142
1143
1144 /* Now query wmarks again */
1145 hw_committed = query_hi_wmark(c1, topic, 0);
1146 hw_uncommitted = query_hi_wmark(c2, topic, 0);
1147
1148 TEST_SAY("Post-commit hwmarks: committed %"PRId64
1149 ", uncommitted %"PRId64"\n",
1150 hw_committed, hw_uncommitted);
1151
1152 TEST_ASSERT(hw_committed == hw_uncommitted,
1153 "Committed hwmark %"PRId64" should be equal to "
1154 "uncommitted hwmark %"PRId64" for %s [0]",
1155 hw_committed, hw_uncommitted, topic);
1156
1157 rd_kafka_destroy(c1);
1158 rd_kafka_destroy(c2);
1159
1160 SUB_TEST_PASS();
1161 }
1162
1163
1164
main_0103_transactions(int argc,char ** argv)1165 int main_0103_transactions (int argc, char **argv) {
1166
1167 do_test_misuse_txn();
1168 do_test_basic_producer_txn(rd_false /* without compression */);
1169 do_test_basic_producer_txn(rd_true /* with compression */);
1170 do_test_consumer_producer_txn();
1171 do_test_fenced_txn(rd_false /* no produce after fencing */);
1172 do_test_fenced_txn(rd_true /* produce after fencing */);
1173 do_test_fatal_idempo_error_without_kip360();
1174 do_test_empty_txn(rd_false/*don't send offsets*/, rd_true/*commit*/);
1175 do_test_empty_txn(rd_false/*don't send offsets*/, rd_false/*abort*/);
1176 do_test_empty_txn(rd_true/*send offsets*/, rd_true/*commit*/);
1177 do_test_empty_txn(rd_true/*send offsets*/, rd_false/*abort*/);
1178 do_test_wmark_isolation_level();
1179 return 0;
1180 }
1181
1182
1183
1184 /**
1185 * @brief Transaction tests that don't require a broker.
1186 */
do_test_txn_local(void)1187 static void do_test_txn_local (void) {
1188 rd_kafka_conf_t *conf;
1189 rd_kafka_t *p;
1190 rd_kafka_error_t *error;
1191 test_timing_t t_init;
1192 int timeout_ms = 7 * 1000;
1193
1194 SUB_TEST_QUICK();
1195
1196 /*
1197 * No transactional.id, init_transactions() should fail.
1198 */
1199 test_conf_init(&conf, NULL, 0);
1200 test_conf_set(conf, "bootstrap.servers", NULL);
1201
1202 p = test_create_handle(RD_KAFKA_PRODUCER, conf);
1203
1204 error = rd_kafka_init_transactions(p, 10);
1205 TEST_ASSERT(error, "Expected init_transactions() to fail");
1206 TEST_ASSERT(rd_kafka_error_code(error) ==
1207 RD_KAFKA_RESP_ERR__NOT_CONFIGURED,
1208 "Expected ERR__NOT_CONFIGURED, not %s",
1209 rd_kafka_error_name(error));
1210 rd_kafka_error_destroy(error);
1211
1212 rd_kafka_destroy(p);
1213
1214
1215 /*
1216 * No brokers, init_transactions() should time out according
1217 * to the timeout.
1218 */
1219 test_conf_init(&conf, NULL, 0);
1220 test_conf_set(conf, "bootstrap.servers", NULL);
1221 test_conf_set(conf, "transactional.id", "test");
1222 p = test_create_handle(RD_KAFKA_PRODUCER, conf);
1223
1224 TEST_SAY("Waiting for init_transactions() timeout %d ms\n",
1225 timeout_ms);
1226
1227 test_timeout_set((timeout_ms + 2000) / 1000);
1228
1229 TIMING_START(&t_init, "init_transactions()");
1230 error = rd_kafka_init_transactions(p, timeout_ms);
1231 TIMING_STOP(&t_init);
1232 TEST_ASSERT(error, "Expected init_transactions() to fail");
1233 TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
1234 "Expected RD_KAFKA_RESP_ERR__TIMED_OUT, "
1235 "not %s: %s",
1236 rd_kafka_error_name(error),
1237 rd_kafka_error_string(error));
1238
1239 TEST_SAY("init_transactions() failed as expected: %s\n",
1240 rd_kafka_error_string(error));
1241
1242 rd_kafka_error_destroy(error);
1243
1244 TIMING_ASSERT(&t_init, timeout_ms - 2000, timeout_ms + 5000);
1245
1246 rd_kafka_destroy(p);
1247
1248 SUB_TEST_PASS();
1249 }
1250
1251
/**
 * @brief Test entry point for the transaction tests that run without
 *        a broker.
 *
 * @returns 0 always.
 */
int main_0103_transactions_local (int argc, char **argv) {
        do_test_txn_local();
        return 0;
}
1258