1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2019, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30
31 #include "rdkafka.h"
32
33 #include "../src/rdkafka_proto.h"
34 #include "../src/rdstring.h"
35 #include "../src/rdunittest.h"
36
37 #include <stdarg.h>
38
39
40 /**
41 * @name Producer transaction tests using the mock cluster
42 *
43 */
44
45
46 static int allowed_error;
47
48 /**
49 * @brief Decide what error_cb's will cause the test to fail.
50 */
error_is_fatal_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,const char * reason)51 static int error_is_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
52 const char *reason) {
53 if (err == allowed_error ||
54 /* If transport errors are allowed then it is likely
55 * that we'll also see ALL_BROKERS_DOWN. */
56 (allowed_error == RD_KAFKA_RESP_ERR__TRANSPORT &&
57 err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)) {
58 TEST_SAY("Ignoring allowed error: %s: %s\n",
59 rd_kafka_err2name(err), reason);
60 return 0;
61 }
62 return 1;
63 }
64
65
66 static rd_kafka_resp_err_t (*on_response_received_cb) (rd_kafka_t *rk,
67 int sockfd,
68 const char *brokername,
69 int32_t brokerid,
70 int16_t ApiKey,
71 int16_t ApiVersion,
72 int32_t CorrId,
73 size_t size,
74 int64_t rtt,
75 rd_kafka_resp_err_t err,
76 void *ic_opaque);
77
78 /**
79 * @brief Simple on_response_received interceptor that simply calls the
80 * sub-test's on_response_received_cb function, if set.
81 */
82 static rd_kafka_resp_err_t
on_response_received_trampoline(rd_kafka_t * rk,int sockfd,const char * brokername,int32_t brokerid,int16_t ApiKey,int16_t ApiVersion,int32_t CorrId,size_t size,int64_t rtt,rd_kafka_resp_err_t err,void * ic_opaque)83 on_response_received_trampoline (rd_kafka_t *rk,
84 int sockfd,
85 const char *brokername,
86 int32_t brokerid,
87 int16_t ApiKey,
88 int16_t ApiVersion,
89 int32_t CorrId,
90 size_t size,
91 int64_t rtt,
92 rd_kafka_resp_err_t err,
93 void *ic_opaque) {
94 TEST_ASSERT(on_response_received_cb != NULL, "");
95 return on_response_received_cb(rk, sockfd, brokername, brokerid,
96 ApiKey, ApiVersion,
97 CorrId, size, rtt, err, ic_opaque);
98 }
99
100
101 /**
102 * @brief on_new interceptor to add an on_response_received interceptor.
103 */
on_new_producer(rd_kafka_t * rk,const rd_kafka_conf_t * conf,void * ic_opaque,char * errstr,size_t errstr_size)104 static rd_kafka_resp_err_t on_new_producer (rd_kafka_t *rk,
105 const rd_kafka_conf_t *conf,
106 void *ic_opaque,
107 char *errstr, size_t errstr_size) {
108 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
109
110 if (on_response_received_cb)
111 err = rd_kafka_interceptor_add_on_response_received(
112 rk, "on_response_received",
113 on_response_received_trampoline, ic_opaque);
114
115 return err;
116 }
117
118
119 /**
120 * @brief Create a transactional producer and a mock cluster.
121 *
122 * The var-arg list is a NULL-terminated list of
123 * (const char *key, const char *value) config properties.
124 *
125 * Special keys:
126 * "on_response_received", "" - enable the on_response_received_cb
127 * interceptor,
128 * which must be assigned prior to
 *                              calling create_txn_producer().
130 */
create_txn_producer(rd_kafka_mock_cluster_t ** mclusterp,const char * transactional_id,int broker_cnt,...)131 static rd_kafka_t *create_txn_producer (rd_kafka_mock_cluster_t **mclusterp,
132 const char *transactional_id,
133 int broker_cnt, ...) {
134 rd_kafka_conf_t *conf;
135 rd_kafka_t *rk;
136 char numstr[8];
137 va_list ap;
138 const char *key;
139 rd_bool_t add_interceptors = rd_false;
140
141 rd_snprintf(numstr, sizeof(numstr), "%d", broker_cnt);
142
143 test_conf_init(&conf, NULL, 60);
144
145 test_conf_set(conf, "transactional.id", transactional_id);
146 /* Speed up reconnects */
147 test_conf_set(conf, "reconnect.backoff.max.ms", "2000");
148 test_conf_set(conf, "test.mock.num.brokers", numstr);
149 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
150
151 test_curr->ignore_dr_err = rd_false;
152
153 va_start(ap, broker_cnt);
154 while ((key = va_arg(ap, const char *))) {
155 if (!strcmp(key, "on_response_received")) {
156 add_interceptors = rd_true;
157 (void)va_arg(ap, const char *);
158 } else {
159 test_conf_set(conf, key, va_arg(ap, const char *));
160 }
161 }
162 va_end(ap);
163
164 /* Add an on_.. interceptors */
165 if (add_interceptors)
166 rd_kafka_conf_interceptor_add_on_new(
167 conf,
168 "on_new_producer",
169 on_new_producer, NULL);
170
171 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
172
173 if (mclusterp) {
174 *mclusterp = rd_kafka_handle_mock_cluster(rk);
175 TEST_ASSERT(*mclusterp, "failed to create mock cluster");
176 }
177
178 return rk;
179 }
180
181
182 /**
183 * @brief Test recoverable errors using mock broker error injections
184 * and code coverage checks.
185 */
do_test_txn_recoverable_errors(void)186 static void do_test_txn_recoverable_errors (void) {
187 rd_kafka_t *rk;
188 rd_kafka_mock_cluster_t *mcluster;
189 rd_kafka_topic_partition_list_t *offsets;
190 rd_kafka_consumer_group_metadata_t *cgmetadata;
191 const char *groupid = "myGroupId";
192 const char *txnid = "myTxnId";
193
194 SUB_TEST_QUICK();
195
196 rk = create_txn_producer(&mcluster, txnid, 3,
197 "batch.num.messages", "1",
198 NULL);
199
200 /* Make sure transaction and group coordinators are different.
201 * This verifies that AddOffsetsToTxnRequest isn't sent to the
202 * transaction coordinator but the group coordinator. */
203 rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
204 rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 2);
205
206 /*
207 * Inject som InitProducerId errors that causes retries
208 */
209 rd_kafka_mock_push_request_errors(
210 mcluster,
211 RD_KAFKAP_InitProducerId,
212 3,
213 RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
214 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
215 RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);
216
217 TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
218
219 (void)RD_UT_COVERAGE_CHECK(0); /* idemp_request_pid_failed(retry) */
220 (void)RD_UT_COVERAGE_CHECK(1); /* txn_idemp_state_change(READY) */
221
222 /*
223 * Start a transaction
224 */
225 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
226
227
228 /* Produce a message without error first */
229 TEST_CALL_ERR__(rd_kafka_producev(rk,
230 RD_KAFKA_V_TOPIC("mytopic"),
231 RD_KAFKA_V_PARTITION(0),
232 RD_KAFKA_V_VALUE("hi", 2),
233 RD_KAFKA_V_END));
234
235 /*
236 * Produce a message, let it fail with a non-idempo/non-txn
237 * retryable error
238 */
239 rd_kafka_mock_push_request_errors(
240 mcluster,
241 RD_KAFKAP_Produce,
242 1,
243 RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS);
244
245 TEST_CALL_ERR__(rd_kafka_producev(rk,
246 RD_KAFKA_V_TOPIC("mytopic"),
247 RD_KAFKA_V_PARTITION(0),
248 RD_KAFKA_V_VALUE("hi", 2),
249 RD_KAFKA_V_END));
250
251 /* Make sure messages are produced */
252 rd_kafka_flush(rk, -1);
253
254 /*
255 * Send some arbitrary offsets, first with some failures, then
256 * succeed.
257 */
258 offsets = rd_kafka_topic_partition_list_new(4);
259 rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
260 rd_kafka_topic_partition_list_add(offsets, "srctop2", 99)->offset =
261 999999111;
262 rd_kafka_topic_partition_list_add(offsets, "srctopic", 0)->offset = 999;
263 rd_kafka_topic_partition_list_add(offsets, "srctop2", 3499)->offset =
264 123456789;
265
266 rd_kafka_mock_push_request_errors(
267 mcluster,
268 RD_KAFKAP_AddPartitionsToTxn,
269 1,
270 RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);
271
272 rd_kafka_mock_push_request_errors(
273 mcluster,
274 RD_KAFKAP_TxnOffsetCommit,
275 2,
276 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
277 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
278
279 cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
280
281 TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
282 rk, offsets,
283 cgmetadata, -1));
284
285 rd_kafka_consumer_group_metadata_destroy(cgmetadata);
286 rd_kafka_topic_partition_list_destroy(offsets);
287
288 /*
289 * Commit transaction, first with som failures, then succeed.
290 */
291 rd_kafka_mock_push_request_errors(
292 mcluster,
293 RD_KAFKAP_EndTxn,
294 3,
295 RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
296 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
297 RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);
298
299 TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
300
301 /* All done */
302
303 rd_kafka_destroy(rk);
304
305 SUB_TEST_PASS();
306 }
307
308
309 /**
310 * @brief KIP-360: Test that fatal idempotence errors triggers abortable
311 * transaction errors and that the producer can recover.
312 */
do_test_txn_fatal_idempo_errors(void)313 static void do_test_txn_fatal_idempo_errors (void) {
314 rd_kafka_t *rk;
315 rd_kafka_mock_cluster_t *mcluster;
316 rd_kafka_error_t *error;
317 const char *txnid = "myTxnId";
318
319 SUB_TEST_QUICK();
320
321 rk = create_txn_producer(&mcluster, txnid, 3,
322 "batch.num.messages", "1",
323 NULL);
324
325 test_curr->ignore_dr_err = rd_true;
326 test_curr->is_fatal_cb = error_is_fatal_cb;
327 allowed_error = RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID;
328
329 TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
330
331 /*
332 * Start a transaction
333 */
334 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
335
336
337 /* Produce a message without error first */
338 TEST_CALL_ERR__(rd_kafka_producev(rk,
339 RD_KAFKA_V_TOPIC("mytopic"),
340 RD_KAFKA_V_PARTITION(0),
341 RD_KAFKA_V_VALUE("hi", 2),
342 RD_KAFKA_V_END));
343
344 /* Produce a message, let it fail with a fatal idempo error. */
345 rd_kafka_mock_push_request_errors(
346 mcluster,
347 RD_KAFKAP_Produce,
348 1,
349 RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);
350
351 TEST_CALL_ERR__(rd_kafka_producev(rk,
352 RD_KAFKA_V_TOPIC("mytopic"),
353 RD_KAFKA_V_PARTITION(0),
354 RD_KAFKA_V_VALUE("hi", 2),
355 RD_KAFKA_V_END));
356
357 /* Commit the transaction, should fail */
358 error = rd_kafka_commit_transaction(rk, -1);
359 TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");
360
361 TEST_SAY("commit_transaction() failed (expectedly): %s\n",
362 rd_kafka_error_string(error));
363
364 TEST_ASSERT(!rd_kafka_error_is_fatal(error),
365 "Did not expect fatal error");
366 TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
367 "Expected abortable error");
368 rd_kafka_error_destroy(error);
369
370 /* Abort the transaction */
371 TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
372
373 /* Run a new transaction without errors to verify that the
374 * producer can recover. */
375 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
376
377 TEST_CALL_ERR__(rd_kafka_producev(rk,
378 RD_KAFKA_V_TOPIC("mytopic"),
379 RD_KAFKA_V_PARTITION(0),
380 RD_KAFKA_V_VALUE("hi", 2),
381 RD_KAFKA_V_END));
382
383 TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
384
385 /* All done */
386
387 rd_kafka_destroy(rk);
388
389 allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
390
391 SUB_TEST_PASS();
392 }
393
394
395 /**
396 * @brief KIP-360: Test that fatal idempotence errors triggers abortable
397 * transaction errors, but let the broker-side bumping of the
398 * producer PID take longer than the remaining transaction timeout
399 * which should raise a retriable error from abort_transaction().
400 *
401 * @param with_sleep After the first abort sleep longer than it takes to
402 * re-init the pid so that the internal state automatically
403 * transitions.
404 */
static void do_test_txn_slow_reinit (rd_bool_t with_sleep) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t txn_coord = 2;
        const char *txnid = "myTxnId";
        test_timing_t timing;

        SUB_TEST("%s sleep", with_sleep ? "with": "without");

        rk = create_txn_producer(&mcluster, txnid, 3,
                                 "batch.num.messages", "1",
                                 NULL);

        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb = NULL;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /* Begin a transaction */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Produce a message that is delivered without error. */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Make the transaction coordinator respond to InitProducerId
         * slower than the abort_transaction() call timeout below, so
         * that the automatic re-initpid outlives abort_transaction(). */
        rd_kafka_mock_broker_push_request_error_rtts(
                mcluster,
                txn_coord,
                RD_KAFKAP_InitProducerId,
                1,
                RD_KAFKA_RESP_ERR_NO_ERROR, 10000/*10s*/);

        /* Produce a message that fails with a fatal idempotence error,
         * triggering the re-initpid. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_Produce,
                1,
                RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        /* The commit must fail with an abortable error. */
        TIMING_START(&timing, "commit_transaction(-1)");
        error = rd_kafka_commit_transaction(rk, -1);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));

        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected abortable error");
        rd_kafka_error_destroy(error);

        /* The first abort times out (retriable error) since the
         * re-initpid is still in flight on the slow coordinator. */
        TIMING_START(&timing, "abort_transaction(100)");
        error = rd_kafka_abort_transaction(rk, 100);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");

        TEST_SAY("First abort_transaction() failed: %s\n",
                 rd_kafka_error_string(error));
        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_is_retriable(error),
                    "Expected retriable error");
        rd_kafka_error_destroy(error);

        /* Optionally wait long enough for the re-initpid to complete
         * so that the internal state transitions on its own. */
        if (with_sleep)
                rd_sleep(12);

        /* A retried abort now finishes. */
        TEST_SAY("Retrying abort\n");
        TIMING_START(&timing, "abort_transaction(-1)");
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
        TIMING_STOP(&timing);

        /* An error-free follow-up transaction verifies that the
         * producer has recovered. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        /* All done */

        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
524
525
526
527 /**
528 * @brief KIP-360: Test that fatal idempotence errors triggers abortable
529 * transaction errors, but let the broker-side bumping of the
530 * producer PID fail with a fencing error.
531 * Should raise a fatal error.
532 */
do_test_txn_fenced_reinit(void)533 static void do_test_txn_fenced_reinit (void) {
534 rd_kafka_t *rk;
535 rd_kafka_mock_cluster_t *mcluster;
536 rd_kafka_error_t *error;
537 int32_t txn_coord = 2;
538 const char *txnid = "myTxnId";
539 char errstr[512];
540 rd_kafka_resp_err_t fatal_err;
541
542 SUB_TEST_QUICK();
543
544 rk = create_txn_producer(&mcluster, txnid, 3,
545 "batch.num.messages", "1",
546 NULL);
547
548 rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
549 txn_coord);
550
551 test_curr->ignore_dr_err = rd_true;
552 test_curr->is_fatal_cb = error_is_fatal_cb;
553 allowed_error = RD_KAFKA_RESP_ERR__FENCED;
554
555 TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
556
557 /*
558 * Start a transaction
559 */
560 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
561
562
563 /* Produce a message without error first */
564 TEST_CALL_ERR__(rd_kafka_producev(rk,
565 RD_KAFKA_V_TOPIC("mytopic"),
566 RD_KAFKA_V_PARTITION(0),
567 RD_KAFKA_V_VALUE("hi", 2),
568 RD_KAFKA_V_END));
569
570 test_flush(rk, -1);
571
572 /* Fail the PID reinit */
573 rd_kafka_mock_broker_push_request_error_rtts(
574 mcluster,
575 txn_coord,
576 RD_KAFKAP_InitProducerId,
577 1,
578 RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH, 0);
579
580 /* Produce a message, let it fail with a fatal idempo error. */
581 rd_kafka_mock_push_request_errors(
582 mcluster,
583 RD_KAFKAP_Produce,
584 1,
585 RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);
586
587 TEST_CALL_ERR__(rd_kafka_producev(rk,
588 RD_KAFKA_V_TOPIC("mytopic"),
589 RD_KAFKA_V_PARTITION(0),
590 RD_KAFKA_V_VALUE("hi", 2),
591 RD_KAFKA_V_END));
592
593 test_flush(rk, -1);
594
595 /* Abort the transaction, should fail with a fatal error */
596 error = rd_kafka_abort_transaction(rk, -1);
597 TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");
598
599 TEST_SAY("abort_transaction() failed: %s\n",
600 rd_kafka_error_string(error));
601 TEST_ASSERT(rd_kafka_error_is_fatal(error),
602 "Expected a fatal error");
603 rd_kafka_error_destroy(error);
604
605 fatal_err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
606 TEST_ASSERT(fatal_err,
607 "Expected a fatal error to have been raised");
608 TEST_SAY("Fatal error: %s: %s\n",
609 rd_kafka_err2name(fatal_err), errstr);
610
611 /* All done */
612
613 rd_kafka_destroy(rk);
614
615 allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
616
617 SUB_TEST_PASS();
618 }
619
620
621 /**
622 * @brief Test EndTxn errors.
623 */
do_test_txn_endtxn_errors(void)624 static void do_test_txn_endtxn_errors (void) {
625 rd_kafka_t *rk = NULL;
626 rd_kafka_mock_cluster_t *mcluster = NULL;
627 rd_kafka_resp_err_t err;
628 struct {
629 size_t error_cnt;
630 rd_kafka_resp_err_t errors[4];
631 rd_kafka_resp_err_t exp_err;
632 rd_bool_t exp_retriable;
633 rd_bool_t exp_abortable;
634 rd_bool_t exp_fatal;
635 } scenario[] = {
636 /* This list of errors is from the EndTxnResponse handler in
637 * AK clients/.../TransactionManager.java */
638 { /* #0 */
639 2,
640 { RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
641 RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE },
642 /* Should auto-recover */
643 RD_KAFKA_RESP_ERR_NO_ERROR,
644 },
645 { /* #1 */
646 2,
647 { RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
648 RD_KAFKA_RESP_ERR_NOT_COORDINATOR },
649 /* Should auto-recover */
650 RD_KAFKA_RESP_ERR_NO_ERROR,
651 },
652 { /* #2 */
653 1,
654 { RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS },
655 /* Should auto-recover */
656 RD_KAFKA_RESP_ERR_NO_ERROR,
657 },
658 { /* #3 */
659 3,
660 { RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
661 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
662 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS },
663 /* Should auto-recover */
664 RD_KAFKA_RESP_ERR_NO_ERROR,
665 },
666 { /* #4 */
667 1,
668 { RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID },
669 RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
670 rd_false /* !retriable */,
671 rd_true /* abortable */,
672 rd_false /* !fatal */
673 },
674 { /* #5 */
675 1,
676 { RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING },
677 RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
678 rd_false /* !retriable */,
679 rd_true /* abortable */,
680 rd_false /* !fatal */
681 },
682 { /* #6 */
683 1,
684 { RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH },
685 /* This error is normalized */
686 RD_KAFKA_RESP_ERR__FENCED,
687 rd_false /* !retriable */,
688 rd_false /* !abortable */,
689 rd_true /* fatal */
690 },
691 { /* #7 */
692 1,
693 { RD_KAFKA_RESP_ERR_PRODUCER_FENCED },
694 /* This error is normalized */
695 RD_KAFKA_RESP_ERR__FENCED,
696 rd_false /* !retriable */,
697 rd_false /* !abortable */,
698 rd_true /* fatal */
699 },
700 { /* #8 */
701 1,
702 { RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED },
703 RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
704 rd_false /* !retriable */,
705 rd_false /* !abortable */,
706 rd_true /* fatal */
707 },
708 { /* #9 */
709 1,
710 { RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED },
711 RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
712 rd_false /* !retriable */,
713 rd_true /* abortable */,
714 rd_false /* !fatal */
715 },
716 { /* #10 */
717 /* Any other error should raise a fatal error */
718 1,
719 { RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE },
720 RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
721 rd_false /* !retriable */,
722 rd_true /* abortable */,
723 rd_false /* !fatal */,
724 },
725 { 0 },
726 };
727 int i;
728
729 SUB_TEST_QUICK();
730
731 for (i = 0 ; scenario[i].error_cnt > 0 ; i++) {
732 int j;
733 /* For each scenario, test:
734 * commit_transaction()
735 * flush() + commit_transaction()
736 * abort_transaction()
737 * flush() + abort_transaction()
738 */
739 for (j = 0 ; j < (2+2) ; j++) {
740 rd_bool_t commit = j < 2;
741 rd_bool_t with_flush = j & 1;
742 const char *commit_str =
743 commit ?
744 (with_flush ? "commit&flush" : "commit") :
745 (with_flush ? "abort&flush" : "abort");
746 rd_kafka_topic_partition_list_t *offsets;
747 rd_kafka_consumer_group_metadata_t *cgmetadata;
748 rd_kafka_error_t *error;
749 test_timing_t t_call;
750
751 TEST_SAY("Testing scenario #%d %s with %"PRIusz
752 " injected erorrs, expecting %s\n",
753 i, commit_str,
754 scenario[i].error_cnt,
755 rd_kafka_err2name(scenario[i].exp_err));
756
757 if (!rk) {
758 const char *txnid = "myTxnId";
759 rk = create_txn_producer(&mcluster, txnid,
760 3, NULL);
761 TEST_CALL_ERROR__(rd_kafka_init_transactions(
762 rk, 5000));
763 }
764
765 /*
766 * Start transaction
767 */
768 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
769
770 /* Transaction aborts will cause DR errors:
771 * ignore them. */
772 test_curr->ignore_dr_err = !commit;
773
774 /*
775 * Produce a message.
776 */
777 err = rd_kafka_producev(rk,
778 RD_KAFKA_V_TOPIC("mytopic"),
779 RD_KAFKA_V_VALUE("hi", 2),
780 RD_KAFKA_V_END);
781 TEST_ASSERT(!err, "produce failed: %s",
782 rd_kafka_err2str(err));
783
784 if (with_flush)
785 test_flush(rk, -1);
786
787 /*
788 * Send some arbitrary offsets.
789 */
790 offsets = rd_kafka_topic_partition_list_new(4);
791 rd_kafka_topic_partition_list_add(offsets, "srctopic",
792 3)->offset = 12;
793 rd_kafka_topic_partition_list_add(offsets, "srctop2",
794 99)->offset = 99999;
795
796 cgmetadata = rd_kafka_consumer_group_metadata_new(
797 "mygroupid");
798
799 TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
800 rk, offsets,
801 cgmetadata, -1));
802
803 rd_kafka_consumer_group_metadata_destroy(cgmetadata);
804 rd_kafka_topic_partition_list_destroy(offsets);
805
806 /*
807 * Commit transaction, first with som failures,
808 * then succeed.
809 */
810 rd_kafka_mock_push_request_errors_array(
811 mcluster,
812 RD_KAFKAP_EndTxn,
813 scenario[i].error_cnt,
814 scenario[i].errors);
815
816 TIMING_START(&t_call, "%s", commit_str);
817 if (commit)
818 error = rd_kafka_commit_transaction(
819 rk, tmout_multip(5000));
820 else
821 error = rd_kafka_abort_transaction(
822 rk, tmout_multip(5000));
823 TIMING_STOP(&t_call);
824
825 if (error)
826 TEST_SAY("Scenario #%d %s failed: %s: %s "
827 "(retriable=%s, req_abort=%s, "
828 "fatal=%s)\n",
829 i, commit_str,
830 rd_kafka_error_name(error),
831 rd_kafka_error_string(error),
832 RD_STR_ToF(rd_kafka_error_is_retriable(error)),
833 RD_STR_ToF(rd_kafka_error_txn_requires_abort(error)),
834 RD_STR_ToF(rd_kafka_error_is_fatal(error)));
835 else
836 TEST_SAY("Scenario #%d %s succeeded\n",
837 i, commit_str);
838
839 if (!scenario[i].exp_err) {
840 TEST_ASSERT(!error,
841 "Expected #%d %s to succeed, "
842 "got %s",
843 i, commit_str,
844 rd_kafka_error_string(error));
845 continue;
846 }
847
848
849 TEST_ASSERT(error != NULL,
850 "Expected #%d %s to fail",
851 i, commit_str);
852 TEST_ASSERT(scenario[i].exp_err ==
853 rd_kafka_error_code(error),
854 "Scenario #%d: expected %s, not %s",
855 i,
856 rd_kafka_err2name(scenario[i].exp_err),
857 rd_kafka_error_name(error));
858 TEST_ASSERT(scenario[i].exp_retriable ==
859 (rd_bool_t)
860 rd_kafka_error_is_retriable(error),
861 "Scenario #%d: retriable mismatch",
862 i);
863 TEST_ASSERT(scenario[i].exp_abortable ==
864 (rd_bool_t)
865 rd_kafka_error_txn_requires_abort(error),
866 "Scenario #%d: abortable mismatch",
867 i);
868 TEST_ASSERT(scenario[i].exp_fatal ==
869 (rd_bool_t)rd_kafka_error_is_fatal(error),
870 "Scenario #%d: fatal mismatch", i);
871
872 /* Handle errors according to the error flags */
873 if (rd_kafka_error_is_fatal(error)) {
874 TEST_SAY("Fatal error, destroying producer\n");
875 rd_kafka_error_destroy(error);
876 rd_kafka_destroy(rk);
877 rk = NULL; /* Will be re-created on the next
878 * loop iteration. */
879
880 } else if (rd_kafka_error_txn_requires_abort(error)) {
881 rd_kafka_error_destroy(error);
882 TEST_SAY("Abortable error, "
883 "aborting transaction\n");
884 TEST_CALL_ERROR__(
885 rd_kafka_abort_transaction(rk, -1));
886
887 } else if (rd_kafka_error_is_retriable(error)) {
888 rd_kafka_error_destroy(error);
889 TEST_SAY("Retriable error, retrying %s once\n",
890 commit_str);
891 if (commit)
892 TEST_CALL_ERROR__(
893 rd_kafka_commit_transaction(
894 rk, 5000));
895 else
896 TEST_CALL_ERROR__(
897 rd_kafka_abort_transaction(
898 rk, 5000));
899 } else {
900 TEST_FAIL("Scenario #%d %s: "
901 "Permanent error without enough "
902 "hints to proceed: %s\n",
903 i, commit_str,
904 rd_kafka_error_string(error));
905 }
906 }
907 }
908
909 /* All done */
910 if (rk)
911 rd_kafka_destroy(rk);
912
913 SUB_TEST_PASS();
914 }
915
916
917 /**
918 * @brief Test that the commit/abort works properly with infinite timeout.
919 */
do_test_txn_endtxn_infinite(void)920 static void do_test_txn_endtxn_infinite (void) {
921 rd_kafka_t *rk;
922 rd_kafka_mock_cluster_t *mcluster = NULL;
923 const char *txnid = "myTxnId";
924 int i;
925
926 SUB_TEST_QUICK();
927
928 rk = create_txn_producer(&mcluster, txnid, 3, NULL);
929
930 TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
931
932 for (i = 0 ; i < 2 ; i++) {
933 rd_bool_t commit = i == 0;
934 const char *commit_str = commit ? "commit" : "abort";
935 rd_kafka_error_t *error;
936 test_timing_t t_call;
937
938 /* Messages will fail on as the transaction fails,
939 * ignore the DR error */
940 test_curr->ignore_dr_err = rd_true;
941
942 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
943
944 TEST_CALL_ERR__(rd_kafka_producev(rk,
945 RD_KAFKA_V_TOPIC("mytopic"),
946 RD_KAFKA_V_VALUE("hi", 2),
947 RD_KAFKA_V_END));
948
949 /*
950 * Commit/abort transaction, first with som retriable failures,
951 * then success.
952 */
953 rd_kafka_mock_push_request_errors(
954 mcluster,
955 RD_KAFKAP_EndTxn,
956 10,
957 RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
958 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
959 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
960 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
961 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
962 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
963 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
964 RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
965 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
966 RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
967
968 rd_sleep(1);
969
970 TIMING_START(&t_call, "%s_transaction()", commit_str);
971 if (commit)
972 error = rd_kafka_commit_transaction(rk, -1);
973 else
974 error = rd_kafka_abort_transaction(rk, -1);
975 TIMING_STOP(&t_call);
976
977 TEST_SAY("%s returned %s\n",
978 commit_str,
979 error ? rd_kafka_error_string(error) : "success");
980
981 TEST_ASSERT(!error,
982 "Expected %s to succeed, got %s",
983 commit_str, rd_kafka_error_string(error));
984
985 }
986
987 /* All done */
988
989 rd_kafka_destroy(rk);
990
991 SUB_TEST_PASS();
992 }
993
994
995
996 /**
997 * @brief Test that the commit/abort user timeout is honoured.
998 */
do_test_txn_endtxn_timeout(void)999 static void do_test_txn_endtxn_timeout (void) {
1000 rd_kafka_t *rk;
1001 rd_kafka_mock_cluster_t *mcluster = NULL;
1002 const char *txnid = "myTxnId";
1003 int i;
1004
1005 SUB_TEST_QUICK();
1006
1007 rk = create_txn_producer(&mcluster, txnid, 3, NULL);
1008
1009 TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1010
1011 for (i = 0 ; i < 2 ; i++) {
1012 rd_bool_t commit = i == 0;
1013 const char *commit_str = commit ? "commit" : "abort";
1014 rd_kafka_error_t *error;
1015 test_timing_t t_call;
1016
1017 /* Messages will fail on as the transaction fails,
1018 * ignore the DR error */
1019 test_curr->ignore_dr_err = rd_true;
1020
1021 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1022
1023 TEST_CALL_ERR__(rd_kafka_producev(rk,
1024 RD_KAFKA_V_TOPIC("mytopic"),
1025 RD_KAFKA_V_VALUE("hi", 2),
1026 RD_KAFKA_V_END));
1027
1028 /*
1029 * Commit/abort transaction, first with som retriable failures
1030 * whos retries exceed the user timeout.
1031 */
1032 rd_kafka_mock_push_request_errors(
1033 mcluster,
1034 RD_KAFKAP_EndTxn,
1035 10,
1036 RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
1037 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1038 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1039 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1040 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1041 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1042 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1043 RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
1044 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1045 RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
1046
1047 rd_sleep(1);
1048
1049 TIMING_START(&t_call, "%s_transaction()", commit_str);
1050 if (commit)
1051 error = rd_kafka_commit_transaction(rk, 100);
1052 else
1053 error = rd_kafka_abort_transaction(rk, 100);
1054 TIMING_STOP(&t_call);
1055
1056 TEST_SAY("%s returned %s\n",
1057 commit_str,
1058 error ? rd_kafka_error_string(error) : "success");
1059
1060 TEST_ASSERT(error != NULL,
1061 "Expected %s to fail", commit_str);
1062
1063 TEST_ASSERT(rd_kafka_error_code(error) ==
1064 RD_KAFKA_RESP_ERR__TIMED_OUT,
1065 "Expected %s to fail with timeout, not %s: %s",
1066 commit_str,
1067 rd_kafka_error_name(error),
1068 rd_kafka_error_string(error));
1069
1070 if (!commit)
1071 TEST_ASSERT(!rd_kafka_error_txn_requires_abort(error),
1072 "abort_transaction() failure should raise "
1073 "a txn_requires_abort error");
1074 else {
1075 TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
1076 "commit_transaction() failure should raise "
1077 "a txn_requires_abort error");
1078 TEST_SAY("Aborting transaction as instructed by "
1079 "error flag\n");
1080 TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
1081 }
1082
1083 rd_kafka_error_destroy(error);
1084
1085 TIMING_ASSERT(&t_call, 99, 199);
1086 }
1087
1088 /* All done */
1089
1090 rd_kafka_destroy(rk);
1091
1092 SUB_TEST_PASS();
1093 }
1094
1095
1096 /**
1097 * @brief Test that EndTxn is properly sent for aborted transactions
1098 * even if AddOffsetsToTxnRequest was retried.
1099 * This is a check for a txn_req_cnt bug.
1100 */
static void do_test_txn_req_cnt (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        const char *txnid = "myTxnId";

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, NULL);

        /* Messages will fail on abort(), ignore the DR error */
        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /*
         * Send some arbitrary offsets, first with some failures, then
         * succeed.
         */
        offsets = rd_kafka_topic_partition_list_new(2);
        rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
        rd_kafka_topic_partition_list_add(offsets, "srctop2", 99)->offset =
                999999111;

        /* Inject two retriable errors on AddOffsetsToTxn so the request
         * is retried; the abort() below must still send EndTxn (this is
         * the txn_req_cnt bug regression check). */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_AddOffsetsToTxn,
                2,
                RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
                RD_KAFKA_RESP_ERR_NOT_COORDINATOR);

        /* Also fail the first two TxnOffsetCommit attempts with
         * retriable errors before letting it succeed. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_TxnOffsetCommit,
                2,
                RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
                RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
                                  rk, offsets,
                                  cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        /* Abort: EndTxn must be sent despite the earlier retries. */
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
1159
1160
1161 /**
1162 * @brief Test abortable errors using mock broker error injections
1163 * and code coverage checks.
1164 */
static void do_test_txn_requires_abort_errors (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        int r;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        /* Injected errors will surface as DR errors; don't fail on them. */
        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /*
         * 1. Fail on produce
         */
        TEST_SAY("1. Fail on produce\n");

        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_Produce,
                1,
                RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);

        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2),
                                RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        /* Wait for messages to fail */
        test_flush(rk, 5000);

        /* Any other transactional API should now raise an error */
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        error = rd_kafka_send_offsets_to_transaction(rk, offsets,
                                                     cgmetadata, -1);

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);
        TEST_ASSERT(error, "expected error");
        /* The produce auth failure must have marked the transaction
         * as requiring an abort. */
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "expected abortable error, not %s",
                    rd_kafka_error_string(error));
        TEST_SAY("Error %s: %s\n",
                 rd_kafka_error_name(error),
                 rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);

        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /*
         * 2. Restart transaction and fail on AddPartitionsToTxn
         */
        TEST_SAY("2. Fail on AddPartitionsToTxn\n");

        /* First refresh proper Metadata to clear the topic's auth error,
         * otherwise the produce() below will fail immediately. */
        r = test_get_partition_count(rk, "mytopic", 5000);
        TEST_ASSERT(r > 0, "Expected topic %s to exist", "mytopic");

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_AddPartitionsToTxn,
                1,
                RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);

        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2),
                                RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        /* The AddPartitionsToTxn error surfaces on commit. */
        error = rd_kafka_commit_transaction(rk, 5000);
        TEST_ASSERT(error, "commit_transaction should have failed");
        TEST_SAY("commit_transaction() error %s: %s\n",
                 rd_kafka_error_name(error),
                 rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);

        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /*
         * 3. Restart transaction and fail on AddOffsetsToTxn
         */
        TEST_SAY("3. Fail on AddOffsetsToTxn\n");

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2),
                                RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_AddOffsetsToTxn,
                1,
                RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED);

        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        /* Group auth error is propagated directly by send_offsets..(). */
        error = rd_kafka_send_offsets_to_transaction(rk, offsets,
                                                     cgmetadata, -1);
        TEST_ASSERT(error, "Expected send_offsets..() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) ==
                    RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
                    "expected send_offsets_to_transaction() to fail with "
                    "group auth error: not %s",
                    rd_kafka_error_name(error));
        rd_kafka_error_destroy(error);

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);


        error = rd_kafka_commit_transaction(rk, 5000);
        TEST_ASSERT(error, "commit_transaction should have failed");
        rd_kafka_error_destroy(error);

        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
1308
1309
1310 /**
1311 * @brief Test error handling and recover for when broker goes down during
1312 * an ongoing transaction.
1313 */
static void do_test_txn_broker_down_in_txn (rd_bool_t down_coord) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        int32_t coord_id, leader_id, down_id;
        const char *down_what;
        rd_kafka_resp_err_t err;
        const char *topic = "test";
        const char *transactional_id = "txnid";
        int msgcnt = 1000;
        int remains = 0;

        /* Assign coordinator and leader to two different brokers */
        coord_id = 1;
        leader_id = 2;
        if (down_coord) {
                down_id = coord_id;
                down_what = "coordinator";
        } else {
                down_id = leader_id;
                down_what = "leader";
        }

        SUB_TEST_QUICK("Test %s down", down_what);

        rk = create_txn_producer(&mcluster, transactional_id, 3, NULL);

        /* Broker down is not a test-failing error */
        allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
        test_curr->is_fatal_cb = error_is_fatal_cb;

        err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
        TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      coord_id);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);

        /* Start transactioning */
        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Produce the first half while all brokers are up. */
        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
                                  0, msgcnt / 2, NULL, 0, &remains);

        TEST_SAY("Bringing down %s %"PRId32"\n", down_what, down_id);
        rd_kafka_mock_broker_set_down(mcluster, down_id);

        rd_kafka_flush(rk, 3000);

        /* Produce remaining messages */
        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
                                  msgcnt / 2, msgcnt / 2, NULL, 0, &remains);

        /* Give the client some time to (fail to) deliver while down. */
        rd_sleep(2);

        TEST_SAY("Bringing up %s %"PRId32"\n", down_what, down_id);
        rd_kafka_mock_broker_set_up(mcluster, down_id);

        /* Commit must recover and succeed once the broker is back. */
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        TEST_ASSERT(remains == 0,
                    "%d message(s) were not produced\n", remains);

        rd_kafka_destroy(rk);

        test_curr->is_fatal_cb = NULL;

        SUB_TEST_PASS();

}
1386
1387
1388
1389 /**
1390 * @brief Advance the coord_id to the next broker.
1391 */
/**
 * @brief Advance the transaction coordinator for \p transactional_id to the
 *        next broker id in round-robin order (1..broker_cnt), updating
 *        \p *coord_idp to the newly assigned id.
 */
static void set_next_coord (rd_kafka_mock_cluster_t *mcluster,
                            const char *transactional_id, int broker_cnt,
                            int32_t *coord_idp) {
        /* Broker ids are 1-based: wrap around after broker_cnt. */
        int32_t next = ((*coord_idp) % (broker_cnt)) + 1;

        TEST_SAY("Changing transaction coordinator from %"PRId32
                 " to %"PRId32"\n", *coord_idp, next);

        rd_kafka_mock_coordinator_set(mcluster, "transaction",
                                      transactional_id, next);
        *coord_idp = next;
}
1405
1406 /**
1407 * @brief Switch coordinator during a transaction.
1408 *
1409 */
static void do_test_txn_switch_coordinator (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        int32_t coord_id;
        const char *topic = "test";
        const char *transactional_id = "txnid";
        const int broker_cnt = 5;
        const int iterations = 20;
        int i;

        test_timeout_set(iterations * 10);

        SUB_TEST("Test switching coordinators");

        rk = create_txn_producer(&mcluster, transactional_id, broker_cnt, NULL);

        coord_id = 1;
        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      coord_id);

        /* Start transactioning */
        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        for (i = 0 ; i < iterations ; i++) {
                const int msgcnt = 100;
                int remains = 0;

                /* Switch coordinator before each transaction.. */
                set_next_coord(mcluster, transactional_id,
                               broker_cnt, &coord_id);

                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

                test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA,
                                   0, msgcnt / 2, NULL, 0);

                /* ..and occasionally mid-transaction.. */
                if (!(i % 3))
                        set_next_coord(mcluster, transactional_id,
                                       broker_cnt, &coord_id);

                /* Produce remaining messages */
                test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
                                          msgcnt / 2, msgcnt / 2, NULL, 0,
                                          &remains);

                /* ..and just before ending the transaction. */
                if ((i & 1) || !(i % 8))
                        set_next_coord(mcluster, transactional_id,
                                       broker_cnt, &coord_id);


                if (!(i % 5)) {
                        /* Commit every 5th iteration.. */
                        test_curr->ignore_dr_err = rd_false;
                        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

                } else {
                        /* ..otherwise abort: aborted messages fail
                         * delivery, so ignore DR errors. */
                        test_curr->ignore_dr_err = rd_true;
                        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
                }
        }


        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
1475
1476
1477 /**
1478 * @brief Switch coordinator during a transaction when AddOffsetsToTxn
1479 * are sent. #3571.
1480 */
static void do_test_txn_switch_coordinator_refresh (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        const char *topic = "test";
        const char *transactional_id = "txnid";
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        SUB_TEST("Test switching coordinators (refresh)");

        rk = create_txn_producer(&mcluster, transactional_id, 3, NULL);

        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      1);

        /* Start transactioning */
        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Switch the coordinator so that AddOffsetsToTxnRequest
         * will respond with NOT_COORDINATOR. */
        TEST_SAY("Switching to coordinator 2\n");
        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      2);

        /*
         * Send some arbitrary offsets: must succeed after the client
         * refreshes the coordinator (issue #3571).
         */
        offsets = rd_kafka_topic_partition_list_new(4);
        rd_kafka_topic_partition_list_add(offsets, "srctopic",
                                          3)->offset = 12;
        rd_kafka_topic_partition_list_add(offsets, "srctop2",
                                          99)->offset = 99999;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
                                  rk, offsets,
                                  cgmetadata, 20*1000));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);


        /* Produce some messages */
        test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0, 10, NULL, 0);

        /* And commit the transaction */
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
1537
1538
1539 /**
1540 * @brief Test fatal error handling when transactions are not supported
1541 * by the broker.
1542 */
static void do_test_txns_not_supported (void) {
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t err;

        SUB_TEST_QUICK();

        test_conf_init(&conf, NULL, 10);

        test_conf_set(conf, "transactional.id", "myxnid");
        /* Placeholder bootstrap list; the mock cluster's brokers are
         * added below with rd_kafka_brokers_add(). */
        test_conf_set(conf, "bootstrap.servers", ",");
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        /* Create mock cluster */
        mcluster = rd_kafka_mock_cluster_new(rk, 3);

        /* Disable InitProducerId */
        rd_kafka_mock_set_apiversion(mcluster, 22/*InitProducerId*/, -1, -1);


        rd_kafka_brokers_add(rk, rd_kafka_mock_cluster_bootstraps(mcluster));



        /* Without InitProducerId support init_transactions() must fail
         * with __UNSUPPORTED_FEATURE. */
        error = rd_kafka_init_transactions(rk, 5*1000);
        TEST_SAY("init_transactions() returned %s: %s\n",
                 error ? rd_kafka_error_name(error) : "success",
                 error ? rd_kafka_error_string(error) : "success");

        TEST_ASSERT(error, "Expected init_transactions() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) ==
                    RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
                    "Expected init_transactions() to fail with %s, not %s: %s",
                    rd_kafka_err2name(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE),
                    rd_kafka_error_name(error),
                    rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);

        /* The failure is fatal: subsequent produce attempts must
         * return __FATAL. */
        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC("test"),
                                RD_KAFKA_V_KEY("test", 4),
                                RD_KAFKA_V_END);
        TEST_ASSERT(err == RD_KAFKA_RESP_ERR__FATAL,
                    "Expected producev() to fail with %s, not %s",
                    rd_kafka_err2name(RD_KAFKA_RESP_ERR__FATAL),
                    rd_kafka_err2name(err));

        rd_kafka_mock_cluster_destroy(mcluster);

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
1600
1601
1602 /**
1603 * @brief CONCURRENT_TRANSACTION on AddOffsets.. should be retried.
1604 */
static void do_test_txns_send_offsets_concurrent_is_retried (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2),
                                RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        /* Wait for messages to be delivered */
        test_flush(rk, 5000);


        /*
         * Have AddOffsetsToTxn fail but eventually succeed due to
         * infinite retries.
         */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_AddOffsetsToTxn,
                1+5,/* first request + some retries */
                RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);

        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        /* Infinite timeout (-1): must succeed once the injected
         * CONCURRENT_TRANSACTIONS errors are exhausted. */
        TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(rk, offsets,
                                                               cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
1666
1667
1668 /**
1669 * @brief Verify that request timeouts don't cause crash (#2913).
1670 */
static void do_test_txns_no_timeout_crash (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        SUB_TEST_QUICK();

        /* Short socket timeout so requests time out quickly once
         * broker RTT is raised below. */
        rk = create_txn_producer(&mcluster, "txnid", 3,
                                 "socket.timeout.ms", "1000",
                                 "transaction.timeout.ms", "5000",
                                 NULL);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2),
                                RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        test_flush(rk, -1);

        /* Delay all broker connections */
        if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 2000)) ||
            (err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 2000)) ||
            (err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 2000)))
                TEST_FAIL("Failed to set broker RTT: %s",
                          rd_kafka_err2str(err));

        /* send_offsets..() should now time out */
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        error = rd_kafka_send_offsets_to_transaction(rk, offsets,
                                                     cgmetadata, -1);
        TEST_ASSERT(error, "Expected send_offsets..() to fail");
        TEST_SAY("send_offsets..() failed with %serror: %s\n",
                 rd_kafka_error_is_retriable(error) ? "retriable " : "",
                 rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_code(error) ==
                    RD_KAFKA_RESP_ERR__TIMED_OUT,
                    "expected send_offsets_to_transaction() to fail with "
                    "timeout, not %s",
                    rd_kafka_error_name(error));
        /* The timeout must be retriable, not fatal/abortable (#2913). */
        TEST_ASSERT(rd_kafka_error_is_retriable(error),
                    "expected send_offsets_to_transaction() to fail with "
                    "a retriable error");
        rd_kafka_error_destroy(error);

        /* Reset delay and try again */
        if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 0)) ||
            (err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 0)) ||
            (err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 0)))
                TEST_FAIL("Failed to reset broker RTT: %s",
                          rd_kafka_err2str(err));

        TEST_SAY("Retrying send_offsets..()\n");
        error = rd_kafka_send_offsets_to_transaction(rk, offsets,
                                                     cgmetadata, -1);
        TEST_ASSERT(!error, "Expected send_offsets..() to succeed, got: %s",
                    rd_kafka_error_string(error));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        /* All done */
        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
1747
1748
1749 /**
1750 * @brief Test auth failure handling.
1751 */
static void do_test_txn_auth_failure (int16_t ApiKey,
                                      rd_kafka_resp_err_t ErrorCode) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;

        SUB_TEST_QUICK("ApiKey=%s ErrorCode=%s",
                       rd_kafka_ApiKey2str(ApiKey),
                       rd_kafka_err2name(ErrorCode));

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        /* Fail the first request of the given ApiKey with the given
         * (auth) error code. */
        rd_kafka_mock_push_request_errors(mcluster,
                                          ApiKey,
                                          1,
                                          ErrorCode);

        error = rd_kafka_init_transactions(rk, 5000);
        TEST_ASSERT(error, "Expected init_transactions() to fail");

        TEST_SAY("init_transactions() failed: %s: %s\n",
                 rd_kafka_err2name(rd_kafka_error_code(error)),
                 rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_code(error) == ErrorCode,
                    "Expected error %s, not %s",
                    rd_kafka_err2name(ErrorCode),
                    rd_kafka_err2name(rd_kafka_error_code(error)));
        /* Auth failures must be fatal and non-retriable. */
        TEST_ASSERT(rd_kafka_error_is_fatal(error),
                    "Expected error to be fatal");
        TEST_ASSERT(!rd_kafka_error_is_retriable(error),
                    "Expected error to not be retriable");
        rd_kafka_error_destroy(error);

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
1791
1792
1793 /**
1794 * @brief Issue #3041: Commit fails due to message flush() taking too long,
1795 * eventually resulting in an unabortable error and failure to
1796 * re-init the transactional producer.
1797 */
static void do_test_txn_flush_timeout (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        rd_kafka_error_t *error;
        const char *txnid = "myTxnId";
        const char *topic = "myTopic";
        const int32_t coord_id = 2;
        int msgcounter = 0;
        /* The test runs twice: first attempt is expected to fail,
         * the retry (after abort) is expected to succeed. */
        rd_bool_t is_retry = rd_false;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3,
                                 "message.timeout.ms", "10000",
                                 "transaction.timeout.ms", "10000",
                                 /* Speed up coordinator reconnect */
                                 "reconnect.backoff.max.ms", "1000",
                                 NULL);


        /* Broker down is not a test-failing error */
        test_curr->is_fatal_cb = error_is_fatal_cb;
        allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;

        rd_kafka_mock_topic_create(mcluster, topic, 2, 3);

        /* Set coordinator so we can disconnect it later */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, coord_id);

        /*
         * Init transactions
         */
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

 retry:
        if (!is_retry) {
                /* First attempt should fail. */

                test_curr->ignore_dr_err = rd_true;
                test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

                /* Assign invalid partition leaders for some partitions so
                 * that messages will not be delivered. */
                rd_kafka_mock_partition_set_leader(mcluster, topic, 0, -1);
                rd_kafka_mock_partition_set_leader(mcluster, topic, 1, -1);

        } else {
                /* The retry should succeed */
                test_curr->ignore_dr_err = rd_false;
                test_curr->exp_dr_err = is_retry ? RD_KAFKA_RESP_ERR_NO_ERROR :
                        RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

                /* Restore valid leaders so messages can be delivered. */
                rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
                rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);

        }


        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /*
         * Produce some messages to specific partitions and random.
         */
        test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 100, NULL, 10,
                                  &msgcounter);
        test_produce_msgs2_nowait(rk, topic, 1, 0, 0, 100, NULL, 10,
                                  &msgcounter);
        test_produce_msgs2_nowait(rk, topic, RD_KAFKA_PARTITION_UA,
                                  0, 0, 100, NULL, 10, &msgcounter);


        /*
         * Send some arbitrary offsets.
         */
        offsets = rd_kafka_topic_partition_list_new(4);
        rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
        rd_kafka_topic_partition_list_add(offsets, "srctop2", 99)->offset =
                999999111;
        rd_kafka_topic_partition_list_add(offsets, "srctopic", 0)->offset = 999;
        rd_kafka_topic_partition_list_add(offsets, "srctop2", 3499)->offset =
                123456789;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
                                  rk, offsets,
                                  cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        rd_sleep(2);

        if (!is_retry) {
                /* Now disconnect the coordinator. */
                TEST_SAY("Disconnecting transaction coordinator %"PRId32"\n",
                         coord_id);
                rd_kafka_mock_broker_set_down(mcluster, coord_id);
        }

        /*
         * Start committing.
         */
        error = rd_kafka_commit_transaction(rk, -1);

        if (!is_retry) {
                TEST_ASSERT(error != NULL,
                            "Expected commit to fail");
                TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                         rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);

        } else {
                TEST_ASSERT(!error,
                            "Expected commit to succeed, not: %s",
                            rd_kafka_error_string(error));
        }

        if (!is_retry) {
                /*
                 * Bring the coordinator back up.
                 */
                rd_kafka_mock_broker_set_up(mcluster, coord_id);
                rd_sleep(2);

                /*
                 * Abort, and try again, this time without error.
                 */
                TEST_SAY("Aborting and retrying\n");
                is_retry = rd_true;

                TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 60000));
                goto retry;
        }

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
1944
1945
1946 /**
1947 * @brief ESC-4424: rko is reused in response handler after destroy in coord_req
1948 * sender due to bad state.
1949 *
1950 * This is somewhat of a race condition so we need to perform a couple of
1951 * iterations before it hits, usually 2 or 3, so we try at least 15 times.
1952 */
do_test_txn_coord_req_destroy(void)1953 static void do_test_txn_coord_req_destroy (void) {
1954 rd_kafka_t *rk;
1955 rd_kafka_mock_cluster_t *mcluster;
1956 int i;
1957 int errcnt = 0;
1958
1959 SUB_TEST();
1960
1961 rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
1962
1963 test_curr->ignore_dr_err = rd_true;
1964
1965 TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1966
1967 for (i = 0 ; i < 15 ; i++) {
1968 rd_kafka_error_t *error;
1969 rd_kafka_resp_err_t err;
1970 rd_kafka_topic_partition_list_t *offsets;
1971 rd_kafka_consumer_group_metadata_t *cgmetadata;
1972
1973 test_timeout_set(10);
1974
1975 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1976
1977 /*
1978 * Inject errors to trigger retries
1979 */
1980 rd_kafka_mock_push_request_errors(
1981 mcluster,
1982 RD_KAFKAP_AddPartitionsToTxn,
1983 2,/* first request + number of internal retries */
1984 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
1985 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
1986
1987 rd_kafka_mock_push_request_errors(
1988 mcluster,
1989 RD_KAFKAP_AddOffsetsToTxn,
1990 1,/* first request + number of internal retries */
1991 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
1992
1993 err = rd_kafka_producev(rk,
1994 RD_KAFKA_V_TOPIC("mytopic"),
1995 RD_KAFKA_V_VALUE("hi", 2),
1996 RD_KAFKA_V_END);
1997 TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
1998
1999 rd_kafka_mock_push_request_errors(
2000 mcluster,
2001 RD_KAFKAP_Produce,
2002 4,
2003 RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
2004 RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
2005 RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
2006 RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
2007 /* FIXME: When KIP-360 is supported, add this error:
2008 * RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER */
2009
2010 err = rd_kafka_producev(rk,
2011 RD_KAFKA_V_TOPIC("mytopic"),
2012 RD_KAFKA_V_VALUE("hi", 2),
2013 RD_KAFKA_V_END);
2014 TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
2015
2016
2017 /*
2018 * Send offsets to transaction
2019 */
2020
2021 offsets = rd_kafka_topic_partition_list_new(1);
2022 rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->
2023 offset = 12;
2024
2025 cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
2026
2027 error = rd_kafka_send_offsets_to_transaction(rk, offsets,
2028 cgmetadata, -1);
2029
2030 TEST_SAY("send_offsets_to_transaction() #%d: %s\n",
2031 i, rd_kafka_error_string(error));
2032
2033 /* As we can't control the exact timing and sequence
2034 * of requests this sometimes fails and sometimes succeeds,
2035 * but we run the test enough times to trigger at least
2036 * one failure. */
2037 if (error) {
2038 TEST_SAY("send_offsets_to_transaction() #%d "
2039 "failed (expectedly): %s\n",
2040 i, rd_kafka_error_string(error));
2041 TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
2042 "Expected abortable error for #%d", i);
2043 rd_kafka_error_destroy(error);
2044 errcnt++;
2045 }
2046
2047 rd_kafka_consumer_group_metadata_destroy(cgmetadata);
2048 rd_kafka_topic_partition_list_destroy(offsets);
2049
2050 /* Allow time for internal retries */
2051 rd_sleep(2);
2052
2053 TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));
2054 }
2055
2056 TEST_ASSERT(errcnt > 0,
2057 "Expected at least one send_offets_to_transaction() "
2058 "failure");
2059
2060 /* All done */
2061
2062 rd_kafka_destroy(rk);
2063 }
2064
2065
/* Counts AddOffsetsToTxn responses seen by
 * multi_find_on_response_received_cb(); bumped by 10000 once the broker
 * down/up events have been triggered so the callback becomes a no-op. */
static rd_atomic32_t multi_find_req_cnt;
2067
/**
 * @brief on_response_received interceptor: on the first AddOffsetsToTxn
 *        response bounce broker 2, on the second bounce broker 3, then
 *        disarm itself (see the ESC-4444 reproduction steps below).
 */
static rd_kafka_resp_err_t
multi_find_on_response_received_cb (rd_kafka_t *rk,
                                    int sockfd,
                                    const char *brokername,
                                    int32_t brokerid,
                                    int16_t ApiKey,
                                    int16_t ApiVersion,
                                    int32_t CorrId,
                                    size_t size,
                                    int64_t rtt,
                                    rd_kafka_resp_err_t err,
                                    void *ic_opaque) {
        rd_kafka_mock_cluster_t *mcluster = rd_kafka_handle_mock_cluster(rk);
        /* Counter exceeds 10000 once both broker bounces have been
         * performed (see sentinel add below). */
        rd_bool_t done = rd_atomic32_get(&multi_find_req_cnt) > 10000;

        /* Only act on AddOffsetsToTxn responses, and only until done. */
        if (ApiKey != RD_KAFKAP_AddOffsetsToTxn || done)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        TEST_SAY("on_response_received_cb: %s: %s: brokerid %"PRId32
                 ", ApiKey %hd, CorrId %d, rtt %.2fms, %s: %s\n",
                 rd_kafka_name(rk), brokername, brokerid, ApiKey, CorrId,
                 rtt != -1 ? (float)rtt / 1000.0 : 0.0,
                 done ? "already done" : "not done yet",
                 rd_kafka_err2name(err));


        if (rd_atomic32_add(&multi_find_req_cnt, 1) == 1) {
                /* Trigger a broker down/up event, which in turns
                 * triggers the coord_req_fsm(). */
                rd_kafka_mock_broker_set_down(mcluster, 2);
                rd_kafka_mock_broker_set_up(mcluster, 2);
                return RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        /* Trigger a broker down/up event, which in turns
         * triggers the coord_req_fsm(). */
        rd_kafka_mock_broker_set_down(mcluster, 3);
        rd_kafka_mock_broker_set_up(mcluster, 3);

        /* Clear the downed broker's latency so that it reconnects
         * quickly, otherwise the ApiVersionRequest will be delayed and
         * this will in turn delay the -> UP transition that we need to
         * trigger the coord_reqs. */
        rd_kafka_mock_broker_set_rtt(mcluster, 3, 0);

        /* Only do this down/up once */
        rd_atomic32_add(&multi_find_req_cnt, 10000);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
2118
2119
2120 /**
2121 * @brief ESC-4444: multiple FindCoordinatorRequests are sent referencing
2122 * the same coord_req_t, but the first one received will destroy
 * the coord_req_t object and make the subsequent FindCoordinatorResponses
2124 * reference a freed object.
2125 *
2126 * What we want to achieve is this sequence:
2127 * 1. AddOffsetsToTxnRequest + Response which..
2128 * 2. Triggers TxnOffsetCommitRequest, but the coordinator is not known, so..
2129 * 3. Triggers a FindCoordinatorRequest
2130 * 4. FindCoordinatorResponse from 3 is received ..
2131 * 5. A TxnOffsetCommitRequest is sent from coord_req_fsm().
2132 * 6. Another broker changing state to Up triggers coord reqs again, which..
2133 * 7. Triggers a second TxnOffsetCommitRequest from coord_req_fsm().
 * 8. FindCoordinatorResponse from 5 is received, references the destroyed rko
2135 * and crashes.
2136 */
do_test_txn_coord_req_multi_find(void)2137 static void do_test_txn_coord_req_multi_find (void) {
2138 rd_kafka_t *rk;
2139 rd_kafka_mock_cluster_t *mcluster;
2140 rd_kafka_error_t *error;
2141 rd_kafka_resp_err_t err;
2142 rd_kafka_topic_partition_list_t *offsets;
2143 rd_kafka_consumer_group_metadata_t *cgmetadata;
2144 const char *txnid = "txnid", *groupid = "mygroupid", *topic = "mytopic";
2145 int i;
2146
2147 SUB_TEST();
2148
2149 rd_atomic32_init(&multi_find_req_cnt, 0);
2150
2151 on_response_received_cb = multi_find_on_response_received_cb;
2152 rk = create_txn_producer(&mcluster, txnid, 3,
2153 /* Need connections to all brokers so we
2154 * can trigger coord_req_fsm events
2155 * by toggling connections. */
2156 "enable.sparse.connections", "false",
2157 /* Set up on_response_received interceptor */
2158 "on_response_received", "", NULL);
2159
2160 /* Let broker 1 be both txn and group coordinator
2161 * so that the group coordinator connection is up when it is time
2162 * send the TxnOffsetCommitRequest. */
2163 rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 1);
2164 rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
2165
2166 /* Set broker 1, 2, and 3 as leaders for a partition each and
2167 * later produce to both partitions so we know there's a connection
2168 * to all brokers. */
2169 rd_kafka_mock_topic_create(mcluster, topic, 3, 1);
2170 rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
2171 rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 2);
2172 rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 3);
2173
2174 /* Broker down is not a test-failing error */
2175 allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
2176 test_curr->is_fatal_cb = error_is_fatal_cb;
2177
2178 TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
2179
2180 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2181
2182 for (i = 0 ; i < 3 ; i++) {
2183 err = rd_kafka_producev(rk,
2184 RD_KAFKA_V_TOPIC(topic),
2185 RD_KAFKA_V_PARTITION(i),
2186 RD_KAFKA_V_VALUE("hi", 2),
2187 RD_KAFKA_V_END);
2188 TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
2189 }
2190
2191 test_flush(rk, 5000);
2192
2193 /*
2194 * send_offsets_to_transaction() will query for the group coordinator,
2195 * we need to make those requests slow so that multiple requests are
2196 * sent.
2197 */
2198 for (i = 1 ; i <= 3 ; i++)
2199 rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 4000);
2200
2201 /*
2202 * Send offsets to transaction
2203 */
2204
2205 offsets = rd_kafka_topic_partition_list_new(1);
2206 rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->
2207 offset = 12;
2208
2209 cgmetadata = rd_kafka_consumer_group_metadata_new(groupid);
2210
2211 error = rd_kafka_send_offsets_to_transaction(rk, offsets,
2212 cgmetadata, -1);
2213
2214 TEST_SAY("send_offsets_to_transaction() %s\n",
2215 rd_kafka_error_string(error));
2216 TEST_ASSERT(!error, "send_offsets_to_transaction() failed: %s",
2217 rd_kafka_error_string(error));
2218
2219 rd_kafka_consumer_group_metadata_destroy(cgmetadata);
2220 rd_kafka_topic_partition_list_destroy(offsets);
2221
2222 /* Clear delay */
2223 for (i = 1 ; i <= 3 ; i++)
2224 rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 0);
2225
2226 rd_sleep(5);
2227
2228 TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
2229
2230 /* All done */
2231
2232 TEST_ASSERT(rd_atomic32_get(&multi_find_req_cnt) > 10000,
2233 "on_request_sent interceptor did not trigger properly");
2234
2235 rd_kafka_destroy(rk);
2236
2237 on_response_received_cb = NULL;
2238
2239 SUB_TEST_PASS();
2240 }
2241
2242
2243 /**
2244 * @brief ESC-4410: adding producer partitions gradually will trigger multiple
2245 * AddPartitionsToTxn requests. Due to a bug the third partition to be
2246 * registered would hang in PEND_TXN state.
2247 *
2248 * Trigger this behaviour by having two outstanding AddPartitionsToTxn requests
2249 * at the same time, followed by a need for a third:
2250 *
2251 * 1. Set coordinator broker rtt high (to give us time to produce).
2252 * 2. Produce to partition 0, will trigger first AddPartitionsToTxn.
2253 * 3. Produce to partition 1, will trigger second AddPartitionsToTxn.
2254 * 4. Wait for second AddPartitionsToTxn response.
2255 * 5. Produce to partition 2, should trigger AddPartitionsToTxn, but bug
 *    causes it to be stuck in pending state.
2257 */
2258
2259 static rd_atomic32_t multi_addparts_resp_cnt;
2260 static rd_kafka_resp_err_t
multi_addparts_response_received_cb(rd_kafka_t * rk,int sockfd,const char * brokername,int32_t brokerid,int16_t ApiKey,int16_t ApiVersion,int32_t CorrId,size_t size,int64_t rtt,rd_kafka_resp_err_t err,void * ic_opaque)2261 multi_addparts_response_received_cb (rd_kafka_t *rk,
2262 int sockfd,
2263 const char *brokername,
2264 int32_t brokerid,
2265 int16_t ApiKey,
2266 int16_t ApiVersion,
2267 int32_t CorrId,
2268 size_t size,
2269 int64_t rtt,
2270 rd_kafka_resp_err_t err,
2271 void *ic_opaque) {
2272
2273 if (ApiKey == RD_KAFKAP_AddPartitionsToTxn) {
2274 TEST_SAY("on_response_received_cb: %s: %s: brokerid %"PRId32
2275 ", ApiKey %hd, CorrId %d, rtt %.2fms, count %"PRId32
2276 ": %s\n",
2277 rd_kafka_name(rk), brokername, brokerid,
2278 ApiKey, CorrId,
2279 rtt != -1 ? (float)rtt / 1000.0 : 0.0,
2280 rd_atomic32_get(&multi_addparts_resp_cnt),
2281 rd_kafka_err2name(err));
2282
2283 rd_atomic32_add(&multi_addparts_resp_cnt, 1);
2284 }
2285
2286 return RD_KAFKA_RESP_ERR_NO_ERROR;
2287 }
2288
2289
static void do_test_txn_addparts_req_multi (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        const char *txnid = "txnid", *topic = "mytopic";
        int32_t txn_coord = 2;

        SUB_TEST();

        rd_atomic32_init(&multi_addparts_resp_cnt, 0);

        on_response_received_cb = multi_addparts_response_received_cb;
        rk = create_txn_producer(&mcluster, txnid, 3,
                                 "linger.ms", "0",
                                 "message.timeout.ms", "9000",
                                 /* Set up on_response_received interceptor */
                                 "on_response_received", "", NULL);

        /* Let broker 2 (txn_coord) be the transaction coordinator. */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        rd_kafka_mock_topic_create(mcluster, topic, 3, 1);

        /* Set partition leaders to non-txn-coord broker so they won't
         * be affected by the rtt delay applied to the coordinator below. */
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 1);



        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        /*
         * Run one transaction first to let the client familiarize with
         * the topic, this avoids metadata lookups, etc, when the real
         * test is run.
         */
        TEST_SAY("Running seed transaction\n");
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC(topic),
                                          RD_KAFKA_V_VALUE("seed", 4),
                                          RD_KAFKA_V_END));
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));


        /*
         * Now perform test transaction with rtt delays
         */
        TEST_SAY("Running test transaction\n");

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Reset counter */
        rd_atomic32_set(&multi_addparts_resp_cnt, 0);

        /* Add latency to txn coordinator so we can pace our produce() calls */
        rd_kafka_mock_broker_set_rtt(mcluster, txn_coord, 1000);

        /* Produce to partition 0: triggers the first AddPartitionsToTxn */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC(topic),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        /* Wait less than the coordinator rtt so the first request is
         * still in-flight when the next produce() is issued. */
        rd_usleep(500*1000, NULL);

        /* Produce to partition 1: triggers the second AddPartitionsToTxn */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC(topic),
                                          RD_KAFKA_V_PARTITION(1),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        TEST_SAY("Waiting for two AddPartitionsToTxnResponse\n");
        while (rd_atomic32_get(&multi_addparts_resp_cnt) < 2)
                rd_usleep(10*1000, NULL);

        TEST_SAY("%"PRId32" AddPartitionsToTxnResponses seen\n",
                 rd_atomic32_get(&multi_addparts_resp_cnt));

        /* Produce to partition 2, this message will hang in
         * queue if the bug is not fixed. */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC(topic),
                                          RD_KAFKA_V_PARTITION(2),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        /* Allow some extra time for things to settle before committing
         * transaction. */
        rd_usleep(1000*1000, NULL);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 10*1000));

        /* All done */
        rd_kafka_destroy(rk);

        on_response_received_cb = NULL;

        SUB_TEST_PASS();
}
2394
2395
2396
2397 /**
2398 * @brief Test handling of OffsetFetchRequest returning UNSTABLE_OFFSET_COMMIT.
2399 *
2400 * There are two things to test;
2401 * - OffsetFetch triggered by committed() (and similar code paths)
2402 * - OffsetFetch triggered by assign()
2403 */
static void do_test_unstable_offset_commit (void) {
        rd_kafka_t *rk, *c;
        rd_kafka_conf_t *c_conf;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        const char *topic = "mytopic";
        const int msgcnt = 100;
        const int64_t offset_to_commit = msgcnt / 2;
        int i;
        int remains = 0;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        /* Set up a consumer against the same mock cluster.
         * auto.offset.reset=error makes any (unexpected) offset reset
         * fail the test rather than silently restarting consumption. */
        test_conf_init(&c_conf, NULL, 0);
        test_conf_set(c_conf, "security.protocol", "PLAINTEXT");
        test_conf_set(c_conf, "bootstrap.servers",
                      rd_kafka_mock_cluster_bootstraps(mcluster));
        test_conf_set(c_conf, "enable.partition.eof", "true");
        test_conf_set(c_conf, "auto.offset.reset", "error");
        c = test_create_consumer("mygroup", NULL, c_conf, NULL);

        rd_kafka_mock_topic_create(mcluster, topic, 2, 3);

        /* Produce some messages to the topic so that the consumer has
         * something to read. */
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
        test_produce_msgs2_nowait(rk, topic, 0, 0, 0, msgcnt,
                                  NULL, 0, &remains);
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));


        /* Commit offset */
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
                offset_to_commit;
        TEST_CALL_ERR__(rd_kafka_commit(c, offsets, 0/*sync*/));
        rd_kafka_topic_partition_list_destroy(offsets);

        /* Retrieve offsets by calling committed().
         *
         * Have OffsetFetch fail and retry, on the first iteration
         * the API timeout is higher than the amount of time the retries will
         * take and thus succeed, and on the second iteration the timeout
         * will be lower and thus fail. */
        for (i = 0 ; i < 2 ; i++) {
                rd_kafka_resp_err_t err;
                rd_kafka_resp_err_t exp_err = i == 0 ?
                        RD_KAFKA_RESP_ERR_NO_ERROR :
                        RD_KAFKA_RESP_ERR__TIMED_OUT;
                /* 200ms is shorter than the injected retry sequence takes,
                 * 5s is comfortably longer. */
                int timeout_ms = exp_err ? 200 : 5*1000;

                /* Make the first OffsetFetch and a number of retries
                 * return UNSTABLE_OFFSET_COMMIT, forcing the client
                 * to retry. */
                rd_kafka_mock_push_request_errors(
                        mcluster,
                        RD_KAFKAP_OffsetFetch,
                        1+5,/* first request + some retries */
                        RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                        RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                        RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                        RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                        RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                        RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT);

                offsets = rd_kafka_topic_partition_list_new(1);
                rd_kafka_topic_partition_list_add(offsets, topic, 0);

                err = rd_kafka_committed(c, offsets, timeout_ms);

                TEST_SAY("#%d: committed() returned %s (expected %s)\n",
                         i,
                         rd_kafka_err2name(err),
                         rd_kafka_err2name(exp_err));

                TEST_ASSERT(err == exp_err,
                            "#%d: Expected committed() to return %s, not %s",
                            i,
                            rd_kafka_err2name(exp_err),
                            rd_kafka_err2name(err));
                TEST_ASSERT(offsets->cnt == 1,
                            "Expected 1 committed offset, not %d",
                            offsets->cnt);
                if (!exp_err)
                        TEST_ASSERT(offsets->elems[0].offset == offset_to_commit,
                                    "Expected committed offset %"PRId64", "
                                    "not %"PRId64,
                                    offset_to_commit,
                                    offsets->elems[0].offset);
                else
                        TEST_ASSERT(offsets->elems[0].offset < 0,
                                    "Expected no committed offset, "
                                    "not %"PRId64,
                                    offsets->elems[0].offset);

                rd_kafka_topic_partition_list_destroy(offsets);
        }

        TEST_SAY("Phase 2: OffsetFetch lookup through assignment\n");
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
                RD_KAFKA_OFFSET_STORED;

        /* Same error injection as above, but this time the OffsetFetch
         * is triggered by the (incremental) assign(). */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_OffsetFetch,
                1+5,/* first request + some retries */
                RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT);

        test_consumer_incremental_assign("assign", c, offsets);
        rd_kafka_topic_partition_list_destroy(offsets);

        /* Consumption should resume from the committed offset and thus
         * see exactly the second half of the messages plus one EOF. */
        test_consumer_poll_exact("consume", c, 0,
                                 1/*eof*/, 0, msgcnt/2,
                                 rd_true/*exact counts*/, NULL);

        /* All done */
        rd_kafka_destroy(c);
        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
2531
2532
2533 /**
2534 * @brief If a message times out locally before being attempted to send
2535 * and commit_transaction() is called, the transaction must not succeed.
2536 * https://github.com/confluentinc/confluent-kafka-dotnet/issues/1568
2537 */
do_test_commit_after_msg_timeout(void)2538 static void do_test_commit_after_msg_timeout (void) {
2539 rd_kafka_t *rk;
2540 rd_kafka_mock_cluster_t *mcluster;
2541 int32_t coord_id, leader_id;
2542 rd_kafka_resp_err_t err;
2543 rd_kafka_error_t *error;
2544 const char *topic = "test";
2545 const char *transactional_id = "txnid";
2546 int remains = 0;
2547
2548 SUB_TEST_QUICK();
2549
2550 /* Assign coordinator and leader to two different brokers */
2551 coord_id = 1;
2552 leader_id = 2;
2553
2554 rk = create_txn_producer(&mcluster, transactional_id, 3,
2555 "message.timeout.ms", "5000",
2556 "transaction.timeout.ms", "10000",
2557 NULL);
2558
2559 /* Broker down is not a test-failing error */
2560 allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
2561 test_curr->is_fatal_cb = error_is_fatal_cb;
2562 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
2563
2564 err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
2565 TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str (err));
2566
2567 rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
2568 coord_id);
2569 rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);
2570
2571 /* Start transactioning */
2572 TEST_SAY("Starting transaction\n");
2573 TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
2574
2575 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2576
2577 TEST_SAY("Bringing down %"PRId32"\n", leader_id);
2578 rd_kafka_mock_broker_set_down(mcluster, leader_id);
2579 rd_kafka_mock_broker_set_down(mcluster, coord_id);
2580
2581 test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);
2582
2583 error = rd_kafka_commit_transaction(rk, -1);
2584 TEST_ASSERT(error != NULL, "expected commit_transaciton() to fail");
2585 TEST_SAY("commit_transaction() failed (as expected): %s\n",
2586 rd_kafka_error_string(error));
2587 TEST_ASSERT(rd_kafka_error_txn_requires_abort (error),
2588 "Expected txn_requires_abort error");
2589 rd_kafka_error_destroy(error);
2590
2591 /* Bring the brokers up so the abort can complete */
2592 rd_kafka_mock_broker_set_up(mcluster, coord_id);
2593 rd_kafka_mock_broker_set_up(mcluster, leader_id);
2594
2595 TEST_SAY("Aborting transaction\n");
2596 TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
2597
2598 TEST_ASSERT(remains == 0,
2599 "%d message(s) were not flushed\n", remains);
2600
2601 TEST_SAY("Attempting second transaction, which should succeed\n");
2602 allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
2603 test_curr->is_fatal_cb = error_is_fatal_cb;
2604 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
2605
2606 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2607 test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);
2608
2609 TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
2610
2611 TEST_ASSERT(remains == 0,
2612 "%d message(s) were not produced\n", remains);
2613
2614 rd_kafka_destroy(rk);
2615
2616 test_curr->is_fatal_cb = NULL;
2617
2618 SUB_TEST_PASS();
2619 }
2620
2621
2622 /**
2623 * @brief #3575: Verify that OUT_OF_ORDER_SEQ does not trigger an epoch bump
2624 * during an ongoing transaction.
2625 * The transaction should instead enter the abortable state.
2626 */
static void do_test_out_of_order_seq (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t txn_coord = 1, leader = 2;
        const char *txnid = "myTxnId";
        test_timing_t timing;
        rd_kafka_resp_err_t err;

        SUB_TEST_QUICK();

        /* batch.num.messages=1 so each message goes in its own
         * ProduceRequest and injected errors hit individual messages. */
        rk = create_txn_producer(&mcluster, txnid, 3,
                                 "batch.num.messages", "1",
                                 NULL);

        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        rd_kafka_mock_partition_set_leader(mcluster, "mytopic", 0, leader);

        /* Delivery errors are expected; they must not be fatal. */
        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb = NULL;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));



        /* Produce one seeding message first to get the leader up and running */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));
        test_flush(rk, -1);

        /* Let partition leader have a latency of 2 seconds
         * so that we can have multiple messages in-flight. */
        rd_kafka_mock_broker_set_rtt(mcluster, leader, 2*1000);

        /* Produce a message, let it fail with different errors,
         * ending with OUT_OF_ORDER which previously triggered an
         * Epoch bump. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_Produce,
                3,
                RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
                RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
                RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER);

        /* Produce three messages that will be delayed
         * and have errors injected.*/
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        /* Now sleep a short while so that the messages are processed
         * by the broker and errors are returned. */
        TEST_SAY("Sleeping..\n");
        rd_sleep(5);

        /* Remove the leader latency again. */
        rd_kafka_mock_broker_set_rtt(mcluster, leader, 0);

        /* Produce a fifth message, should fail with ERR__STATE since
         * the transaction should have entered the abortable state. */
        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_PARTITION(0),
                                RD_KAFKA_V_VALUE("hi", 2),
                                RD_KAFKA_V_END);
        TEST_ASSERT(err == RD_KAFKA_RESP_ERR__STATE,
                    "Expected produce() to fail with ERR__STATE, not %s",
                    rd_kafka_err2name(err));
        TEST_SAY("produce() failed as expected: %s\n",
                 rd_kafka_err2str(err));

        /* Commit the transaction, should fail with abortable error. */
        TIMING_START(&timing, "commit_transaction(-1)");
        error = rd_kafka_commit_transaction(rk, -1);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));

        /* The error must be abortable, not fatal: OUT_OF_ORDER_SEQ must
         * not have bumped the epoch (the bug under test, #3575). */
        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected abortable error");
        rd_kafka_error_destroy(error);

        /* Abort the transaction */
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /* Run a new transaction without errors to verify that the
         * producer can recover. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
2754
2755
/**
 * @brief Test entry point: runs all transactional producer tests that
 *        use the mock cluster. Returns 0 on success (test framework
 *        asserts on failure inside the subtests).
 */
int main_0105_transactions_mock (int argc, char **argv) {
        /* The mock cluster only speaks PLAINTEXT. */
        if (test_needs_auth()) {
                TEST_SKIP("Mock cluster does not support SSL/SASL\n");
                return 0;
        }

        do_test_txn_recoverable_errors();

        do_test_txn_fatal_idempo_errors();

        do_test_txn_fenced_reinit();

        do_test_txn_req_cnt();

        do_test_txn_requires_abort_errors();

        do_test_txn_slow_reinit(rd_false);
        do_test_txn_slow_reinit(rd_true);

        /* Just do a subset of tests in quick mode */
        if (test_quick)
                return 0;

        do_test_txn_endtxn_errors();

        do_test_txn_endtxn_infinite();

        /* Skip tests for non-infinite commit/abort timeouts
         * until they're properly handled by the producer. */
        if (0)
                do_test_txn_endtxn_timeout();

        /* Bring down the coordinator */
        do_test_txn_broker_down_in_txn(rd_true);

        /* Bring down partition leader */
        do_test_txn_broker_down_in_txn(rd_false);

        do_test_txns_not_supported();

        do_test_txns_send_offsets_concurrent_is_retried();

        do_test_txn_coord_req_destroy();

        do_test_txn_coord_req_multi_find();

        do_test_txn_addparts_req_multi();

        do_test_txns_no_timeout_crash();

        do_test_txn_auth_failure(
                RD_KAFKAP_InitProducerId,
                RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);

        do_test_txn_auth_failure(
                RD_KAFKAP_FindCoordinator,
                RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);

        do_test_txn_flush_timeout();

        do_test_unstable_offset_commit();

        do_test_commit_after_msg_timeout();

        do_test_txn_switch_coordinator();

        do_test_txn_switch_coordinator_refresh();

        do_test_out_of_order_seq();

        return 0;
}
2828