1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2019, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include "test.h"
30
31 #include "rdkafka.h"
32
33 #include "../src/rdkafka_proto.h"
34 #include "../src/rdstring.h"
35 #include "../src/rdunittest.h"
36
37 #include <stdarg.h>
38
39
40 /**
41 * @name Producer transaction tests using the mock cluster
42 *
43 */
44
45
46 static int allowed_error;
47
48 /**
49 * @brief Decide what error_cb's will cause the test to fail.
50 */
error_is_fatal_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,const char * reason)51 static int error_is_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
52 const char *reason) {
53 if (err == allowed_error ||
54 /* If transport errors are allowed then it is likely
55 * that we'll also see ALL_BROKERS_DOWN. */
56 (allowed_error == RD_KAFKA_RESP_ERR__TRANSPORT &&
57 err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)) {
58 TEST_SAY("Ignoring allowed error: %s: %s\n",
59 rd_kafka_err2name(err), reason);
60 return 0;
61 }
62 return 1;
63 }
64
65
/* Sub-test specific on_response_received interceptor callback.
 * A sub-test assigns this prior to calling create_txn_producer() with
 * the "on_response_received" key; the trampoline below forwards each
 * intercepted response to it. */
static rd_kafka_resp_err_t (*on_response_received_cb) (rd_kafka_t *rk,
                                                       int sockfd,
                                                       const char *brokername,
                                                       int32_t brokerid,
                                                       int16_t ApiKey,
                                                       int16_t ApiVersion,
                                                       int32_t CorrId,
                                                       size_t size,
                                                       int64_t rtt,
                                                       rd_kafka_resp_err_t err,
                                                       void *ic_opaque);

/**
 * @brief Simple on_response_received interceptor that simply calls the
 *        sub-test's on_response_received_cb function, which must be set
 *        (asserted below) before the interceptor is installed.
 */
static rd_kafka_resp_err_t
on_response_received_trampoline (rd_kafka_t *rk,
                                 int sockfd,
                                 const char *brokername,
                                 int32_t brokerid,
                                 int16_t ApiKey,
                                 int16_t ApiVersion,
                                 int32_t CorrId,
                                 size_t size,
                                 int64_t rtt,
                                 rd_kafka_resp_err_t err,
                                 void *ic_opaque) {
        TEST_ASSERT(on_response_received_cb != NULL, "");
        return on_response_received_cb(rk, sockfd, brokername, brokerid,
                                       ApiKey, ApiVersion,
                                       CorrId, size, rtt, err, ic_opaque);
}
99
100
101 /**
102 * @brief on_new interceptor to add an on_response_received interceptor.
103 */
on_new_producer(rd_kafka_t * rk,const rd_kafka_conf_t * conf,void * ic_opaque,char * errstr,size_t errstr_size)104 static rd_kafka_resp_err_t on_new_producer (rd_kafka_t *rk,
105 const rd_kafka_conf_t *conf,
106 void *ic_opaque,
107 char *errstr, size_t errstr_size) {
108 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
109
110 if (on_response_received_cb)
111 err = rd_kafka_interceptor_add_on_response_received(
112 rk, "on_response_received",
113 on_response_received_trampoline, ic_opaque);
114
115 return err;
116 }
117
118
119 /**
120 * @brief Create a transactional producer and a mock cluster.
121 *
122 * The var-arg list is a NULL-terminated list of
123 * (const char *key, const char *value) config properties.
124 *
125 * Special keys:
126 * "on_response_received", "" - enable the on_response_received_cb
127 * interceptor,
128 * which must be assigned prior to
129 * calling create_tnx_producer().
130 */
create_txn_producer(rd_kafka_mock_cluster_t ** mclusterp,const char * transactional_id,int broker_cnt,...)131 static rd_kafka_t *create_txn_producer (rd_kafka_mock_cluster_t **mclusterp,
132 const char *transactional_id,
133 int broker_cnt, ...) {
134 rd_kafka_conf_t *conf;
135 rd_kafka_t *rk;
136 char numstr[8];
137 va_list ap;
138 const char *key;
139 rd_bool_t add_interceptors = rd_false;
140
141 rd_snprintf(numstr, sizeof(numstr), "%d", broker_cnt);
142
143 test_conf_init(&conf, NULL, 60);
144
145 test_conf_set(conf, "transactional.id", transactional_id);
146 /* Speed up reconnects */
147 test_conf_set(conf, "reconnect.backoff.max.ms", "2000");
148 test_conf_set(conf, "test.mock.num.brokers", numstr);
149 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
150
151 test_curr->ignore_dr_err = rd_false;
152
153 va_start(ap, broker_cnt);
154 while ((key = va_arg(ap, const char *))) {
155 if (!strcmp(key, "on_response_received")) {
156 add_interceptors = rd_true;
157 (void)va_arg(ap, const char *);
158 } else {
159 test_conf_set(conf, key, va_arg(ap, const char *));
160 }
161 }
162 va_end(ap);
163
164 /* Add an on_.. interceptors */
165 if (add_interceptors)
166 rd_kafka_conf_interceptor_add_on_new(
167 conf,
168 "on_new_producer",
169 on_new_producer, NULL);
170
171 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
172
173 if (mclusterp) {
174 *mclusterp = rd_kafka_handle_mock_cluster(rk);
175 TEST_ASSERT(*mclusterp, "failed to create mock cluster");
176 }
177
178 return rk;
179 }
180
181
/**
 * @brief Test recoverable errors using mock broker error injections
 *        and code coverage checks.
 *
 * Walks through a full transaction (init, begin, produce,
 * send_offsets, commit) with retriable errors injected at each stage,
 * verifying that the producer transparently retries and succeeds.
 */
static void do_test_txn_recoverable_errors (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        const char *groupid = "myGroupId";
        const char *txnid = "myTxnId";

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3,
                                 "batch.num.messages", "1",
                                 NULL);

        /* Make sure transaction and group coordinators are different.
         * This verifies that AddOffsetsToTxnRequest isn't sent to the
         * transaction coordinator but the group coordinator. */
        rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 2);

        /*
         * Inject some InitProducerId errors that cause retries
         */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_InitProducerId,
                3,
                RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
                RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        /* Verify the retry paths were actually exercised. */
        (void)RD_UT_COVERAGE_CHECK(0); /* idemp_request_pid_failed(retry) */
        (void)RD_UT_COVERAGE_CHECK(1); /* txn_idemp_state_change(READY) */

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        /*
         * Produce a message, let it fail with a non-idempotent/non-txn
         * retryable error
         */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_Produce,
                1,
                RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS);

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        /* Make sure messages are produced */
        rd_kafka_flush(rk, -1);

        /*
         * Send some arbitrary offsets, first with some failures, then
         * succeed.
         */
        offsets = rd_kafka_topic_partition_list_new(4);
        rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
        rd_kafka_topic_partition_list_add(offsets, "srctop2", 99)->offset =
                999999111;
        rd_kafka_topic_partition_list_add(offsets, "srctopic", 0)->offset = 999;
        rd_kafka_topic_partition_list_add(offsets, "srctop2", 3499)->offset =
                123456789;

        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_AddPartitionsToTxn,
                1,
                RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);

        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_TxnOffsetCommit,
                2,
                RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
                                  rk, offsets,
                                  cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        /*
         * Commit transaction, first with some failures, then succeed.
         */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_EndTxn,
                3,
                RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
                RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
307
308
/**
 * @brief KIP-360: Test that fatal idempotence errors triggers abortable
 *        transaction errors and that the producer can recover.
 *
 * Injects UNKNOWN_PRODUCER_ID on Produce, expects commit_transaction()
 * to fail with an abortable (non-fatal) error, aborts, and then runs a
 * clean transaction to verify recovery.
 */
static void do_test_txn_fatal_idempo_errors (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        const char *txnid = "myTxnId";

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3,
                                 "batch.num.messages", "1",
                                 NULL);

        /* The injected error will surface in the delivery report and
         * error_cb; tolerate exactly that error. */
        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb = error_is_fatal_cb;
        allowed_error = RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        /* Produce a message, let it fail with a fatal idempo error. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_Produce,
                1,
                RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        /* Commit the transaction, should fail */
        error = rd_kafka_commit_transaction(rk, -1);
        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));

        /* Per KIP-360 the error must be abortable, not fatal. */
        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected abortable error");
        rd_kafka_error_destroy(error);

        /* Abort the transaction */
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /* Run a new transaction without errors to verify that the
         * producer can recover. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        /* All done */

        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
393
394
/**
 * @brief KIP-360: Test that fatal idempotence errors triggers abortable
 *        transaction errors, but let the broker-side bumping of the
 *        producer PID take longer than the remaining transaction timeout
 *        which should raise a retriable error from abort_transaction().
 *
 * @param with_sleep After the first abort sleep longer than it takes to
 *                   re-init the pid so that the internal state automatically
 *                   transitions.
 */
static void do_test_txn_slow_reinit (rd_bool_t with_sleep) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t txn_coord = 2;
        const char *txnid = "myTxnId";
        test_timing_t timing;

        SUB_TEST("%s sleep", with_sleep ? "with": "without");

        rk = create_txn_producer(&mcluster, txnid, 3,
                                 "batch.num.messages", "1",
                                 NULL);

        /* Pin the transaction coordinator so the slow InitProducerId
         * below can be injected on the right broker. */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb = NULL;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Set transaction coordinator latency higher than
         * the abort_transaction() call timeout so that the automatic
         * re-initpid takes longer than abort_transaction(). */
        rd_kafka_mock_broker_push_request_error_rtts(
                mcluster,
                txn_coord,
                RD_KAFKAP_InitProducerId,
                1,
                RD_KAFKA_RESP_ERR_NO_ERROR, 10000/*10s*/);

        /* Produce a message, let it fail with a fatal idempo error. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_Produce,
                1,
                RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));


        /* Commit the transaction, should fail */
        TIMING_START(&timing, "commit_transaction(-1)");
        error = rd_kafka_commit_transaction(rk, -1);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));

        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected abortable error");
        rd_kafka_error_destroy(error);

        /* Abort the transaction, should fail with retriable (timeout) error
         * since the re-initpid (10s rtt above) is still in progress. */
        TIMING_START(&timing, "abort_transaction(100)");
        error = rd_kafka_abort_transaction(rk, 100);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");

        TEST_SAY("First abort_transaction() failed: %s\n",
                 rd_kafka_error_string(error));
        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_is_retriable(error),
                    "Expected retriable error");
        rd_kafka_error_destroy(error);

        /* Sleep past the injected 10s InitProducerId latency so the
         * internal state transitions before the retried abort. */
        if (with_sleep)
                rd_sleep(12);

        /* Retry abort, should now finish. */
        TEST_SAY("Retrying abort\n");
        TIMING_START(&timing, "abort_transaction(-1)");
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
        TIMING_STOP(&timing);

        /* Run a new transaction without errors to verify that the
         * producer can recover. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        /* All done */

        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
524
525
526
/**
 * @brief KIP-360: Test that fatal idempotence errors triggers abortable
 *        transaction errors, but let the broker-side bumping of the
 *        producer PID fail with a fencing error.
 *        Should raise a fatal error.
 */
static void do_test_txn_fenced_reinit (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t txn_coord = 2;
        const char *txnid = "myTxnId";
        char errstr[512];
        rd_kafka_resp_err_t fatal_err;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3,
                                 "batch.num.messages", "1",
                                 NULL);

        /* Pin the transaction coordinator so the InitProducerId error
         * below can be injected on the right broker. */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb = error_is_fatal_cb;
        allowed_error = RD_KAFKA_RESP_ERR__FENCED;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Fail the PID reinit with a fencing error. */
        rd_kafka_mock_broker_push_request_error_rtts(
                mcluster,
                txn_coord,
                RD_KAFKAP_InitProducerId,
                1,
                RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH, 0);

        /* Produce a message, let it fail with a fatal idempo error. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_Produce,
                1,
                RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Abort the transaction, should fail with a fatal error */
        error = rd_kafka_abort_transaction(rk, -1);
        TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");

        TEST_SAY("abort_transaction() failed: %s\n",
                 rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_is_fatal(error),
                    "Expected a fatal error");
        rd_kafka_error_destroy(error);

        /* The fatal error must also be retrievable from the handle. */
        fatal_err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
        TEST_ASSERT(fatal_err,
                    "Expected a fatal error to have been raised");
        TEST_SAY("Fatal error: %s: %s\n",
                 rd_kafka_err2name(fatal_err), errstr);

        /* All done */

        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
619
620
621 /**
622 * @brief Test EndTxn errors.
623 */
do_test_txn_endtxn_errors(void)624 static void do_test_txn_endtxn_errors (void) {
625 rd_kafka_t *rk = NULL;
626 rd_kafka_mock_cluster_t *mcluster = NULL;
627 rd_kafka_resp_err_t err;
628 struct {
629 size_t error_cnt;
630 rd_kafka_resp_err_t errors[4];
631 rd_kafka_resp_err_t exp_err;
632 rd_bool_t exp_retriable;
633 rd_bool_t exp_abortable;
634 rd_bool_t exp_fatal;
635 } scenario[] = {
636 /* This list of errors is from the EndTxnResponse handler in
637 * AK clients/.../TransactionManager.java */
638 { /* #0 */
639 2,
640 { RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
641 RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE },
642 /* Should auto-recover */
643 RD_KAFKA_RESP_ERR_NO_ERROR,
644 },
645 { /* #1 */
646 2,
647 { RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
648 RD_KAFKA_RESP_ERR_NOT_COORDINATOR },
649 /* Should auto-recover */
650 RD_KAFKA_RESP_ERR_NO_ERROR,
651 },
652 { /* #2 */
653 1,
654 { RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS },
655 /* Should auto-recover */
656 RD_KAFKA_RESP_ERR_NO_ERROR,
657 },
658 { /* #3 */
659 3,
660 { RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
661 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
662 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS },
663 /* Should auto-recover */
664 RD_KAFKA_RESP_ERR_NO_ERROR,
665 },
666 { /* #4 */
667 1,
668 { RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID },
669 RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
670 rd_false /* !retriable */,
671 rd_true /* abortable */,
672 rd_false /* !fatal */
673 },
674 { /* #5 */
675 1,
676 { RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING },
677 RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
678 rd_false /* !retriable */,
679 rd_true /* abortable */,
680 rd_false /* !fatal */
681 },
682 { /* #6 */
683 1,
684 { RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH },
685 /* This error is normalized */
686 RD_KAFKA_RESP_ERR__FENCED,
687 rd_false /* !retriable */,
688 rd_false /* !abortable */,
689 rd_true /* fatal */
690 },
691 { /* #7 */
692 1,
693 { RD_KAFKA_RESP_ERR_PRODUCER_FENCED },
694 /* This error is normalized */
695 RD_KAFKA_RESP_ERR__FENCED,
696 rd_false /* !retriable */,
697 rd_false /* !abortable */,
698 rd_true /* fatal */
699 },
700 { /* #8 */
701 1,
702 { RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED },
703 RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
704 rd_false /* !retriable */,
705 rd_false /* !abortable */,
706 rd_true /* fatal */
707 },
708 { /* #9 */
709 1,
710 { RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED },
711 RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
712 rd_false /* !retriable */,
713 rd_true /* abortable */,
714 rd_false /* !fatal */
715 },
716 { /* #10 */
717 /* Any other error should raise a fatal error */
718 1,
719 { RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE },
720 RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
721 rd_false /* !retriable */,
722 rd_true /* abortable */,
723 rd_false /* !fatal */,
724 },
725 { 0 },
726 };
727 int i;
728
729 SUB_TEST_QUICK();
730
731 for (i = 0 ; scenario[i].error_cnt > 0 ; i++) {
732 int j;
733 /* For each scenario, test:
734 * commit_transaction()
735 * flush() + commit_transaction()
736 * abort_transaction()
737 * flush() + abort_transaction()
738 */
739 for (j = 0 ; j < (2+2) ; j++) {
740 rd_bool_t commit = j < 2;
741 rd_bool_t with_flush = j & 1;
742 const char *commit_str =
743 commit ?
744 (with_flush ? "commit&flush" : "commit") :
745 (with_flush ? "abort&flush" : "abort");
746 rd_kafka_topic_partition_list_t *offsets;
747 rd_kafka_consumer_group_metadata_t *cgmetadata;
748 rd_kafka_error_t *error;
749 test_timing_t t_call;
750
751 TEST_SAY("Testing scenario #%d %s with %"PRIusz
752 " injected erorrs, expecting %s\n",
753 i, commit_str,
754 scenario[i].error_cnt,
755 rd_kafka_err2name(scenario[i].exp_err));
756
757 if (!rk) {
758 const char *txnid = "myTxnId";
759 rk = create_txn_producer(&mcluster, txnid,
760 3, NULL);
761 TEST_CALL_ERROR__(rd_kafka_init_transactions(
762 rk, 5000));
763 }
764
765 /*
766 * Start transaction
767 */
768 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
769
770 /* Transaction aborts will cause DR errors:
771 * ignore them. */
772 test_curr->ignore_dr_err = !commit;
773
774 /*
775 * Produce a message.
776 */
777 err = rd_kafka_producev(rk,
778 RD_KAFKA_V_TOPIC("mytopic"),
779 RD_KAFKA_V_VALUE("hi", 2),
780 RD_KAFKA_V_END);
781 TEST_ASSERT(!err, "produce failed: %s",
782 rd_kafka_err2str(err));
783
784 if (with_flush)
785 test_flush(rk, -1);
786
787 /*
788 * Send some arbitrary offsets.
789 */
790 offsets = rd_kafka_topic_partition_list_new(4);
791 rd_kafka_topic_partition_list_add(offsets, "srctopic",
792 3)->offset = 12;
793 rd_kafka_topic_partition_list_add(offsets, "srctop2",
794 99)->offset = 99999;
795
796 cgmetadata = rd_kafka_consumer_group_metadata_new(
797 "mygroupid");
798
799 TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
800 rk, offsets,
801 cgmetadata, -1));
802
803 rd_kafka_consumer_group_metadata_destroy(cgmetadata);
804 rd_kafka_topic_partition_list_destroy(offsets);
805
806 /*
807 * Commit transaction, first with som failures,
808 * then succeed.
809 */
810 rd_kafka_mock_push_request_errors_array(
811 mcluster,
812 RD_KAFKAP_EndTxn,
813 scenario[i].error_cnt,
814 scenario[i].errors);
815
816 TIMING_START(&t_call, "%s", commit_str);
817 if (commit)
818 error = rd_kafka_commit_transaction(
819 rk, tmout_multip(5000));
820 else
821 error = rd_kafka_abort_transaction(
822 rk, tmout_multip(5000));
823 TIMING_STOP(&t_call);
824
825 if (error)
826 TEST_SAY("Scenario #%d %s failed: %s: %s "
827 "(retriable=%s, req_abort=%s, "
828 "fatal=%s)\n",
829 i, commit_str,
830 rd_kafka_error_name(error),
831 rd_kafka_error_string(error),
832 RD_STR_ToF(rd_kafka_error_is_retriable(error)),
833 RD_STR_ToF(rd_kafka_error_txn_requires_abort(error)),
834 RD_STR_ToF(rd_kafka_error_is_fatal(error)));
835 else
836 TEST_SAY("Scenario #%d %s succeeded\n",
837 i, commit_str);
838
839 if (!scenario[i].exp_err) {
840 TEST_ASSERT(!error,
841 "Expected #%d %s to succeed, "
842 "got %s",
843 i, commit_str,
844 rd_kafka_error_string(error));
845 continue;
846 }
847
848
849 TEST_ASSERT(error != NULL,
850 "Expected #%d %s to fail",
851 i, commit_str);
852 TEST_ASSERT(scenario[i].exp_err ==
853 rd_kafka_error_code(error),
854 "Scenario #%d: expected %s, not %s",
855 i,
856 rd_kafka_err2name(scenario[i].exp_err),
857 rd_kafka_error_name(error));
858 TEST_ASSERT(scenario[i].exp_retriable ==
859 (rd_bool_t)
860 rd_kafka_error_is_retriable(error),
861 "Scenario #%d: retriable mismatch",
862 i);
863 TEST_ASSERT(scenario[i].exp_abortable ==
864 (rd_bool_t)
865 rd_kafka_error_txn_requires_abort(error),
866 "Scenario #%d: abortable mismatch",
867 i);
868 TEST_ASSERT(scenario[i].exp_fatal ==
869 (rd_bool_t)rd_kafka_error_is_fatal(error),
870 "Scenario #%d: fatal mismatch", i);
871
872 /* Handle errors according to the error flags */
873 if (rd_kafka_error_is_fatal(error)) {
874 TEST_SAY("Fatal error, destroying producer\n");
875 rd_kafka_error_destroy(error);
876 rd_kafka_destroy(rk);
877 rk = NULL; /* Will be re-created on the next
878 * loop iteration. */
879
880 } else if (rd_kafka_error_txn_requires_abort(error)) {
881 rd_kafka_error_destroy(error);
882 TEST_SAY("Abortable error, "
883 "aborting transaction\n");
884 TEST_CALL_ERROR__(
885 rd_kafka_abort_transaction(rk, -1));
886
887 } else if (rd_kafka_error_is_retriable(error)) {
888 rd_kafka_error_destroy(error);
889 TEST_SAY("Retriable error, retrying %s once\n",
890 commit_str);
891 if (commit)
892 TEST_CALL_ERROR__(
893 rd_kafka_commit_transaction(
894 rk, 5000));
895 else
896 TEST_CALL_ERROR__(
897 rd_kafka_abort_transaction(
898 rk, 5000));
899 } else {
900 TEST_FAIL("Scenario #%d %s: "
901 "Permanent error without enough "
902 "hints to proceed: %s\n",
903 i, commit_str,
904 rd_kafka_error_string(error));
905 }
906 }
907 }
908
909 /* All done */
910 if (rk)
911 rd_kafka_destroy(rk);
912
913 SUB_TEST_PASS();
914 }
915
916
/**
 * @brief Test that the commit/abort works properly with infinite timeout.
 *
 * Injects a burst of retriable EndTxn errors and verifies that both
 * commit_transaction(-1) and abort_transaction(-1) keep retrying until
 * they eventually succeed.
 */
static void do_test_txn_endtxn_infinite (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster = NULL;
        const char *txnid = "myTxnId";
        int i;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, NULL);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        /* Iteration 0 commits, iteration 1 aborts. */
        for (i = 0 ; i < 2 ; i++) {
                rd_bool_t commit = i == 0;
                const char *commit_str = commit ? "commit" : "abort";
                rd_kafka_error_t *error;
                test_timing_t t_call;

                /* Messages will fail on as the transaction fails,
                 * ignore the DR error */
                test_curr->ignore_dr_err = rd_true;

                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

                TEST_CALL_ERR__(rd_kafka_producev(rk,
                                                  RD_KAFKA_V_TOPIC("mytopic"),
                                                  RD_KAFKA_V_VALUE("hi", 2),
                                                  RD_KAFKA_V_END));

                /*
                 * Commit/abort transaction, first with some retriable
                 * failures, then success.
                 */
                rd_kafka_mock_push_request_errors(
                        mcluster,
                        RD_KAFKAP_EndTxn,
                        10,
                        RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
                        RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                        RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                        RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                        RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                        RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                        RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                        RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
                        RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                        RD_KAFKA_RESP_ERR_NOT_COORDINATOR);

                rd_sleep(1);

                TIMING_START(&t_call, "%s_transaction()", commit_str);
                if (commit)
                        error = rd_kafka_commit_transaction(rk, -1);
                else
                        error = rd_kafka_abort_transaction(rk, -1);
                TIMING_STOP(&t_call);

                TEST_SAY("%s returned %s\n",
                         commit_str,
                         error ? rd_kafka_error_string(error) : "success");

                TEST_ASSERT(!error,
                            "Expected %s to succeed, got %s",
                            commit_str, rd_kafka_error_string(error));

        }

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
993
994
995
/**
 * @brief Test that the commit/abort user timeout is honoured.
 *
 * Injects more retriable EndTxn errors than can be retried within the
 * 100ms call timeout and verifies that commit/abort fail with
 * _TIMED_OUT within the expected time window, and that the
 * txn_requires_abort flag is set only for the commit case.
 */
static void do_test_txn_endtxn_timeout (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster = NULL;
        const char *txnid = "myTxnId";
        int i;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, NULL);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        /* Iteration 0 commits, iteration 1 aborts. */
        for (i = 0 ; i < 2 ; i++) {
                rd_bool_t commit = i == 0;
                const char *commit_str = commit ? "commit" : "abort";
                rd_kafka_error_t *error;
                test_timing_t t_call;

                /* Messages will fail on as the transaction fails,
                 * ignore the DR error */
                test_curr->ignore_dr_err = rd_true;

                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

                TEST_CALL_ERR__(rd_kafka_producev(rk,
                                                  RD_KAFKA_V_TOPIC("mytopic"),
                                                  RD_KAFKA_V_VALUE("hi", 2),
                                                  RD_KAFKA_V_END));

                /*
                 * Commit/abort transaction, first with some retriable
                 * failures whose retries exceed the user timeout.
                 */
                rd_kafka_mock_push_request_errors(
                        mcluster,
                        RD_KAFKAP_EndTxn,
                        10,
                        RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
                        RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                        RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                        RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                        RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                        RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                        RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                        RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
                        RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                        RD_KAFKA_RESP_ERR_NOT_COORDINATOR);

                rd_sleep(1);

                TIMING_START(&t_call, "%s_transaction()", commit_str);
                if (commit)
                        error = rd_kafka_commit_transaction(rk, 100);
                else
                        error = rd_kafka_abort_transaction(rk, 100);
                TIMING_STOP(&t_call);

                TEST_SAY("%s returned %s\n",
                         commit_str,
                         error ? rd_kafka_error_string(error) : "success");

                TEST_ASSERT(error != NULL,
                            "Expected %s to fail", commit_str);

                TEST_ASSERT(rd_kafka_error_code(error) ==
                            RD_KAFKA_RESP_ERR__TIMED_OUT,
                            "Expected %s to fail with timeout, not %s: %s",
                            commit_str,
                            rd_kafka_error_name(error),
                            rd_kafka_error_string(error));

                /* A timed-out abort must not demand another abort,
                 * while a timed-out commit must. */
                if (!commit)
                        TEST_ASSERT(!rd_kafka_error_txn_requires_abort(error),
                                    "abort_transaction() failure should raise "
                                    "a txn_requires_abort error");
                else {
                        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                                    "commit_transaction() failure should raise "
                                    "a txn_requires_abort error");
                        TEST_SAY("Aborting transaction as instructed by "
                                 "error flag\n");
                        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
                }

                rd_kafka_error_destroy(error);

                /* The 100ms timeout must be honoured (with some slack). */
                TIMING_ASSERT(&t_call, 99, 199);
        }

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
1094
1095
/**
 * @brief Test that EndTxn is properly sent for aborted transactions
 *        even if AddOffsetsToTxnRequest was retried.
 *        This is a check for a txn_req_cnt bug.
 */
static void do_test_txn_req_cnt (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        const char *txnid = "myTxnId";

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, NULL);

        /* Messages will fail on abort(), ignore the DR error */
        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /*
         * Send some arbitrary offsets, first with some failures, then
         * succeed.
         */
        offsets = rd_kafka_topic_partition_list_new(2);
        rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
        rd_kafka_topic_partition_list_add(offsets, "srctop2", 99)->offset =
                999999111;

        /* Force AddOffsetsToTxn retries.. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_AddOffsetsToTxn,
                2,
                RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
                RD_KAFKA_RESP_ERR_NOT_COORDINATOR);

        /* ..and TxnOffsetCommit retries. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_TxnOffsetCommit,
                2,
                RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
                RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
                                  rk, offsets,
                                  cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        /* The abort must still send EndTxn despite the retries above. */
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
1159
1160
1161 /**
1162 * @brief Test abortable errors using mock broker error injections
1163 * and code coverage checks.
1164 */
static void do_test_txn_requires_abort_errors (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        int r;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        /* Messages will fail when the transaction is aborted below:
         * don't let the delivery report errors fail the test. */
        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /*
         * 1. Fail on produce
         */
        TEST_SAY("1. Fail on produce\n");

        /* Inject a topic auth error on the first ProduceRequest. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_Produce,
                1,
                RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);

        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2),
                                RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        /* Wait for messages to fail */
        test_flush(rk, 5000);

        /* Any other transactional API should now raise an error */
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        error = rd_kafka_send_offsets_to_transaction(rk, offsets,
                                                     cgmetadata, -1);

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);
        TEST_ASSERT(error, "expected error");
        /* The produce failure must put the transaction in a state that
         * requires the application to abort. */
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "expected abortable error, not %s",
                    rd_kafka_error_string(error));
        TEST_SAY("Error %s: %s\n",
                 rd_kafka_error_name(error),
                 rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);

        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /*
         * 2. Restart transaction and fail on AddPartitionsToTxn
         */
        TEST_SAY("2. Fail on AddPartitionsToTxn\n");

        /* First refresh proper Metadata to clear the topic's auth error,
         * otherwise the produce() below will fail immediately. */
        r = test_get_partition_count(rk, "mytopic", 5000);
        TEST_ASSERT(r > 0, "Expected topic %s to exist", "mytopic");

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Inject the auth error on the partition registration instead. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_AddPartitionsToTxn,
                1,
                RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);

        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2),
                                RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        /* Commit must fail since AddPartitionsToTxn failed. */
        error = rd_kafka_commit_transaction(rk, 5000);
        TEST_ASSERT(error, "commit_transaction should have failed");
        TEST_SAY("commit_transaction() error %s: %s\n",
                 rd_kafka_error_name(error),
                 rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);

        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /*
         * 3. Restart transaction and fail on AddOffsetsToTxn
         */
        TEST_SAY("3. Fail on AddOffsetsToTxn\n");

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2),
                                RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        /* Group auth error on the offset registration request. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_AddOffsetsToTxn,
                1,
                RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED);

        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        error = rd_kafka_send_offsets_to_transaction(rk, offsets,
                                                     cgmetadata, -1);
        TEST_ASSERT(error, "Expected send_offsets..() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) ==
                    RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
                    "expected send_offsets_to_transaction() to fail with "
                    "group auth error: not %s",
                    rd_kafka_error_name(error));
        rd_kafka_error_destroy(error);

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);


        /* The failed send_offsets..() also poisons the transaction:
         * commit must fail and an abort is required. */
        error = rd_kafka_commit_transaction(rk, 5000);
        TEST_ASSERT(error, "commit_transaction should have failed");
        rd_kafka_error_destroy(error);

        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
1308
1309
1310 /**
1311 * @brief Test error handling and recover for when broker goes down during
1312 * an ongoing transaction.
1313 */
static void do_test_txn_broker_down_in_txn (rd_bool_t down_coord) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        int32_t coord_id, leader_id, down_id;
        const char *down_what;
        rd_kafka_resp_err_t err;
        const char *topic = "test";
        const char *transactional_id = "txnid";
        int msgcnt = 1000;
        int remains = 0;

        /* Assign coordinator and leader to two different brokers so we
         * can bring down one role at a time. */
        coord_id = 1;
        leader_id = 2;
        if (down_coord) {
                down_id = coord_id;
                down_what = "coordinator";
        } else {
                down_id = leader_id;
                down_what = "leader";
        }

        SUB_TEST_QUICK("Test %s down", down_what);

        rk = create_txn_producer(&mcluster, transactional_id, 3, NULL);

        /* Broker down is not a test-failing error */
        allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
        test_curr->is_fatal_cb = error_is_fatal_cb;

        err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
        TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

        rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
                                      coord_id);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);

        /* Start transactioning */
        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Produce the first half while all brokers are up. */
        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
                                  0, msgcnt / 2, NULL, 0, &remains);

        TEST_SAY("Bringing down %s %"PRId32"\n", down_what, down_id);
        rd_kafka_mock_broker_set_down(mcluster, down_id);

        rd_kafka_flush(rk, 3000);

        /* Produce remaining messages while the broker is down. */
        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
                                  msgcnt / 2, msgcnt / 2, NULL, 0, &remains);

        rd_sleep(2);

        TEST_SAY("Bringing up %s %"PRId32"\n", down_what, down_id);
        rd_kafka_mock_broker_set_up(mcluster, down_id);

        /* Commit must recover once the broker is back up. */
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        TEST_ASSERT(remains == 0,
                    "%d message(s) were not produced\n", remains);

        rd_kafka_destroy(rk);

        /* Restore error handling state so it does not leak into
         * subsequent sub-tests (fix: allowed_error was previously
         * left set to __TRANSPORT). */
        test_curr->is_fatal_cb = NULL;
        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
1386
1387
1388
1389 /**
1390 * @brief Advance the coord_id to the next broker.
1391 */
/* Rotate the transaction coordinator to the next broker id,
 * wrapping around after \p broker_cnt, and store it in \p coord_idp. */
static void set_next_coord (rd_kafka_mock_cluster_t *mcluster,
                            const char *transactional_id, int broker_cnt,
                            int32_t *coord_idp) {
        int32_t next_id = ((*coord_idp) % broker_cnt) + 1;

        TEST_SAY("Changing transaction coordinator from %"PRId32
                 " to %"PRId32"\n", *coord_idp, next_id);

        rd_kafka_mock_coordinator_set(mcluster, "transaction",
                                      transactional_id, next_id);
        *coord_idp = next_id;
}
1405
1406 /**
1407 * @brief Switch coordinator during a transaction.
1408 *
1409 * @remark Currently fails due to insufficient coord switch handling.
1410 */
do_test_txn_switch_coordinator(void)1411 static void do_test_txn_switch_coordinator (void) {
1412 rd_kafka_t *rk;
1413 rd_kafka_mock_cluster_t *mcluster;
1414 int32_t coord_id;
1415 const char *topic = "test";
1416 const char *transactional_id = "txnid";
1417 const int broker_cnt = 5;
1418 const int iterations = 20;
1419 int i;
1420
1421 test_timeout_set(iterations * 10);
1422
1423 SUB_TEST("Test switching coordinators");
1424
1425 rk = create_txn_producer(&mcluster, transactional_id, broker_cnt, NULL);
1426
1427 coord_id = 1;
1428 rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
1429 coord_id);
1430
1431 /* Start transactioning */
1432 TEST_SAY("Starting transaction\n");
1433 TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1434
1435 for (i = 0 ; i < iterations ; i++) {
1436 const int msgcnt = 100;
1437 int remains = 0;
1438
1439 set_next_coord(mcluster, transactional_id,
1440 broker_cnt, &coord_id);
1441
1442 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1443
1444 test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA,
1445 0, msgcnt / 2, NULL, 0);
1446
1447 if (!(i % 3))
1448 set_next_coord(mcluster, transactional_id,
1449 broker_cnt, &coord_id);
1450
1451 /* Produce remaining messages */
1452 test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
1453 msgcnt / 2, msgcnt / 2, NULL, 0,
1454 &remains);
1455
1456 if ((i & 1) || !(i % 8))
1457 set_next_coord(mcluster, transactional_id,
1458 broker_cnt, &coord_id);
1459
1460
1461 if (!(i % 5)) {
1462 test_curr->ignore_dr_err = rd_false;
1463 TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
1464
1465 } else {
1466 test_curr->ignore_dr_err = rd_true;
1467 TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
1468 }
1469 }
1470
1471
1472 rd_kafka_destroy(rk);
1473
1474 SUB_TEST_PASS();
1475 }
1476
1477
1478 /**
1479 * @brief Test fatal error handling when transactions are not supported
1480 * by the broker.
1481 */
do_test_txns_not_supported(void)1482 static void do_test_txns_not_supported (void) {
1483 rd_kafka_t *rk;
1484 rd_kafka_conf_t *conf;
1485 rd_kafka_mock_cluster_t *mcluster;
1486 rd_kafka_error_t *error;
1487 rd_kafka_resp_err_t err;
1488
1489 SUB_TEST_QUICK();
1490
1491 test_conf_init(&conf, NULL, 10);
1492
1493 test_conf_set(conf, "transactional.id", "myxnid");
1494 test_conf_set(conf, "bootstrap.servers", ",");
1495 rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
1496
1497 rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
1498
1499 /* Create mock cluster */
1500 mcluster = rd_kafka_mock_cluster_new(rk, 3);
1501
1502 /* Disable InitProducerId */
1503 rd_kafka_mock_set_apiversion(mcluster, 22/*InitProducerId*/, -1, -1);
1504
1505
1506 rd_kafka_brokers_add(rk, rd_kafka_mock_cluster_bootstraps(mcluster));
1507
1508
1509
1510 error = rd_kafka_init_transactions(rk, 5*1000);
1511 TEST_SAY("init_transactions() returned %s: %s\n",
1512 error ? rd_kafka_error_name(error) : "success",
1513 error ? rd_kafka_error_string(error) : "success");
1514
1515 TEST_ASSERT(error, "Expected init_transactions() to fail");
1516 TEST_ASSERT(rd_kafka_error_code(error) ==
1517 RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
1518 "Expected init_transactions() to fail with %s, not %s: %s",
1519 rd_kafka_err2name(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE),
1520 rd_kafka_error_name(error),
1521 rd_kafka_error_string(error));
1522 rd_kafka_error_destroy(error);
1523
1524 err = rd_kafka_producev(rk,
1525 RD_KAFKA_V_TOPIC("test"),
1526 RD_KAFKA_V_KEY("test", 4),
1527 RD_KAFKA_V_END);
1528 TEST_ASSERT(err == RD_KAFKA_RESP_ERR__FATAL,
1529 "Expected producev() to fail with %s, not %s",
1530 rd_kafka_err2name(RD_KAFKA_RESP_ERR__FATAL),
1531 rd_kafka_err2name(err));
1532
1533 rd_kafka_mock_cluster_destroy(mcluster);
1534
1535 rd_kafka_destroy(rk);
1536
1537 SUB_TEST_PASS();
1538 }
1539
1540
1541 /**
1542 * @brief CONCURRENT_TRANSACTION on AddOffsets.. should be retried.
1543 */
do_test_txns_send_offsets_concurrent_is_retried(void)1544 static void do_test_txns_send_offsets_concurrent_is_retried (void) {
1545 rd_kafka_t *rk;
1546 rd_kafka_mock_cluster_t *mcluster;
1547 rd_kafka_resp_err_t err;
1548 rd_kafka_topic_partition_list_t *offsets;
1549 rd_kafka_consumer_group_metadata_t *cgmetadata;
1550
1551 SUB_TEST_QUICK();
1552
1553 rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
1554
1555 test_curr->ignore_dr_err = rd_true;
1556
1557 TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1558
1559 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1560
1561 err = rd_kafka_producev(rk,
1562 RD_KAFKA_V_TOPIC("mytopic"),
1563 RD_KAFKA_V_VALUE("hi", 2),
1564 RD_KAFKA_V_END);
1565 TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
1566
1567 /* Wait for messages to be delivered */
1568 test_flush(rk, 5000);
1569
1570
1571 /*
1572 * Have AddOffsetsToTxn fail but eventually succeed due to
1573 * infinite retries.
1574 */
1575 rd_kafka_mock_push_request_errors(
1576 mcluster,
1577 RD_KAFKAP_AddOffsetsToTxn,
1578 1+5,/* first request + some retries */
1579 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
1580 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
1581 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
1582 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
1583 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
1584 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
1585
1586 offsets = rd_kafka_topic_partition_list_new(1);
1587 rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
1588
1589 cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
1590
1591 TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(rk, offsets,
1592 cgmetadata, -1));
1593
1594 rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1595 rd_kafka_topic_partition_list_destroy(offsets);
1596
1597 TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
1598
1599 /* All done */
1600
1601 rd_kafka_destroy(rk);
1602
1603 SUB_TEST_PASS();
1604 }
1605
1606
1607 /**
1608 * @brief Verify that request timeouts don't cause crash (#2913).
1609 */
do_test_txns_no_timeout_crash(void)1610 static void do_test_txns_no_timeout_crash (void) {
1611 rd_kafka_t *rk;
1612 rd_kafka_mock_cluster_t *mcluster;
1613 rd_kafka_error_t *error;
1614 rd_kafka_resp_err_t err;
1615 rd_kafka_topic_partition_list_t *offsets;
1616 rd_kafka_consumer_group_metadata_t *cgmetadata;
1617
1618 SUB_TEST_QUICK();
1619
1620 rk = create_txn_producer(&mcluster, "txnid", 3,
1621 "socket.timeout.ms", "1000",
1622 "transaction.timeout.ms", "5000",
1623 NULL);
1624
1625 TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1626
1627 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1628
1629 err = rd_kafka_producev(rk,
1630 RD_KAFKA_V_TOPIC("mytopic"),
1631 RD_KAFKA_V_VALUE("hi", 2),
1632 RD_KAFKA_V_END);
1633 TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
1634
1635 test_flush(rk, -1);
1636
1637 /* Delay all broker connections */
1638 if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 2000)) ||
1639 (err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 2000)) ||
1640 (err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 2000)))
1641 TEST_FAIL("Failed to set broker RTT: %s",
1642 rd_kafka_err2str(err));
1643
1644 /* send_offsets..() should now time out */
1645 offsets = rd_kafka_topic_partition_list_new(1);
1646 rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
1647 cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
1648
1649 error = rd_kafka_send_offsets_to_transaction(rk, offsets,
1650 cgmetadata, -1);
1651 TEST_ASSERT(error, "Expected send_offsets..() to fail");
1652 TEST_SAY("send_offsets..() failed with %serror: %s\n",
1653 rd_kafka_error_is_retriable(error) ? "retriable " : "",
1654 rd_kafka_error_string(error));
1655 TEST_ASSERT(rd_kafka_error_code(error) ==
1656 RD_KAFKA_RESP_ERR__TIMED_OUT,
1657 "expected send_offsets_to_transaction() to fail with "
1658 "timeout, not %s",
1659 rd_kafka_error_name(error));
1660 TEST_ASSERT(rd_kafka_error_is_retriable(error),
1661 "expected send_offsets_to_transaction() to fail with "
1662 "a retriable error");
1663 rd_kafka_error_destroy(error);
1664
1665 /* Reset delay and try again */
1666 if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 0)) ||
1667 (err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 0)) ||
1668 (err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 0)))
1669 TEST_FAIL("Failed to reset broker RTT: %s",
1670 rd_kafka_err2str(err));
1671
1672 TEST_SAY("Retrying send_offsets..()\n");
1673 error = rd_kafka_send_offsets_to_transaction(rk, offsets,
1674 cgmetadata, -1);
1675 TEST_ASSERT(!error, "Expected send_offsets..() to succeed, got: %s",
1676 rd_kafka_error_string(error));
1677
1678 rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1679 rd_kafka_topic_partition_list_destroy(offsets);
1680
1681 /* All done */
1682 rd_kafka_destroy(rk);
1683
1684 SUB_TEST_PASS();
1685 }
1686
1687
1688 /**
1689 * @brief Test auth failure handling.
1690 */
static void do_test_txn_auth_failure (int16_t ApiKey,
                                      rd_kafka_resp_err_t ErrorCode) {
        rd_kafka_t *p;
        rd_kafka_mock_cluster_t *cluster;
        rd_kafka_error_t *error;

        SUB_TEST_QUICK("ApiKey=%s ErrorCode=%s",
                       rd_kafka_ApiKey2str(ApiKey),
                       rd_kafka_err2name(ErrorCode));

        p = create_txn_producer(&cluster, "txnid", 3, NULL);

        /* Inject the auth error on the first request of \p ApiKey. */
        rd_kafka_mock_push_request_errors(cluster, ApiKey, 1, ErrorCode);

        /* init_transactions() must fail with the injected error,
         * fatally and non-retriably. */
        error = rd_kafka_init_transactions(p, 5000);
        TEST_ASSERT(error, "Expected init_transactions() to fail");

        TEST_SAY("init_transactions() failed: %s: %s\n",
                 rd_kafka_err2name(rd_kafka_error_code(error)),
                 rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_code(error) == ErrorCode,
                    "Expected error %s, not %s",
                    rd_kafka_err2name(ErrorCode),
                    rd_kafka_err2name(rd_kafka_error_code(error)));
        TEST_ASSERT(rd_kafka_error_is_fatal(error),
                    "Expected error to be fatal");
        TEST_ASSERT(!rd_kafka_error_is_retriable(error),
                    "Expected error to not be retriable");
        rd_kafka_error_destroy(error);

        rd_kafka_destroy(p);

        SUB_TEST_PASS();
}
1730
1731
1732 /**
1733 * @brief Issue #3041: Commit fails due to message flush() taking too long,
1734 * eventually resulting in an unabortable error and failure to
1735 * re-init the transactional producer.
1736 */
do_test_txn_flush_timeout(void)1737 static void do_test_txn_flush_timeout (void) {
1738 rd_kafka_t *rk;
1739 rd_kafka_mock_cluster_t *mcluster;
1740 rd_kafka_topic_partition_list_t *offsets;
1741 rd_kafka_consumer_group_metadata_t *cgmetadata;
1742 rd_kafka_error_t *error;
1743 const char *txnid = "myTxnId";
1744 const char *topic = "myTopic";
1745 const int32_t coord_id = 2;
1746 int msgcounter = 0;
1747 rd_bool_t is_retry = rd_false;
1748
1749 SUB_TEST_QUICK();
1750
1751 rk = create_txn_producer(&mcluster, txnid, 3,
1752 "message.timeout.ms", "10000",
1753 "transaction.timeout.ms", "10000",
1754 /* Speed up coordinator reconnect */
1755 "reconnect.backoff.max.ms", "1000",
1756 NULL);
1757
1758
1759 /* Broker down is not a test-failing error */
1760 test_curr->is_fatal_cb = error_is_fatal_cb;
1761 allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
1762
1763 rd_kafka_mock_topic_create(mcluster, topic, 2, 3);
1764
1765 /* Set coordinator so we can disconnect it later */
1766 rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, coord_id);
1767
1768 /*
1769 * Init transactions
1770 */
1771 TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1772
1773 retry:
1774 if (!is_retry) {
1775 /* First attempt should fail. */
1776
1777 test_curr->ignore_dr_err = rd_true;
1778 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
1779
1780 /* Assign invalid partition leaders for some partitions so
1781 * that messages will not be delivered. */
1782 rd_kafka_mock_partition_set_leader(mcluster, topic, 0, -1);
1783 rd_kafka_mock_partition_set_leader(mcluster, topic, 1, -1);
1784
1785 } else {
1786 /* The retry should succeed */
1787 test_curr->ignore_dr_err = rd_false;
1788 test_curr->exp_dr_err = is_retry ? RD_KAFKA_RESP_ERR_NO_ERROR :
1789 RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
1790
1791 rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
1792 rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
1793
1794 }
1795
1796
1797 /*
1798 * Start a transaction
1799 */
1800 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1801
1802 /*
1803 * Produce some messages to specific partitions and random.
1804 */
1805 test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 100, NULL, 10,
1806 &msgcounter);
1807 test_produce_msgs2_nowait(rk, topic, 1, 0, 0, 100, NULL, 10,
1808 &msgcounter);
1809 test_produce_msgs2_nowait(rk, topic, RD_KAFKA_PARTITION_UA,
1810 0, 0, 100, NULL, 10, &msgcounter);
1811
1812
1813 /*
1814 * Send some arbitrary offsets.
1815 */
1816 offsets = rd_kafka_topic_partition_list_new(4);
1817 rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
1818 rd_kafka_topic_partition_list_add(offsets, "srctop2", 99)->offset =
1819 999999111;
1820 rd_kafka_topic_partition_list_add(offsets, "srctopic", 0)->offset = 999;
1821 rd_kafka_topic_partition_list_add(offsets, "srctop2", 3499)->offset =
1822 123456789;
1823
1824 cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
1825
1826 TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
1827 rk, offsets,
1828 cgmetadata, -1));
1829
1830 rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1831 rd_kafka_topic_partition_list_destroy(offsets);
1832
1833 rd_sleep(2);
1834
1835 if (!is_retry) {
1836 /* Now disconnect the coordinator. */
1837 TEST_SAY("Disconnecting transaction coordinator %"PRId32"\n",
1838 coord_id);
1839 rd_kafka_mock_broker_set_down(mcluster, coord_id);
1840 }
1841
1842 /*
1843 * Start committing.
1844 */
1845 error = rd_kafka_commit_transaction(rk, -1);
1846
1847 if (!is_retry) {
1848 TEST_ASSERT(error != NULL,
1849 "Expected commit to fail");
1850 TEST_SAY("commit_transaction() failed (expectedly): %s\n",
1851 rd_kafka_error_string(error));
1852 rd_kafka_error_destroy(error);
1853
1854 } else {
1855 TEST_ASSERT(!error,
1856 "Expected commit to succeed, not: %s",
1857 rd_kafka_error_string(error));
1858 }
1859
1860 if (!is_retry) {
1861 /*
1862 * Bring the coordinator back up.
1863 */
1864 rd_kafka_mock_broker_set_up(mcluster, coord_id);
1865 rd_sleep(2);
1866
1867 /*
1868 * Abort, and try again, this time without error.
1869 */
1870 TEST_SAY("Aborting and retrying\n");
1871 is_retry = rd_true;
1872
1873 TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 60000));
1874 goto retry;
1875 }
1876
1877 /* All done */
1878
1879 rd_kafka_destroy(rk);
1880
1881 SUB_TEST_PASS();
1882 }
1883
1884
1885 /**
1886 * @brief ESC-4424: rko is reused in response handler after destroy in coord_req
1887 * sender due to bad state.
1888 *
1889 * This is somewhat of a race condition so we need to perform a couple of
1890 * iterations before it hits, usually 2 or 3, so we try at least 15 times.
1891 */
do_test_txn_coord_req_destroy(void)1892 static void do_test_txn_coord_req_destroy (void) {
1893 rd_kafka_t *rk;
1894 rd_kafka_mock_cluster_t *mcluster;
1895 int i;
1896 int errcnt = 0;
1897
1898 SUB_TEST();
1899
1900 rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
1901
1902 test_curr->ignore_dr_err = rd_true;
1903
1904 TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1905
1906 for (i = 0 ; i < 15 ; i++) {
1907 rd_kafka_error_t *error;
1908 rd_kafka_resp_err_t err;
1909 rd_kafka_topic_partition_list_t *offsets;
1910 rd_kafka_consumer_group_metadata_t *cgmetadata;
1911
1912 test_timeout_set(10);
1913
1914 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1915
1916 /*
1917 * Inject errors to trigger retries
1918 */
1919 rd_kafka_mock_push_request_errors(
1920 mcluster,
1921 RD_KAFKAP_AddPartitionsToTxn,
1922 2,/* first request + number of internal retries */
1923 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
1924 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
1925
1926 rd_kafka_mock_push_request_errors(
1927 mcluster,
1928 RD_KAFKAP_AddOffsetsToTxn,
1929 1,/* first request + number of internal retries */
1930 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
1931
1932 err = rd_kafka_producev(rk,
1933 RD_KAFKA_V_TOPIC("mytopic"),
1934 RD_KAFKA_V_VALUE("hi", 2),
1935 RD_KAFKA_V_END);
1936 TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
1937
1938 rd_kafka_mock_push_request_errors(
1939 mcluster,
1940 RD_KAFKAP_Produce,
1941 4,
1942 RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
1943 RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
1944 RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
1945 RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
1946 /* FIXME: When KIP-360 is supported, add this error:
1947 * RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER */
1948
1949 err = rd_kafka_producev(rk,
1950 RD_KAFKA_V_TOPIC("mytopic"),
1951 RD_KAFKA_V_VALUE("hi", 2),
1952 RD_KAFKA_V_END);
1953 TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
1954
1955
1956 /*
1957 * Send offsets to transaction
1958 */
1959
1960 offsets = rd_kafka_topic_partition_list_new(1);
1961 rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->
1962 offset = 12;
1963
1964 cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
1965
1966 error = rd_kafka_send_offsets_to_transaction(rk, offsets,
1967 cgmetadata, -1);
1968
1969 TEST_SAY("send_offsets_to_transaction() #%d: %s\n",
1970 i, rd_kafka_error_string(error));
1971
1972 /* As we can't control the exact timing and sequence
1973 * of requests this sometimes fails and sometimes succeeds,
1974 * but we run the test enough times to trigger at least
1975 * one failure. */
1976 if (error) {
1977 TEST_SAY("send_offsets_to_transaction() #%d "
1978 "failed (expectedly): %s\n",
1979 i, rd_kafka_error_string(error));
1980 TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
1981 "Expected abortable error for #%d", i);
1982 rd_kafka_error_destroy(error);
1983 errcnt++;
1984 }
1985
1986 rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1987 rd_kafka_topic_partition_list_destroy(offsets);
1988
1989 /* Allow time for internal retries */
1990 rd_sleep(2);
1991
1992 TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));
1993 }
1994
1995 TEST_ASSERT(errcnt > 0,
1996 "Expected at least one send_offets_to_transaction() "
1997 "failure");
1998
1999 /* All done */
2000
2001 rd_kafka_destroy(rk);
2002 }
2003
2004
/* State for multi_find_on_response_received_cb():
 * incremented by 1 for each AddOffsetsToTxn response seen,
 * bumped by 10000 once the one-time broker-3 bounce has been done. */
static rd_atomic32_t multi_find_req_cnt;

/**
 * @brief on_response_received interceptor used by
 *        do_test_txn_coord_req_multi_find(): when AddOffsetsToTxn
 *        responses are observed it bounces a broker down/up to trigger
 *        coord_req_fsm() activity from the broker state change.
 *        Only reacts to AddOffsetsToTxn; all other ApiKeys pass through.
 */
static rd_kafka_resp_err_t
multi_find_on_response_received_cb (rd_kafka_t *rk,
                                    int sockfd,
                                    const char *brokername,
                                    int32_t brokerid,
                                    int16_t ApiKey,
                                    int16_t ApiVersion,
                                    int32_t CorrId,
                                    size_t size,
                                    int64_t rtt,
                                    rd_kafka_resp_err_t err,
                                    void *ic_opaque) {
        rd_kafka_mock_cluster_t *mcluster = rd_kafka_handle_mock_cluster(rk);
        /* done is true once the +10000 marker below has been added. */
        rd_bool_t done = rd_atomic32_get(&multi_find_req_cnt) > 10000;

        if (ApiKey != RD_KAFKAP_AddOffsetsToTxn || done)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        TEST_SAY("on_response_received_cb: %s: %s: brokerid %"PRId32
                 ", ApiKey %hd, CorrId %d, rtt %.2fms, %s: %s\n",
                 rd_kafka_name(rk), brokername, brokerid, ApiKey, CorrId,
                 rtt != -1 ? (float)rtt / 1000.0 : 0.0,
                 done ? "already done" : "not done yet",
                 rd_kafka_err2name(err));


        /* First AddOffsetsToTxn response: bounce broker 2. */
        if (rd_atomic32_add(&multi_find_req_cnt, 1) == 1) {
                /* Trigger a broker down/up event, which in turns
                 * triggers the coord_req_fsm(). */
                rd_kafka_mock_broker_set_down(mcluster, 2);
                rd_kafka_mock_broker_set_up(mcluster, 2);
                return RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        /* Subsequent response: bounce broker 3, once. */
        /* Trigger a broker down/up event, which in turns
         * triggers the coord_req_fsm(). */
        rd_kafka_mock_broker_set_down(mcluster, 3);
        rd_kafka_mock_broker_set_up(mcluster, 3);

        /* Clear the downed broker's latency so that it reconnects
         * quickly, otherwise the ApiVersionRequest will be delayed and
         * this will in turn delay the -> UP transition that we need to
         * trigger the coord_reqs. */
        rd_kafka_mock_broker_set_rtt(mcluster, 3, 0);

        /* Only do this down/up once */
        rd_atomic32_add(&multi_find_req_cnt, 10000);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
2057
2058
2059 /**
2060 * @brief ESC-4444: multiple FindCoordinatorRequests are sent referencing
2061 * the same coord_req_t, but the first one received will destroy
2062 * the coord_req_t object and make the subsequent FindCoordingResponses
2063 * reference a freed object.
2064 *
2065 * What we want to achieve is this sequence:
2066 * 1. AddOffsetsToTxnRequest + Response which..
2067 * 2. Triggers TxnOffsetCommitRequest, but the coordinator is not known, so..
2068 * 3. Triggers a FindCoordinatorRequest
2069 * 4. FindCoordinatorResponse from 3 is received ..
2070 * 5. A TxnOffsetCommitRequest is sent from coord_req_fsm().
2071 * 6. Another broker changing state to Up triggers coord reqs again, which..
2072 * 7. Triggers a second TxnOffsetCommitRequest from coord_req_fsm().
2073 * 7. FindCoordinatorResponse from 5 is received, references the destroyed rko
2074 * and crashes.
2075 */
do_test_txn_coord_req_multi_find(void)2076 static void do_test_txn_coord_req_multi_find (void) {
2077 rd_kafka_t *rk;
2078 rd_kafka_mock_cluster_t *mcluster;
2079 rd_kafka_error_t *error;
2080 rd_kafka_resp_err_t err;
2081 rd_kafka_topic_partition_list_t *offsets;
2082 rd_kafka_consumer_group_metadata_t *cgmetadata;
2083 const char *txnid = "txnid", *groupid = "mygroupid", *topic = "mytopic";
2084 int i;
2085
2086 SUB_TEST();
2087
2088 rd_atomic32_init(&multi_find_req_cnt, 0);
2089
2090 on_response_received_cb = multi_find_on_response_received_cb;
2091 rk = create_txn_producer(&mcluster, txnid, 3,
2092 /* Need connections to all brokers so we
2093 * can trigger coord_req_fsm events
2094 * by toggling connections. */
2095 "enable.sparse.connections", "false",
2096 /* Set up on_response_received interceptor */
2097 "on_response_received", "", NULL);
2098
2099 /* Let broker 1 be both txn and group coordinator
2100 * so that the group coordinator connection is up when it is time
2101 * send the TxnOffsetCommitRequest. */
2102 rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 1);
2103 rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
2104
2105 /* Set broker 1, 2, and 3 as leaders for a partition each and
2106 * later produce to both partitions so we know there's a connection
2107 * to all brokers. */
2108 rd_kafka_mock_topic_create(mcluster, topic, 3, 1);
2109 rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
2110 rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 2);
2111 rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 3);
2112
2113 /* Broker down is not a test-failing error */
2114 allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
2115 test_curr->is_fatal_cb = error_is_fatal_cb;
2116
2117 TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
2118
2119 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2120
2121 for (i = 0 ; i < 3 ; i++) {
2122 err = rd_kafka_producev(rk,
2123 RD_KAFKA_V_TOPIC(topic),
2124 RD_KAFKA_V_PARTITION(i),
2125 RD_KAFKA_V_VALUE("hi", 2),
2126 RD_KAFKA_V_END);
2127 TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
2128 }
2129
2130 test_flush(rk, 5000);
2131
2132 /*
2133 * send_offsets_to_transaction() will query for the group coordinator,
2134 * we need to make those requests slow so that multiple requests are
2135 * sent.
2136 */
2137 for (i = 1 ; i <= 3 ; i++)
2138 rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 4000);
2139
2140 /*
2141 * Send offsets to transaction
2142 */
2143
2144 offsets = rd_kafka_topic_partition_list_new(1);
2145 rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->
2146 offset = 12;
2147
2148 cgmetadata = rd_kafka_consumer_group_metadata_new(groupid);
2149
2150 error = rd_kafka_send_offsets_to_transaction(rk, offsets,
2151 cgmetadata, -1);
2152
2153 TEST_SAY("send_offsets_to_transaction() %s\n",
2154 rd_kafka_error_string(error));
2155 TEST_ASSERT(!error, "send_offsets_to_transaction() failed: %s",
2156 rd_kafka_error_string(error));
2157
2158 rd_kafka_consumer_group_metadata_destroy(cgmetadata);
2159 rd_kafka_topic_partition_list_destroy(offsets);
2160
2161 /* Clear delay */
2162 for (i = 1 ; i <= 3 ; i++)
2163 rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 0);
2164
2165 rd_sleep(5);
2166
2167 TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
2168
2169 /* All done */
2170
2171 TEST_ASSERT(rd_atomic32_get(&multi_find_req_cnt) > 10000,
2172 "on_request_sent interceptor did not trigger properly");
2173
2174 rd_kafka_destroy(rk);
2175
2176 on_response_received_cb = NULL;
2177
2178 SUB_TEST_PASS();
2179 }
2180
2181
2182 /**
2183 * @brief ESC-4410: adding producer partitions gradually will trigger multiple
2184 * AddPartitionsToTxn requests. Due to a bug the third partition to be
2185 * registered would hang in PEND_TXN state.
2186 *
2187 * Trigger this behaviour by having two outstanding AddPartitionsToTxn requests
2188 * at the same time, followed by a need for a third:
2189 *
2190 * 1. Set coordinator broker rtt high (to give us time to produce).
2191 * 2. Produce to partition 0, will trigger first AddPartitionsToTxn.
2192 * 3. Produce to partition 1, will trigger second AddPartitionsToTxn.
2193 * 4. Wait for second AddPartitionsToTxn response.
2194 * 5. Produce to partition 2, should trigger AddPartitionsToTxn, but bug
 *    causes it to stall in pending state.
2196 */
2197
2198 static rd_atomic32_t multi_addparts_resp_cnt;
2199 static rd_kafka_resp_err_t
multi_addparts_response_received_cb(rd_kafka_t * rk,int sockfd,const char * brokername,int32_t brokerid,int16_t ApiKey,int16_t ApiVersion,int32_t CorrId,size_t size,int64_t rtt,rd_kafka_resp_err_t err,void * ic_opaque)2200 multi_addparts_response_received_cb (rd_kafka_t *rk,
2201 int sockfd,
2202 const char *brokername,
2203 int32_t brokerid,
2204 int16_t ApiKey,
2205 int16_t ApiVersion,
2206 int32_t CorrId,
2207 size_t size,
2208 int64_t rtt,
2209 rd_kafka_resp_err_t err,
2210 void *ic_opaque) {
2211
2212 if (ApiKey == RD_KAFKAP_AddPartitionsToTxn) {
2213 TEST_SAY("on_response_received_cb: %s: %s: brokerid %"PRId32
2214 ", ApiKey %hd, CorrId %d, rtt %.2fms, count %"PRId32
2215 ": %s\n",
2216 rd_kafka_name(rk), brokername, brokerid,
2217 ApiKey, CorrId,
2218 rtt != -1 ? (float)rtt / 1000.0 : 0.0,
2219 rd_atomic32_get(&multi_addparts_resp_cnt),
2220 rd_kafka_err2name(err));
2221
2222 rd_atomic32_add(&multi_addparts_resp_cnt, 1);
2223 }
2224
2225 return RD_KAFKA_RESP_ERR_NO_ERROR;
2226 }
2227
2228
static void do_test_txn_addparts_req_multi (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        const char *txnid = "txnid", *topic = "mytopic";
        int32_t txn_coord = 2;

        SUB_TEST();

        rd_atomic32_init(&multi_addparts_resp_cnt, 0);

        /* Count AddPartitionsToTxn responses through the
         * on_response_received interceptor. */
        on_response_received_cb = multi_addparts_response_received_cb;
        rk = create_txn_producer(&mcluster, txnid, 3,
                                 "linger.ms", "0",
                                 "message.timeout.ms", "9000",
                                 /* Set up on_response_received interceptor */
                                 "on_response_received", "", NULL);

        /* Let broker 2 (txn_coord) be the txn coordinator. */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        rd_kafka_mock_topic_create(mcluster, topic, 3, 1);

        /* Set partition leaders to non-txn-coord broker so they won't
         * be affected by rtt delay */
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 1);



        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        /*
         * Run one transaction first to let the client familiarize with
         * the topic, this avoids metadata lookups, etc, when the real
         * test is run.
         */
        TEST_SAY("Running seed transaction\n");
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC(topic),
                                          RD_KAFKA_V_VALUE("seed", 4),
                                          RD_KAFKA_V_END));
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));


        /*
         * Now perform test transaction with rtt delays
         */
        TEST_SAY("Running test transaction\n");

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Reset counter */
        rd_atomic32_set(&multi_addparts_resp_cnt, 0);

        /* Add latency to txn coordinator so we can pace our produce() calls */
        rd_kafka_mock_broker_set_rtt(mcluster, txn_coord, 1000);

        /* Produce to partition 0: triggers the first AddPartitionsToTxn. */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC(topic),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        /* Wait less than the coordinator rtt so the second produce lands
         * while the first AddPartitionsToTxn is still in flight. */
        rd_usleep(500*1000, NULL);

        /* Produce to partition 1: triggers the second AddPartitionsToTxn. */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC(topic),
                                          RD_KAFKA_V_PARTITION(1),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        TEST_SAY("Waiting for two AddPartitionsToTxnResponse\n");
        while (rd_atomic32_get(&multi_addparts_resp_cnt) < 2)
                rd_usleep(10*1000, NULL);

        TEST_SAY("%"PRId32" AddPartitionsToTxnResponses seen\n",
                 rd_atomic32_get(&multi_addparts_resp_cnt));

        /* Produce to partition 2, this message will hang in
         * queue if the bug is not fixed. */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC(topic),
                                          RD_KAFKA_V_PARTITION(2),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        /* Allow some extra time for things to settle before committing
         * transaction. */
        rd_usleep(1000*1000, NULL);

        /* Commit will fail with a timeout if the partition 2 message
         * got stuck (the bug this test guards against). */
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 10*1000));

        /* All done */
        rd_kafka_destroy(rk);

        on_response_received_cb = NULL;

        SUB_TEST_PASS();
}
2333
2334
2335
2336 /**
2337 * @brief Test handling of OffsetFetchRequest returning UNSTABLE_OFFSET_COMMIT.
2338 *
2339 * There are two things to test;
2340 * - OffsetFetch triggered by committed() (and similar code paths)
2341 * - OffsetFetch triggered by assign()
2342 */
static void do_test_unstable_offset_commit (void) {
        rd_kafka_t *rk, *c;
        rd_kafka_conf_t *c_conf;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        const char *topic = "mytopic";
        const int msgcnt = 100;
        const int64_t offset_to_commit = msgcnt / 2;
        int i;
        int remains = 0;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        /* Set up a consumer against the same mock cluster. */
        test_conf_init(&c_conf, NULL, 0);
        test_conf_set(c_conf, "security.protocol", "PLAINTEXT");
        test_conf_set(c_conf, "bootstrap.servers",
                      rd_kafka_mock_cluster_bootstraps(mcluster));
        test_conf_set(c_conf, "enable.partition.eof", "true");
        /* Fail loudly instead of silently resetting if the committed
         * offset cannot be fetched. */
        test_conf_set(c_conf, "auto.offset.reset", "error");
        c = test_create_consumer("mygroup", NULL, c_conf, NULL);

        rd_kafka_mock_topic_create(mcluster, topic, 2, 3);

        /* Produce some messages to the topic so that the consumer has
         * something to read. */
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
        test_produce_msgs2_nowait(rk, topic, 0, 0, 0, msgcnt,
                                  NULL, 0, &remains);
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));


        /* Commit offset */
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
                offset_to_commit;
        TEST_CALL_ERR__(rd_kafka_commit(c, offsets, 0/*sync*/));
        rd_kafka_topic_partition_list_destroy(offsets);

        /* Retrieve offsets by calling committed().
         *
         * Have OffsetFetch fail and retry, on the first iteration
         * the API timeout is higher than the amount of time the retries will
         * take and thus succeed, and on the second iteration the timeout
         * will be lower and thus fail. */
        for (i = 0 ; i < 2 ; i++) {
                rd_kafka_resp_err_t err;
                rd_kafka_resp_err_t exp_err = i == 0 ?
                        RD_KAFKA_RESP_ERR_NO_ERROR :
                        RD_KAFKA_RESP_ERR__TIMED_OUT;
                /* 200ms is too short for the 6 pushed errors to be
                 * retried through; 5s is ample. */
                int timeout_ms = exp_err ? 200 : 5*1000;

                rd_kafka_mock_push_request_errors(
                        mcluster,
                        RD_KAFKAP_OffsetFetch,
                        1+5,/* first request + some retries */
                        RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                        RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                        RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                        RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                        RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                        RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT);

                offsets = rd_kafka_topic_partition_list_new(1);
                rd_kafka_topic_partition_list_add(offsets, topic, 0);

                err = rd_kafka_committed(c, offsets, timeout_ms);

                TEST_SAY("#%d: committed() returned %s (expected %s)\n",
                         i,
                         rd_kafka_err2name(err),
                         rd_kafka_err2name(exp_err));

                TEST_ASSERT(err == exp_err,
                            "#%d: Expected committed() to return %s, not %s",
                            i,
                            rd_kafka_err2name(exp_err),
                            rd_kafka_err2name(err));
                TEST_ASSERT(offsets->cnt == 1,
                            "Expected 1 committed offset, not %d",
                            offsets->cnt);
                if (!exp_err)
                        TEST_ASSERT(offsets->elems[0].offset == offset_to_commit,
                                    "Expected committed offset %"PRId64", "
                                    "not %"PRId64,
                                    offset_to_commit,
                                    offsets->elems[0].offset);
                else
                        TEST_ASSERT(offsets->elems[0].offset < 0,
                                    "Expected no committed offset, "
                                    "not %"PRId64,
                                    offsets->elems[0].offset);

                rd_kafka_topic_partition_list_destroy(offsets);
        }

        /* Second code path: OffsetFetch triggered by an (incremental)
         * assignment with stored offsets. */
        TEST_SAY("Phase 2: OffsetFetch lookup through assignment\n");
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
                RD_KAFKA_OFFSET_STORED;

        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_OffsetFetch,
                1+5,/* first request + some retries */
                RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
                RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT);

        test_consumer_incremental_assign("assign", c, offsets);
        rd_kafka_topic_partition_list_destroy(offsets);

        /* Consumption should resume from the committed offset:
         * the remaining msgcnt/2 messages followed by one EOF. */
        test_consumer_poll_exact("consume", c, 0,
                                 1/*eof*/, 0, msgcnt/2,
                                 rd_true/*exact counts*/, NULL);

        /* All done */
        rd_kafka_destroy(c);
        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
2470
2471
2472 /**
2473 * @brief If a message times out locally before being attempted to send
2474 * and commit_transaction() is called, the transaction must not succeed.
2475 * https://github.com/confluentinc/confluent-kafka-dotnet/issues/1568
2476 */
do_test_commit_after_msg_timeout(void)2477 static void do_test_commit_after_msg_timeout (void) {
2478 rd_kafka_t *rk;
2479 rd_kafka_mock_cluster_t *mcluster;
2480 int32_t coord_id, leader_id;
2481 rd_kafka_resp_err_t err;
2482 rd_kafka_error_t *error;
2483 const char *topic = "test";
2484 const char *transactional_id = "txnid";
2485 int remains = 0;
2486
2487 SUB_TEST_QUICK();
2488
2489 /* Assign coordinator and leader to two different brokers */
2490 coord_id = 1;
2491 leader_id = 2;
2492
2493 rk = create_txn_producer(&mcluster, transactional_id, 3,
2494 "message.timeout.ms", "5000",
2495 "transaction.timeout.ms", "10000",
2496 NULL);
2497
2498 /* Broker down is not a test-failing error */
2499 allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
2500 test_curr->is_fatal_cb = error_is_fatal_cb;
2501 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
2502
2503 err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
2504 TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str (err));
2505
2506 rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
2507 coord_id);
2508 rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);
2509
2510 /* Start transactioning */
2511 TEST_SAY("Starting transaction\n");
2512 TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
2513
2514 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2515
2516 TEST_SAY("Bringing down %"PRId32"\n", leader_id);
2517 rd_kafka_mock_broker_set_down(mcluster, leader_id);
2518 rd_kafka_mock_broker_set_down(mcluster, coord_id);
2519
2520 test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);
2521
2522 error = rd_kafka_commit_transaction(rk, -1);
2523 TEST_ASSERT(error != NULL, "expected commit_transaciton() to fail");
2524 TEST_SAY("commit_transaction() failed (as expected): %s\n",
2525 rd_kafka_error_string(error));
2526 TEST_ASSERT(rd_kafka_error_txn_requires_abort (error),
2527 "Expected txn_requires_abort error");
2528 rd_kafka_error_destroy(error);
2529
2530 /* Bring the brokers up so the abort can complete */
2531 rd_kafka_mock_broker_set_up(mcluster, coord_id);
2532 rd_kafka_mock_broker_set_up(mcluster, leader_id);
2533
2534 TEST_SAY("Aborting transaction\n");
2535 TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
2536
2537 TEST_ASSERT(remains == 0,
2538 "%d message(s) were not flushed\n", remains);
2539
2540 TEST_SAY("Attempting second transaction, which should succeed\n");
2541 allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
2542 test_curr->is_fatal_cb = error_is_fatal_cb;
2543 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
2544
2545 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2546 test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);
2547
2548 TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
2549
2550 TEST_ASSERT(remains == 0,
2551 "%d message(s) were not produced\n", remains);
2552
2553 rd_kafka_destroy(rk);
2554
2555 test_curr->is_fatal_cb = NULL;
2556
2557 SUB_TEST_PASS();
2558 }
2559
/**
 * @brief Test entry point: runs all mock-cluster transaction sub-tests.
 */
int main_0105_transactions_mock (int argc, char **argv) {
        /* The mock cluster is plaintext-only. */
        if (test_needs_auth()) {
                TEST_SKIP("Mock cluster does not support SSL/SASL\n");
                return 0;
        }

        do_test_txn_recoverable_errors();

        do_test_txn_fatal_idempo_errors();

        do_test_txn_fenced_reinit();

        do_test_txn_req_cnt();

        do_test_txn_requires_abort_errors();

        do_test_txn_slow_reinit(rd_false);
        do_test_txn_slow_reinit(rd_true);

        /* Just do a subset of tests in quick mode */
        if (test_quick)
                return 0;

        do_test_txn_endtxn_errors();

        do_test_txn_endtxn_infinite();

        /* Skip tests for non-infinite commit/abort timeouts
         * until they're properly handled by the producer. */
        if (0)
                do_test_txn_endtxn_timeout();

        /* Bring down the coordinator */
        do_test_txn_broker_down_in_txn(rd_true);

        /* Bring down partition leader */
        do_test_txn_broker_down_in_txn(rd_false);

        do_test_txns_not_supported();

        do_test_txns_send_offsets_concurrent_is_retried();

        do_test_txn_coord_req_destroy();

        do_test_txn_coord_req_multi_find();

        do_test_txn_addparts_req_multi();

        do_test_txns_no_timeout_crash();

        do_test_txn_auth_failure(
                RD_KAFKAP_InitProducerId,
                RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);

        do_test_txn_auth_failure(
                RD_KAFKAP_FindCoordinator,
                RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);

        do_test_txn_flush_timeout();

        do_test_unstable_offset_commit();

        do_test_commit_after_msg_timeout();

        do_test_txn_switch_coordinator();

        return 0;
}
2628