1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2019, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 #include "rdkafka.h"
32 
33 #include "../src/rdkafka_proto.h"
34 #include "../src/rdstring.h"
35 #include "../src/rdunittest.h"
36 
37 #include <stdarg.h>
38 
39 
40 /**
41  * @name Producer transaction tests using the mock cluster
42  *
43  */
44 
45 
46 static int allowed_error;
47 
48 /**
49  * @brief Decide what error_cb's will cause the test to fail.
50  */
error_is_fatal_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,const char * reason)51 static int error_is_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
52                               const char *reason) {
53         if (err == allowed_error ||
54             /* If transport errors are allowed then it is likely
55              * that we'll also see ALL_BROKERS_DOWN. */
56             (allowed_error == RD_KAFKA_RESP_ERR__TRANSPORT &&
57              err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)) {
58                 TEST_SAY("Ignoring allowed error: %s: %s\n",
59                          rd_kafka_err2name(err), reason);
60                 return 0;
61         }
62         return 1;
63 }
64 
65 
66 static rd_kafka_resp_err_t (*on_response_received_cb) (rd_kafka_t *rk,
67                                                        int sockfd,
68                                                        const char *brokername,
69                                                        int32_t brokerid,
70                                                        int16_t ApiKey,
71                                                        int16_t ApiVersion,
72                                                        int32_t CorrId,
73                                                        size_t  size,
74                                                        int64_t rtt,
75                                                        rd_kafka_resp_err_t err,
76                                                        void *ic_opaque);
77 
78 /**
79  * @brief Simple on_response_received interceptor that simply calls the
80  *        sub-test's on_response_received_cb function, if set.
81  */
82 static rd_kafka_resp_err_t
on_response_received_trampoline(rd_kafka_t * rk,int sockfd,const char * brokername,int32_t brokerid,int16_t ApiKey,int16_t ApiVersion,int32_t CorrId,size_t size,int64_t rtt,rd_kafka_resp_err_t err,void * ic_opaque)83 on_response_received_trampoline (rd_kafka_t *rk,
84                                  int sockfd,
85                                  const char *brokername,
86                                  int32_t brokerid,
87                                  int16_t ApiKey,
88                                  int16_t ApiVersion,
89                                  int32_t CorrId,
90                                  size_t  size,
91                                  int64_t rtt,
92                                  rd_kafka_resp_err_t err,
93                                  void *ic_opaque) {
94         TEST_ASSERT(on_response_received_cb != NULL, "");
95         return on_response_received_cb(rk, sockfd, brokername, brokerid,
96                                        ApiKey, ApiVersion,
97                                        CorrId, size, rtt, err, ic_opaque);
98 }
99 
100 
101 /**
102  * @brief on_new interceptor to add an on_response_received interceptor.
103  */
on_new_producer(rd_kafka_t * rk,const rd_kafka_conf_t * conf,void * ic_opaque,char * errstr,size_t errstr_size)104 static rd_kafka_resp_err_t on_new_producer (rd_kafka_t *rk,
105                                             const rd_kafka_conf_t *conf,
106                                             void *ic_opaque,
107                                             char *errstr, size_t errstr_size) {
108         rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
109 
110         if (on_response_received_cb)
111                 err = rd_kafka_interceptor_add_on_response_received(
112                         rk, "on_response_received",
113                         on_response_received_trampoline, ic_opaque);
114 
115         return err;
116 }
117 
118 
119 /**
120  * @brief Create a transactional producer and a mock cluster.
121  *
122  * The var-arg list is a NULL-terminated list of
123  * (const char *key, const char *value) config properties.
124  *
125  * Special keys:
126  *   "on_response_received", "" - enable the on_response_received_cb
127  *                                interceptor,
128  *                                which must be assigned prior to
129  *                                calling create_tnx_producer().
130  */
create_txn_producer(rd_kafka_mock_cluster_t ** mclusterp,const char * transactional_id,int broker_cnt,...)131 static rd_kafka_t *create_txn_producer (rd_kafka_mock_cluster_t **mclusterp,
132                                         const char *transactional_id,
133                                         int broker_cnt, ...) {
134         rd_kafka_conf_t *conf;
135         rd_kafka_t *rk;
136         char numstr[8];
137         va_list ap;
138         const char *key;
139         rd_bool_t add_interceptors = rd_false;
140 
141         rd_snprintf(numstr, sizeof(numstr), "%d", broker_cnt);
142 
143         test_conf_init(&conf, NULL, 60);
144 
145         test_conf_set(conf, "transactional.id", transactional_id);
146         /* Speed up reconnects */
147         test_conf_set(conf, "reconnect.backoff.max.ms", "2000");
148         test_conf_set(conf, "test.mock.num.brokers", numstr);
149         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
150 
151         test_curr->ignore_dr_err = rd_false;
152 
153         va_start(ap, broker_cnt);
154         while ((key = va_arg(ap, const char *))) {
155                 if (!strcmp(key, "on_response_received")) {
156                         add_interceptors = rd_true;
157                         (void)va_arg(ap, const char *);
158                 } else {
159                         test_conf_set(conf, key, va_arg(ap, const char *));
160                 }
161         }
162         va_end(ap);
163 
164         /* Add an on_.. interceptors */
165         if (add_interceptors)
166                 rd_kafka_conf_interceptor_add_on_new(
167                         conf,
168                         "on_new_producer",
169                         on_new_producer, NULL);
170 
171         rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
172 
173         if (mclusterp) {
174                 *mclusterp = rd_kafka_handle_mock_cluster(rk);
175                 TEST_ASSERT(*mclusterp, "failed to create mock cluster");
176         }
177 
178         return rk;
179 }
180 
181 
182 /**
183  * @brief Test recoverable errors using mock broker error injections
184  *        and code coverage checks.
185  */
do_test_txn_recoverable_errors(void)186 static void do_test_txn_recoverable_errors (void) {
187         rd_kafka_t *rk;
188         rd_kafka_mock_cluster_t *mcluster;
189         rd_kafka_topic_partition_list_t *offsets;
190         rd_kafka_consumer_group_metadata_t *cgmetadata;
191         const char *groupid = "myGroupId";
192         const char *txnid = "myTxnId";
193 
194         SUB_TEST_QUICK();
195 
196         rk = create_txn_producer(&mcluster, txnid, 3,
197                                  "batch.num.messages", "1",
198                                  NULL);
199 
200         /* Make sure transaction and group coordinators are different.
201          * This verifies that AddOffsetsToTxnRequest isn't sent to the
202          * transaction coordinator but the group coordinator. */
203         rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
204         rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 2);
205 
206         /*
207          * Inject som InitProducerId errors that causes retries
208          */
209         rd_kafka_mock_push_request_errors(
210                 mcluster,
211                 RD_KAFKAP_InitProducerId,
212                 3,
213                 RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
214                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
215                 RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);
216 
217         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
218 
219         (void)RD_UT_COVERAGE_CHECK(0); /* idemp_request_pid_failed(retry) */
220         (void)RD_UT_COVERAGE_CHECK(1); /* txn_idemp_state_change(READY) */
221 
222         /*
223          * Start a transaction
224          */
225         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
226 
227 
228         /* Produce a message without error first */
229         TEST_CALL_ERR__(rd_kafka_producev(rk,
230                                           RD_KAFKA_V_TOPIC("mytopic"),
231                                           RD_KAFKA_V_PARTITION(0),
232                                           RD_KAFKA_V_VALUE("hi", 2),
233                                           RD_KAFKA_V_END));
234 
235         /*
236          * Produce a message, let it fail with a non-idempo/non-txn
237          * retryable error
238          */
239         rd_kafka_mock_push_request_errors(
240                 mcluster,
241                 RD_KAFKAP_Produce,
242                 1,
243                 RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS);
244 
245         TEST_CALL_ERR__(rd_kafka_producev(rk,
246                                           RD_KAFKA_V_TOPIC("mytopic"),
247                                           RD_KAFKA_V_PARTITION(0),
248                                           RD_KAFKA_V_VALUE("hi", 2),
249                                           RD_KAFKA_V_END));
250 
251         /* Make sure messages are produced */
252         rd_kafka_flush(rk, -1);
253 
254         /*
255          * Send some arbitrary offsets, first with some failures, then
256          * succeed.
257          */
258         offsets = rd_kafka_topic_partition_list_new(4);
259         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
260         rd_kafka_topic_partition_list_add(offsets, "srctop2", 99)->offset =
261                 999999111;
262         rd_kafka_topic_partition_list_add(offsets, "srctopic", 0)->offset = 999;
263         rd_kafka_topic_partition_list_add(offsets, "srctop2", 3499)->offset =
264                 123456789;
265 
266         rd_kafka_mock_push_request_errors(
267                 mcluster,
268                 RD_KAFKAP_AddPartitionsToTxn,
269                 1,
270                 RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);
271 
272         rd_kafka_mock_push_request_errors(
273                 mcluster,
274                 RD_KAFKAP_TxnOffsetCommit,
275                 2,
276                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
277                 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
278 
279         cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
280 
281         TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
282                                   rk, offsets,
283                                   cgmetadata, -1));
284 
285         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
286         rd_kafka_topic_partition_list_destroy(offsets);
287 
288         /*
289          * Commit transaction, first with som failures, then succeed.
290          */
291         rd_kafka_mock_push_request_errors(
292                 mcluster,
293                 RD_KAFKAP_EndTxn,
294                 3,
295                 RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
296                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
297                 RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);
298 
299         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
300 
301         /* All done */
302 
303         rd_kafka_destroy(rk);
304 
305         SUB_TEST_PASS();
306 }
307 
308 
/**
 * @brief KIP-360: Test that fatal idempotence errors trigger abortable
 *        transaction errors and that the producer can recover.
 *
 * An UNKNOWN_PRODUCER_ID error is injected for a Produce request in the
 * first transaction. commit_transaction() is expected to fail with a
 * non-fatal, abortable error. The transaction is then aborted, and a
 * second, error-free transaction must commit successfully.
 */
static void do_test_txn_fatal_idempo_errors (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        const char *txnid = "myTxnId";

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3,
                                 "batch.num.messages", "1",
                                 NULL);

        /* Messages in the failed (aborted) transaction may get delivery
         * errors: ignore them. UNKNOWN_PRODUCER_ID may also be reported
         * through error_cb; error_is_fatal_cb() tolerates it. */
        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb = error_is_fatal_cb;
        allowed_error = RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        /* Produce a message, let it fail with a fatal idempo error. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_Produce,
                1,
                RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        /* Commit the transaction, should fail */
        error = rd_kafka_commit_transaction(rk, -1);
        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));

        /* KIP-360: the idempotence error must be abortable, not fatal. */
        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected abortable error");
        rd_kafka_error_destroy(error);

        /* Abort the transaction */
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /* Run a new transaction without errors to verify that the
         * producer can recover. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        /* All done */

        rd_kafka_destroy(rk);

        /* Stop tolerating the injected error for subsequent sub-tests. */
        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
393 
394 
/**
 * @brief KIP-360: Test that fatal idempotence errors trigger abortable
 *        transaction errors, but let the broker-side bumping of the
 *        producer PID take longer than the remaining transaction timeout,
 *        which should raise a retriable error from abort_transaction().
 *
 * The transaction coordinator's next InitProducerId response is delayed
 * by 10 seconds. The first abort_transaction() call is given only 100 ms,
 * so it must fail with a non-fatal, retriable error. A retried
 * abort_transaction(-1) must then succeed, and a new transaction must
 * commit.
 *
 * @param with_sleep After the first abort, sleep (12 s) longer than the
 *                   delayed pid re-init takes, so that the internal state
 *                   transitions automatically before the abort is retried.
 */
static void do_test_txn_slow_reinit (rd_bool_t with_sleep) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t txn_coord = 2;
        const char *txnid = "myTxnId";
        test_timing_t timing;

        SUB_TEST("%s sleep", with_sleep ? "with": "without");

        rk = create_txn_producer(&mcluster, txnid, 3,
                                 "batch.num.messages", "1",
                                 NULL);

        /* Pin the transaction coordinator so that the delayed
         * InitProducerId response below can be injected on that
         * specific broker. */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        /* NOTE(review): unlike the other KIP-360 sub-tests no is_fatal_cb
         * is installed here; presumably the framework's default error_cb
         * handling then applies — confirm in test.c. */
        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb = NULL;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Set transaction coordinator latency higher than
         * the abort_transaction() call timeout so that the automatic
         * re-initpid takes longer than abort_transaction(). */
        rd_kafka_mock_broker_push_request_error_rtts(
                mcluster,
                txn_coord,
                RD_KAFKAP_InitProducerId,
                1,
                RD_KAFKA_RESP_ERR_NO_ERROR, 10000/*10s*/);

        /* Produce a message, let it fail with a fatal idempo error. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_Produce,
                1,
                RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));


        /* Commit the transaction, should fail */
        TIMING_START(&timing, "commit_transaction(-1)");
        error = rd_kafka_commit_transaction(rk, -1);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));

        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected abortable error");
        rd_kafka_error_destroy(error);

        /* Abort the transaction, should fail with retriable (timeout) error
         * since the 100 ms timeout is shorter than the 10 s
         * InitProducerId delay injected above. */
        TIMING_START(&timing, "abort_transaction(100)");
        error = rd_kafka_abort_transaction(rk, 100);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");

        TEST_SAY("First abort_transaction() failed: %s\n",
                 rd_kafka_error_string(error));
        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_is_retriable(error),
                    "Expected retriable error");
        rd_kafka_error_destroy(error);

        /* Optionally outwait the 10 s InitProducerId delay. */
        if (with_sleep)
                rd_sleep(12);

        /* Retry abort, should now finish. */
        TEST_SAY("Retrying abort\n");
        TIMING_START(&timing, "abort_transaction(-1)");
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
        TIMING_STOP(&timing);

        /* Run a new transaction without errors to verify that the
         * producer can recover. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        /* All done */

        rd_kafka_destroy(rk);

        /* NOTE(review): allowed_error is not changed by this sub-test;
         * this reset is defensive only. */
        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
524 
525 
526 
/**
 * @brief KIP-360: Test that fatal idempotence errors trigger abortable
 *        transaction errors, but let the broker-side bumping of the
 *        producer PID fail with a fencing error.
 *        Should raise a fatal error.
 *
 * The transaction coordinator's next InitProducerId response is made to
 * fail with INVALID_PRODUCER_EPOCH. abort_transaction() is then expected
 * to return a fatal error, and rd_kafka_fatal_error() must report one.
 */
static void do_test_txn_fenced_reinit (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t txn_coord = 2;
        const char *txnid = "myTxnId";
        char errstr[512];
        rd_kafka_resp_err_t fatal_err;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3,
                                 "batch.num.messages", "1",
                                 NULL);

        /* Pin the transaction coordinator so the InitProducerId error
         * below can be injected on that specific broker. */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        /* Ignore delivery errors for the failed transaction, and let
         * error_is_fatal_cb() tolerate the expected __FENCED error
         * (presumably what INVALID_PRODUCER_EPOCH is normalized to). */
        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb = error_is_fatal_cb;
        allowed_error = RD_KAFKA_RESP_ERR__FENCED;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Fail the PID reinit */
        rd_kafka_mock_broker_push_request_error_rtts(
                mcluster,
                txn_coord,
                RD_KAFKAP_InitProducerId,
                1,
                RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH, 0);

        /* Produce a message, let it fail with a fatal idempo error. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_Produce,
                1,
                RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Abort the transaction, should fail with a fatal error */
        error = rd_kafka_abort_transaction(rk, -1);
        TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");

        TEST_SAY("abort_transaction() failed: %s\n",
                 rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_is_fatal(error),
                    "Expected a fatal error");
        rd_kafka_error_destroy(error);

        /* The instance itself must also be in the fatal error state. */
        fatal_err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
        TEST_ASSERT(fatal_err,
                    "Expected a fatal error to have been raised");
        TEST_SAY("Fatal error: %s: %s\n",
                 rd_kafka_err2name(fatal_err), errstr);

        /* All done */

        rd_kafka_destroy(rk);

        /* Stop tolerating __FENCED for subsequent sub-tests. */
        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
619 
620 
621 /**
622  * @brief Test EndTxn errors.
623  */
do_test_txn_endtxn_errors(void)624 static void do_test_txn_endtxn_errors (void) {
625         rd_kafka_t *rk = NULL;
626         rd_kafka_mock_cluster_t *mcluster = NULL;
627         rd_kafka_resp_err_t err;
628         struct {
629                 size_t error_cnt;
630                 rd_kafka_resp_err_t errors[4];
631                 rd_kafka_resp_err_t exp_err;
632                 rd_bool_t exp_retriable;
633                 rd_bool_t exp_abortable;
634                 rd_bool_t exp_fatal;
635         } scenario[] = {
636                 /* This list of errors is from the EndTxnResponse handler in
637                  * AK clients/.../TransactionManager.java */
638                 { /* #0 */
639                         2,
640                         { RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
641                           RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE },
642                         /* Should auto-recover */
643                         RD_KAFKA_RESP_ERR_NO_ERROR,
644                 },
645                 { /* #1 */
646                         2,
647                         { RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
648                           RD_KAFKA_RESP_ERR_NOT_COORDINATOR },
649                         /* Should auto-recover */
650                         RD_KAFKA_RESP_ERR_NO_ERROR,
651                 },
652                 { /* #2 */
653                         1,
654                         { RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS },
655                         /* Should auto-recover */
656                         RD_KAFKA_RESP_ERR_NO_ERROR,
657                 },
658                 { /* #3 */
659                         3,
660                         { RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
661                           RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
662                           RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS },
663                         /* Should auto-recover */
664                         RD_KAFKA_RESP_ERR_NO_ERROR,
665                 },
666                 { /* #4 */
667                         1,
668                         { RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID },
669                         RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
670                         rd_false /* !retriable */,
671                         rd_true /* abortable */,
672                         rd_false /* !fatal */
673                 },
674                 { /* #5 */
675                         1,
676                         { RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING },
677                         RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
678                         rd_false /* !retriable */,
679                         rd_true /* abortable */,
680                         rd_false /* !fatal */
681                 },
682                 { /* #6 */
683                         1,
684                         { RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH },
685                         /* This error is normalized */
686                         RD_KAFKA_RESP_ERR__FENCED,
687                         rd_false /* !retriable */,
688                         rd_false /* !abortable */,
689                         rd_true /* fatal */
690                 },
691                 { /* #7 */
692                         1,
693                         { RD_KAFKA_RESP_ERR_PRODUCER_FENCED },
694                         /* This error is normalized */
695                         RD_KAFKA_RESP_ERR__FENCED,
696                         rd_false /* !retriable */,
697                         rd_false /* !abortable */,
698                         rd_true /* fatal */
699                 },
700                 { /* #8 */
701                         1,
702                         { RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED },
703                         RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
704                         rd_false /* !retriable */,
705                         rd_false /* !abortable */,
706                         rd_true /* fatal */
707                 },
708                 { /* #9 */
709                         1,
710                         { RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED },
711                         RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
712                         rd_false /* !retriable */,
713                         rd_true /* abortable */,
714                         rd_false /* !fatal */
715                 },
716                 { /* #10 */
717                         /* Any other error should raise a fatal error */
718                         1,
719                         { RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE },
720                         RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
721                         rd_false /* !retriable */,
722                         rd_true /* abortable */,
723                         rd_false /* !fatal */,
724                 },
725                 { 0 },
726         };
727         int i;
728 
729         SUB_TEST_QUICK();
730 
731         for (i = 0 ; scenario[i].error_cnt > 0 ; i++) {
732                 int j;
733                 /* For each scenario, test:
734                  *   commit_transaction()
735                  *   flush() + commit_transaction()
736                  *   abort_transaction()
737                  *   flush() + abort_transaction()
738                  */
739                 for (j = 0 ; j < (2+2) ; j++) {
740                         rd_bool_t commit = j < 2;
741                         rd_bool_t with_flush = j & 1;
742                         const char *commit_str =
743                                 commit ?
744                                 (with_flush ? "commit&flush" : "commit") :
745                                 (with_flush ? "abort&flush" : "abort");
746                         rd_kafka_topic_partition_list_t *offsets;
747                         rd_kafka_consumer_group_metadata_t *cgmetadata;
748                         rd_kafka_error_t *error;
749                         test_timing_t t_call;
750 
751                         TEST_SAY("Testing scenario #%d %s with %"PRIusz
752                                  " injected erorrs, expecting %s\n",
753                                  i, commit_str,
754                                  scenario[i].error_cnt,
755                                  rd_kafka_err2name(scenario[i].exp_err));
756 
757                         if (!rk) {
758                                 const char *txnid = "myTxnId";
759                                 rk = create_txn_producer(&mcluster, txnid,
760                                                          3, NULL);
761                                 TEST_CALL_ERROR__(rd_kafka_init_transactions(
762                                                           rk, 5000));
763                         }
764 
765                         /*
766                          * Start transaction
767                          */
768                         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
769 
770                         /* Transaction aborts will cause DR errors:
771                          * ignore them. */
772                         test_curr->ignore_dr_err = !commit;
773 
774                         /*
775                          * Produce a message.
776                          */
777                         err = rd_kafka_producev(rk,
778                                                 RD_KAFKA_V_TOPIC("mytopic"),
779                                                 RD_KAFKA_V_VALUE("hi", 2),
780                                                 RD_KAFKA_V_END);
781                         TEST_ASSERT(!err, "produce failed: %s",
782                                     rd_kafka_err2str(err));
783 
784                         if (with_flush)
785                                 test_flush(rk, -1);
786 
787                         /*
788                          * Send some arbitrary offsets.
789                          */
790                         offsets = rd_kafka_topic_partition_list_new(4);
791                         rd_kafka_topic_partition_list_add(offsets, "srctopic",
792                                                           3)->offset = 12;
793                         rd_kafka_topic_partition_list_add(offsets, "srctop2",
794                                                           99)->offset = 99999;
795 
796                         cgmetadata = rd_kafka_consumer_group_metadata_new(
797                                 "mygroupid");
798 
799                         TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
800                                                   rk, offsets,
801                                                   cgmetadata, -1));
802 
803                         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
804                         rd_kafka_topic_partition_list_destroy(offsets);
805 
806                         /*
807                          * Commit transaction, first with som failures,
808                          * then succeed.
809                          */
810                         rd_kafka_mock_push_request_errors_array(
811                                 mcluster,
812                                 RD_KAFKAP_EndTxn,
813                                 scenario[i].error_cnt,
814                                 scenario[i].errors);
815 
816                         TIMING_START(&t_call, "%s", commit_str);
817                         if (commit)
818                                 error = rd_kafka_commit_transaction(
819                                         rk, tmout_multip(5000));
820                         else
821                                 error = rd_kafka_abort_transaction(
822                                         rk, tmout_multip(5000));
823                         TIMING_STOP(&t_call);
824 
825                         if (error)
826                                 TEST_SAY("Scenario #%d %s failed: %s: %s "
827                                          "(retriable=%s, req_abort=%s, "
828                                          "fatal=%s)\n",
829                                          i, commit_str,
830                                          rd_kafka_error_name(error),
831                                          rd_kafka_error_string(error),
832                                          RD_STR_ToF(rd_kafka_error_is_retriable(error)),
833                                          RD_STR_ToF(rd_kafka_error_txn_requires_abort(error)),
834                                          RD_STR_ToF(rd_kafka_error_is_fatal(error)));
835                         else
836                                 TEST_SAY("Scenario #%d %s succeeded\n",
837                                          i, commit_str);
838 
839                         if (!scenario[i].exp_err) {
840                                 TEST_ASSERT(!error,
841                                             "Expected #%d %s to succeed, "
842                                             "got %s",
843                                             i, commit_str,
844                                             rd_kafka_error_string(error));
845                                 continue;
846                         }
847 
848 
849                         TEST_ASSERT(error != NULL,
850                                     "Expected #%d %s to fail",
851                                     i, commit_str);
852                         TEST_ASSERT(scenario[i].exp_err ==
853                                     rd_kafka_error_code(error),
854                                     "Scenario #%d: expected %s, not %s",
855                                     i,
856                                     rd_kafka_err2name(scenario[i].exp_err),
857                                     rd_kafka_error_name(error));
858                         TEST_ASSERT(scenario[i].exp_retriable ==
859                                     (rd_bool_t)
860                                     rd_kafka_error_is_retriable(error),
861                                     "Scenario #%d: retriable mismatch",
862                                     i);
863                         TEST_ASSERT(scenario[i].exp_abortable ==
864                                     (rd_bool_t)
865                                     rd_kafka_error_txn_requires_abort(error),
866                                     "Scenario #%d: abortable mismatch",
867                                     i);
868                         TEST_ASSERT(scenario[i].exp_fatal ==
869                                     (rd_bool_t)rd_kafka_error_is_fatal(error),
870                                     "Scenario #%d: fatal mismatch", i);
871 
872                         /* Handle errors according to the error flags */
873                         if (rd_kafka_error_is_fatal(error)) {
874                                 TEST_SAY("Fatal error, destroying producer\n");
875                                 rd_kafka_error_destroy(error);
876                                 rd_kafka_destroy(rk);
877                                 rk = NULL; /* Will be re-created on the next
878                                             * loop iteration. */
879 
880                         } else if (rd_kafka_error_txn_requires_abort(error)) {
881                                 rd_kafka_error_destroy(error);
882                                 TEST_SAY("Abortable error, "
883                                          "aborting transaction\n");
884                                 TEST_CALL_ERROR__(
885                                         rd_kafka_abort_transaction(rk, -1));
886 
887                         } else if (rd_kafka_error_is_retriable(error)) {
888                                 rd_kafka_error_destroy(error);
889                                 TEST_SAY("Retriable error, retrying %s once\n",
890                                          commit_str);
891                                 if (commit)
892                                         TEST_CALL_ERROR__(
893                                                 rd_kafka_commit_transaction(
894                                                         rk, 5000));
895                                 else
896                                         TEST_CALL_ERROR__(
897                                                 rd_kafka_abort_transaction(
898                                                         rk, 5000));
899                         } else {
900                                 TEST_FAIL("Scenario #%d %s: "
901                                           "Permanent error without enough "
902                                           "hints to proceed: %s\n",
903                                           i, commit_str,
904                                           rd_kafka_error_string(error));
905                         }
906                 }
907         }
908 
909         /* All done */
910         if (rk)
911                 rd_kafka_destroy(rk);
912 
913         SUB_TEST_PASS();
914 }
915 
916 
917 /**
918  * @brief Test that the commit/abort works properly with infinite timeout.
919  */
do_test_txn_endtxn_infinite(void)920 static void do_test_txn_endtxn_infinite (void) {
921         rd_kafka_t *rk;
922         rd_kafka_mock_cluster_t *mcluster = NULL;
923         const char *txnid = "myTxnId";
924         int i;
925 
926         SUB_TEST_QUICK();
927 
928         rk = create_txn_producer(&mcluster, txnid, 3, NULL);
929 
930         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
931 
932         for (i = 0 ; i < 2 ; i++) {
933                 rd_bool_t commit = i == 0;
934                 const char *commit_str = commit ? "commit" : "abort";
935                 rd_kafka_error_t *error;
936                 test_timing_t t_call;
937 
938                 /* Messages will fail on as the transaction fails,
939                  * ignore the DR error */
940                 test_curr->ignore_dr_err = rd_true;
941 
942                 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
943 
944                 TEST_CALL_ERR__(rd_kafka_producev(rk,
945                                                   RD_KAFKA_V_TOPIC("mytopic"),
946                                                   RD_KAFKA_V_VALUE("hi", 2),
947                                                   RD_KAFKA_V_END));
948 
949                 /*
950                  * Commit/abort transaction, first with som retriable failures,
951                  * then success.
952                  */
953                 rd_kafka_mock_push_request_errors(
954                         mcluster,
955                         RD_KAFKAP_EndTxn,
956                         10,
957                         RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
958                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
959                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
960                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
961                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
962                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
963                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
964                         RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
965                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
966                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
967 
968                 rd_sleep(1);
969 
970                 TIMING_START(&t_call, "%s_transaction()", commit_str);
971                 if (commit)
972                         error = rd_kafka_commit_transaction(rk, -1);
973                 else
974                         error = rd_kafka_abort_transaction(rk, -1);
975                 TIMING_STOP(&t_call);
976 
977                 TEST_SAY("%s returned %s\n",
978                          commit_str,
979                          error ? rd_kafka_error_string(error) : "success");
980 
981                 TEST_ASSERT(!error,
982                             "Expected %s to succeed, got %s",
983                             commit_str, rd_kafka_error_string(error));
984 
985         }
986 
987         /* All done */
988 
989         rd_kafka_destroy(rk);
990 
991         SUB_TEST_PASS();
992 }
993 
994 
995 
996 /**
997  * @brief Test that the commit/abort user timeout is honoured.
998  */
do_test_txn_endtxn_timeout(void)999 static void do_test_txn_endtxn_timeout (void) {
1000         rd_kafka_t *rk;
1001         rd_kafka_mock_cluster_t *mcluster = NULL;
1002         const char *txnid = "myTxnId";
1003         int i;
1004 
1005         SUB_TEST_QUICK();
1006 
1007         rk = create_txn_producer(&mcluster, txnid, 3, NULL);
1008 
1009         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1010 
1011         for (i = 0 ; i < 2 ; i++) {
1012                 rd_bool_t commit = i == 0;
1013                 const char *commit_str = commit ? "commit" : "abort";
1014                 rd_kafka_error_t *error;
1015                 test_timing_t t_call;
1016 
1017                 /* Messages will fail on as the transaction fails,
1018                  * ignore the DR error */
1019                 test_curr->ignore_dr_err = rd_true;
1020 
1021                 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1022 
1023                 TEST_CALL_ERR__(rd_kafka_producev(rk,
1024                                                   RD_KAFKA_V_TOPIC("mytopic"),
1025                                                   RD_KAFKA_V_VALUE("hi", 2),
1026                                                   RD_KAFKA_V_END));
1027 
1028                 /*
1029                  * Commit/abort transaction, first with som retriable failures
1030                  * whos retries exceed the user timeout.
1031                  */
1032                 rd_kafka_mock_push_request_errors(
1033                         mcluster,
1034                         RD_KAFKAP_EndTxn,
1035                         10,
1036                         RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
1037                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1038                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1039                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1040                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1041                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1042                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1043                         RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
1044                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1045                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
1046 
1047                 rd_sleep(1);
1048 
1049                 TIMING_START(&t_call, "%s_transaction()", commit_str);
1050                 if (commit)
1051                         error = rd_kafka_commit_transaction(rk, 100);
1052                 else
1053                         error = rd_kafka_abort_transaction(rk, 100);
1054                 TIMING_STOP(&t_call);
1055 
1056                 TEST_SAY("%s returned %s\n",
1057                          commit_str,
1058                          error ? rd_kafka_error_string(error) : "success");
1059 
1060                 TEST_ASSERT(error != NULL,
1061                             "Expected %s to fail", commit_str);
1062 
1063                 TEST_ASSERT(rd_kafka_error_code(error) ==
1064                             RD_KAFKA_RESP_ERR__TIMED_OUT,
1065                             "Expected %s to fail with timeout, not %s: %s",
1066                             commit_str,
1067                             rd_kafka_error_name(error),
1068                             rd_kafka_error_string(error));
1069 
1070                 if (!commit)
1071                         TEST_ASSERT(!rd_kafka_error_txn_requires_abort(error),
1072                                     "abort_transaction() failure should raise "
1073                                     "a txn_requires_abort error");
1074                 else {
1075                         TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
1076                                     "commit_transaction() failure should raise "
1077                                     "a txn_requires_abort error");
1078                         TEST_SAY("Aborting transaction as instructed by "
1079                                  "error flag\n");
1080                         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
1081                 }
1082 
1083                 rd_kafka_error_destroy(error);
1084 
1085                 TIMING_ASSERT(&t_call, 99, 199);
1086         }
1087 
1088         /* All done */
1089 
1090         rd_kafka_destroy(rk);
1091 
1092         SUB_TEST_PASS();
1093 }
1094 
1095 
1096 /**
1097  * @brief Test that EndTxn is properly sent for aborted transactions
1098  *        even if AddOffsetsToTxnRequest was retried.
1099  *        This is a check for a txn_req_cnt bug.
1100  */
do_test_txn_req_cnt(void)1101 static void do_test_txn_req_cnt (void) {
1102         rd_kafka_t *rk;
1103         rd_kafka_mock_cluster_t *mcluster;
1104         rd_kafka_topic_partition_list_t *offsets;
1105         rd_kafka_consumer_group_metadata_t *cgmetadata;
1106         const char *txnid = "myTxnId";
1107 
1108         SUB_TEST_QUICK();
1109 
1110         rk = create_txn_producer(&mcluster, txnid, 3, NULL);
1111 
1112         /* Messages will fail on abort(), ignore the DR error */
1113         test_curr->ignore_dr_err = rd_true;
1114 
1115         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1116 
1117         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1118 
1119         /*
1120          * Send some arbitrary offsets, first with some failures, then
1121          * succeed.
1122          */
1123         offsets = rd_kafka_topic_partition_list_new(2);
1124         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
1125         rd_kafka_topic_partition_list_add(offsets, "srctop2", 99)->offset =
1126                 999999111;
1127 
1128         rd_kafka_mock_push_request_errors(
1129                 mcluster,
1130                 RD_KAFKAP_AddOffsetsToTxn,
1131                 2,
1132                 RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
1133                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
1134 
1135         rd_kafka_mock_push_request_errors(
1136                 mcluster,
1137                 RD_KAFKAP_TxnOffsetCommit,
1138                 2,
1139                 RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
1140                 RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);
1141 
1142         cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
1143 
1144         TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
1145                                   rk, offsets,
1146                                   cgmetadata, -1));
1147 
1148         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1149         rd_kafka_topic_partition_list_destroy(offsets);
1150 
1151         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));
1152 
1153         /* All done */
1154 
1155         rd_kafka_destroy(rk);
1156 
1157         SUB_TEST_PASS();
1158 }
1159 
1160 
1161 /**
1162  * @brief Test abortable errors using mock broker error injections
1163  *        and code coverage checks.
1164  */
do_test_txn_requires_abort_errors(void)1165 static void do_test_txn_requires_abort_errors (void) {
1166         rd_kafka_t *rk;
1167         rd_kafka_mock_cluster_t *mcluster;
1168         rd_kafka_error_t *error;
1169         rd_kafka_resp_err_t err;
1170         rd_kafka_topic_partition_list_t *offsets;
1171         rd_kafka_consumer_group_metadata_t *cgmetadata;
1172         int r;
1173 
1174         SUB_TEST_QUICK();
1175 
1176         rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
1177 
1178         test_curr->ignore_dr_err = rd_true;
1179 
1180         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1181 
1182         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1183 
1184         /*
1185          * 1. Fail on produce
1186          */
1187         TEST_SAY("1. Fail on produce\n");
1188 
1189         rd_kafka_mock_push_request_errors(
1190                 mcluster,
1191                 RD_KAFKAP_Produce,
1192                 1,
1193                 RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
1194 
1195         err = rd_kafka_producev(rk,
1196                                 RD_KAFKA_V_TOPIC("mytopic"),
1197                                 RD_KAFKA_V_VALUE("hi", 2),
1198                                 RD_KAFKA_V_END);
1199         TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
1200 
1201         /* Wait for messages to fail */
1202         test_flush(rk, 5000);
1203 
1204         /* Any other transactional API should now raise an error */
1205         offsets = rd_kafka_topic_partition_list_new(1);
1206         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
1207 
1208         cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
1209 
1210         error = rd_kafka_send_offsets_to_transaction(rk, offsets,
1211                                                      cgmetadata, -1);
1212 
1213         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1214         rd_kafka_topic_partition_list_destroy(offsets);
1215         TEST_ASSERT(error, "expected error");
1216         TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
1217                     "expected abortable error, not %s",
1218                     rd_kafka_error_string(error));
1219         TEST_SAY("Error %s: %s\n",
1220                  rd_kafka_error_name(error),
1221                  rd_kafka_error_string(error));
1222         rd_kafka_error_destroy(error);
1223 
1224         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
1225 
1226         /*
1227          * 2. Restart transaction and fail on AddPartitionsToTxn
1228          */
1229         TEST_SAY("2. Fail on AddPartitionsToTxn\n");
1230 
1231         /* First refresh proper Metadata to clear the topic's auth error,
1232          * otherwise the produce() below will fail immediately. */
1233         r = test_get_partition_count(rk, "mytopic", 5000);
1234         TEST_ASSERT(r > 0, "Expected topic %s to exist", "mytopic");
1235 
1236         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1237 
1238         rd_kafka_mock_push_request_errors(
1239                 mcluster,
1240                 RD_KAFKAP_AddPartitionsToTxn,
1241                 1,
1242                 RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
1243 
1244         err = rd_kafka_producev(rk,
1245                                 RD_KAFKA_V_TOPIC("mytopic"),
1246                                 RD_KAFKA_V_VALUE("hi", 2),
1247                                 RD_KAFKA_V_END);
1248         TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
1249 
1250         error = rd_kafka_commit_transaction(rk, 5000);
1251         TEST_ASSERT(error, "commit_transaction should have failed");
1252         TEST_SAY("commit_transaction() error %s: %s\n",
1253                  rd_kafka_error_name(error),
1254                  rd_kafka_error_string(error));
1255         rd_kafka_error_destroy(error);
1256 
1257         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
1258 
1259         /*
1260         * 3. Restart transaction and fail on AddOffsetsToTxn
1261         */
1262         TEST_SAY("3. Fail on AddOffsetsToTxn\n");
1263 
1264         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1265 
1266         err = rd_kafka_producev(rk,
1267                                 RD_KAFKA_V_TOPIC("mytopic"),
1268                                 RD_KAFKA_V_VALUE("hi", 2),
1269                                 RD_KAFKA_V_END);
1270         TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
1271 
1272         rd_kafka_mock_push_request_errors(
1273                 mcluster,
1274                 RD_KAFKAP_AddOffsetsToTxn,
1275                 1,
1276                 RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED);
1277 
1278         offsets = rd_kafka_topic_partition_list_new(1);
1279         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
1280         cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
1281 
1282         error = rd_kafka_send_offsets_to_transaction(rk, offsets,
1283                                                      cgmetadata, -1);
1284         TEST_ASSERT(error, "Expected send_offsets..() to fail");
1285         TEST_ASSERT(rd_kafka_error_code(error) ==
1286                     RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
1287                     "expected send_offsets_to_transaction() to fail with "
1288                     "group auth error: not %s",
1289                     rd_kafka_error_name(error));
1290         rd_kafka_error_destroy(error);
1291 
1292         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1293         rd_kafka_topic_partition_list_destroy(offsets);
1294 
1295 
1296         error = rd_kafka_commit_transaction(rk, 5000);
1297         TEST_ASSERT(error, "commit_transaction should have failed");
1298         rd_kafka_error_destroy(error);
1299 
1300         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
1301 
1302         /* All done */
1303 
1304         rd_kafka_destroy(rk);
1305 
1306         SUB_TEST_PASS();
1307 }
1308 
1309 
1310 /**
1311  * @brief Test error handling and recover for when broker goes down during
1312  *        an ongoing transaction.
1313  */
do_test_txn_broker_down_in_txn(rd_bool_t down_coord)1314 static void do_test_txn_broker_down_in_txn (rd_bool_t down_coord) {
1315         rd_kafka_t *rk;
1316         rd_kafka_mock_cluster_t *mcluster;
1317         int32_t coord_id, leader_id, down_id;
1318         const char *down_what;
1319         rd_kafka_resp_err_t err;
1320         const char *topic = "test";
1321         const char *transactional_id = "txnid";
1322         int msgcnt = 1000;
1323         int remains = 0;
1324 
1325         /* Assign coordinator and leader to two different brokers */
1326         coord_id = 1;
1327         leader_id = 2;
1328         if (down_coord) {
1329                 down_id = coord_id;
1330                 down_what = "coordinator";
1331         } else {
1332                 down_id = leader_id;
1333                 down_what = "leader";
1334         }
1335 
1336         SUB_TEST_QUICK("Test %s down", down_what);
1337 
1338         rk = create_txn_producer(&mcluster, transactional_id, 3, NULL);
1339 
1340         /* Broker down is not a test-failing error */
1341         allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
1342         test_curr->is_fatal_cb = error_is_fatal_cb;
1343 
1344         err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
1345         TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
1346 
1347         rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
1348                                       coord_id);
1349         rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);
1350 
1351         /* Start transactioning */
1352         TEST_SAY("Starting transaction\n");
1353         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1354 
1355         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1356 
1357         test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
1358                                   0, msgcnt / 2, NULL, 0, &remains);
1359 
1360         TEST_SAY("Bringing down %s %"PRId32"\n", down_what, down_id);
1361         rd_kafka_mock_broker_set_down(mcluster, down_id);
1362 
1363         rd_kafka_flush(rk, 3000);
1364 
1365         /* Produce remaining messages */
1366         test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
1367                                   msgcnt / 2, msgcnt / 2, NULL, 0, &remains);
1368 
1369         rd_sleep(2);
1370 
1371         TEST_SAY("Bringing up %s %"PRId32"\n", down_what, down_id);
1372         rd_kafka_mock_broker_set_up(mcluster, down_id);
1373 
1374         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
1375 
1376         TEST_ASSERT(remains == 0,
1377                     "%d message(s) were not produced\n", remains);
1378 
1379         rd_kafka_destroy(rk);
1380 
1381         test_curr->is_fatal_cb = NULL;
1382 
1383         SUB_TEST_PASS();
1384 
1385 }
1386 
1387 
1388 
1389 /**
1390  * @brief Advance the coord_id to the next broker.
1391  */
set_next_coord(rd_kafka_mock_cluster_t * mcluster,const char * transactional_id,int broker_cnt,int32_t * coord_idp)1392 static void set_next_coord (rd_kafka_mock_cluster_t *mcluster,
1393                             const char *transactional_id, int broker_cnt,
1394                             int32_t *coord_idp) {
1395         int32_t new_coord_id;
1396 
1397         new_coord_id = 1 + ((*coord_idp) % (broker_cnt));
1398         TEST_SAY("Changing transaction coordinator from %"PRId32
1399                  " to %"PRId32"\n", *coord_idp, new_coord_id);
1400         rd_kafka_mock_coordinator_set(mcluster, "transaction",
1401                                       transactional_id, new_coord_id);
1402 
1403         *coord_idp = new_coord_id;
1404 }
1405 
1406 /**
1407  * @brief Switch coordinator during a transaction.
1408  *
1409  */
do_test_txn_switch_coordinator(void)1410 static void do_test_txn_switch_coordinator (void) {
1411         rd_kafka_t *rk;
1412         rd_kafka_mock_cluster_t *mcluster;
1413         int32_t coord_id;
1414         const char *topic = "test";
1415         const char *transactional_id = "txnid";
1416         const int broker_cnt = 5;
1417         const int iterations = 20;
1418         int i;
1419 
1420         test_timeout_set(iterations * 10);
1421 
1422         SUB_TEST("Test switching coordinators");
1423 
1424         rk = create_txn_producer(&mcluster, transactional_id, broker_cnt, NULL);
1425 
1426         coord_id = 1;
1427         rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
1428                                       coord_id);
1429 
1430         /* Start transactioning */
1431         TEST_SAY("Starting transaction\n");
1432         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1433 
1434         for (i = 0 ; i < iterations ; i++) {
1435                 const int msgcnt = 100;
1436                 int remains = 0;
1437 
1438                 set_next_coord(mcluster, transactional_id,
1439                                broker_cnt, &coord_id);
1440 
1441                 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1442 
1443                 test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA,
1444                                    0, msgcnt / 2, NULL, 0);
1445 
1446                 if (!(i % 3))
1447                         set_next_coord(mcluster, transactional_id,
1448                                        broker_cnt, &coord_id);
1449 
1450                 /* Produce remaining messages */
1451                 test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
1452                                           msgcnt / 2, msgcnt / 2, NULL, 0,
1453                                           &remains);
1454 
1455                 if ((i & 1) || !(i % 8))
1456                         set_next_coord(mcluster, transactional_id,
1457                                        broker_cnt, &coord_id);
1458 
1459 
1460                 if (!(i % 5)) {
1461                         test_curr->ignore_dr_err = rd_false;
1462                         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
1463 
1464                 } else {
1465                         test_curr->ignore_dr_err = rd_true;
1466                         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
1467                 }
1468         }
1469 
1470 
1471         rd_kafka_destroy(rk);
1472 
1473         SUB_TEST_PASS();
1474 }
1475 
1476 
1477 /**
1478  * @brief Switch coordinator during a transaction when AddOffsetsToTxn
1479  *        are sent. #3571.
1480  */
do_test_txn_switch_coordinator_refresh(void)1481 static void do_test_txn_switch_coordinator_refresh (void) {
1482         rd_kafka_t *rk;
1483         rd_kafka_mock_cluster_t *mcluster;
1484         const char *topic = "test";
1485         const char *transactional_id = "txnid";
1486         rd_kafka_topic_partition_list_t *offsets;
1487         rd_kafka_consumer_group_metadata_t *cgmetadata;
1488 
1489         SUB_TEST("Test switching coordinators (refresh)");
1490 
1491         rk = create_txn_producer(&mcluster, transactional_id, 3, NULL);
1492 
1493         rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
1494                                       1);
1495 
1496         /* Start transactioning */
1497         TEST_SAY("Starting transaction\n");
1498         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1499 
1500         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1501 
1502         /* Switch the coordinator so that AddOffsetsToTxnRequest
1503          * will respond with NOT_COORDINATOR. */
1504         TEST_SAY("Switching to coordinator 2\n");
1505         rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
1506                                       2);
1507 
1508         /*
1509          * Send some arbitrary offsets.
1510          */
1511         offsets = rd_kafka_topic_partition_list_new(4);
1512         rd_kafka_topic_partition_list_add(offsets, "srctopic",
1513                                           3)->offset = 12;
1514         rd_kafka_topic_partition_list_add(offsets, "srctop2",
1515                                           99)->offset = 99999;
1516 
1517         cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
1518 
1519         TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
1520                                   rk, offsets,
1521                                   cgmetadata, 20*1000));
1522 
1523         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1524         rd_kafka_topic_partition_list_destroy(offsets);
1525 
1526 
1527         /* Produce some messages */
1528         test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0, 10, NULL, 0);
1529 
1530         /* And commit the transaction */
1531         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
1532 
1533         rd_kafka_destroy(rk);
1534 
1535         SUB_TEST_PASS();
1536 }
1537 
1538 
1539 /**
1540  * @brief Test fatal error handling when transactions are not supported
1541  *        by the broker.
1542  */
do_test_txns_not_supported(void)1543 static void do_test_txns_not_supported (void) {
1544         rd_kafka_t *rk;
1545         rd_kafka_conf_t *conf;
1546         rd_kafka_mock_cluster_t *mcluster;
1547         rd_kafka_error_t *error;
1548         rd_kafka_resp_err_t err;
1549 
1550         SUB_TEST_QUICK();
1551 
1552         test_conf_init(&conf, NULL, 10);
1553 
1554         test_conf_set(conf, "transactional.id", "myxnid");
1555         test_conf_set(conf, "bootstrap.servers", ",");
1556         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
1557 
1558         rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
1559 
1560         /* Create mock cluster */
1561         mcluster = rd_kafka_mock_cluster_new(rk, 3);
1562 
1563         /* Disable InitProducerId */
1564         rd_kafka_mock_set_apiversion(mcluster, 22/*InitProducerId*/, -1, -1);
1565 
1566 
1567         rd_kafka_brokers_add(rk, rd_kafka_mock_cluster_bootstraps(mcluster));
1568 
1569 
1570 
1571         error = rd_kafka_init_transactions(rk, 5*1000);
1572         TEST_SAY("init_transactions() returned %s: %s\n",
1573                  error ? rd_kafka_error_name(error) : "success",
1574                  error ? rd_kafka_error_string(error) : "success");
1575 
1576         TEST_ASSERT(error, "Expected init_transactions() to fail");
1577         TEST_ASSERT(rd_kafka_error_code(error) ==
1578                     RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
1579                     "Expected init_transactions() to fail with %s, not %s: %s",
1580                     rd_kafka_err2name(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE),
1581                     rd_kafka_error_name(error),
1582                     rd_kafka_error_string(error));
1583         rd_kafka_error_destroy(error);
1584 
1585         err = rd_kafka_producev(rk,
1586                                 RD_KAFKA_V_TOPIC("test"),
1587                                 RD_KAFKA_V_KEY("test", 4),
1588                                 RD_KAFKA_V_END);
1589         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__FATAL,
1590                     "Expected producev() to fail with %s, not %s",
1591                     rd_kafka_err2name(RD_KAFKA_RESP_ERR__FATAL),
1592                     rd_kafka_err2name(err));
1593 
1594         rd_kafka_mock_cluster_destroy(mcluster);
1595 
1596         rd_kafka_destroy(rk);
1597 
1598         SUB_TEST_PASS();
1599 }
1600 
1601 
1602 /**
1603  * @brief CONCURRENT_TRANSACTION on AddOffsets.. should be retried.
1604  */
do_test_txns_send_offsets_concurrent_is_retried(void)1605 static void do_test_txns_send_offsets_concurrent_is_retried (void) {
1606         rd_kafka_t *rk;
1607         rd_kafka_mock_cluster_t *mcluster;
1608         rd_kafka_resp_err_t err;
1609         rd_kafka_topic_partition_list_t *offsets;
1610         rd_kafka_consumer_group_metadata_t *cgmetadata;
1611 
1612         SUB_TEST_QUICK();
1613 
1614         rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
1615 
1616         test_curr->ignore_dr_err = rd_true;
1617 
1618         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1619 
1620         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1621 
1622         err = rd_kafka_producev(rk,
1623                                 RD_KAFKA_V_TOPIC("mytopic"),
1624                                 RD_KAFKA_V_VALUE("hi", 2),
1625                                 RD_KAFKA_V_END);
1626         TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
1627 
1628         /* Wait for messages to be delivered */
1629         test_flush(rk, 5000);
1630 
1631 
1632         /*
1633          * Have AddOffsetsToTxn fail but eventually succeed due to
1634          * infinite retries.
1635          */
1636         rd_kafka_mock_push_request_errors(
1637                 mcluster,
1638                 RD_KAFKAP_AddOffsetsToTxn,
1639                 1+5,/* first request + some retries */
1640                 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
1641                 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
1642                 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
1643                 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
1644                 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
1645                 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
1646 
1647         offsets = rd_kafka_topic_partition_list_new(1);
1648         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
1649 
1650         cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
1651 
1652         TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(rk, offsets,
1653                                                                cgmetadata, -1));
1654 
1655         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1656         rd_kafka_topic_partition_list_destroy(offsets);
1657 
1658         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
1659 
1660         /* All done */
1661 
1662         rd_kafka_destroy(rk);
1663 
1664         SUB_TEST_PASS();
1665 }
1666 
1667 
1668 /**
1669  * @brief Verify that request timeouts don't cause crash (#2913).
1670  */
do_test_txns_no_timeout_crash(void)1671 static void do_test_txns_no_timeout_crash (void) {
1672         rd_kafka_t *rk;
1673         rd_kafka_mock_cluster_t *mcluster;
1674         rd_kafka_error_t *error;
1675         rd_kafka_resp_err_t err;
1676         rd_kafka_topic_partition_list_t *offsets;
1677         rd_kafka_consumer_group_metadata_t *cgmetadata;
1678 
1679         SUB_TEST_QUICK();
1680 
1681         rk = create_txn_producer(&mcluster, "txnid", 3,
1682                                  "socket.timeout.ms", "1000",
1683                                  "transaction.timeout.ms", "5000",
1684                                  NULL);
1685 
1686         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1687 
1688         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1689 
1690         err = rd_kafka_producev(rk,
1691                                 RD_KAFKA_V_TOPIC("mytopic"),
1692                                 RD_KAFKA_V_VALUE("hi", 2),
1693                                 RD_KAFKA_V_END);
1694         TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
1695 
1696         test_flush(rk, -1);
1697 
1698         /* Delay all broker connections */
1699         if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 2000)) ||
1700             (err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 2000)) ||
1701             (err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 2000)))
1702                 TEST_FAIL("Failed to set broker RTT: %s",
1703                           rd_kafka_err2str(err));
1704 
1705         /* send_offsets..() should now time out */
1706         offsets = rd_kafka_topic_partition_list_new(1);
1707         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
1708         cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
1709 
1710         error = rd_kafka_send_offsets_to_transaction(rk, offsets,
1711                                                      cgmetadata, -1);
1712         TEST_ASSERT(error, "Expected send_offsets..() to fail");
1713         TEST_SAY("send_offsets..() failed with %serror: %s\n",
1714                  rd_kafka_error_is_retriable(error) ? "retriable " : "",
1715                  rd_kafka_error_string(error));
1716         TEST_ASSERT(rd_kafka_error_code(error) ==
1717                     RD_KAFKA_RESP_ERR__TIMED_OUT,
1718                     "expected send_offsets_to_transaction() to fail with "
1719                     "timeout, not %s",
1720                     rd_kafka_error_name(error));
1721         TEST_ASSERT(rd_kafka_error_is_retriable(error),
1722                     "expected send_offsets_to_transaction() to fail with "
1723                     "a retriable error");
1724         rd_kafka_error_destroy(error);
1725 
1726         /* Reset delay and try again */
1727         if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 0)) ||
1728             (err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 0)) ||
1729             (err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 0)))
1730                 TEST_FAIL("Failed to reset broker RTT: %s",
1731                           rd_kafka_err2str(err));
1732 
1733         TEST_SAY("Retrying send_offsets..()\n");
1734         error = rd_kafka_send_offsets_to_transaction(rk, offsets,
1735                                                      cgmetadata, -1);
1736         TEST_ASSERT(!error, "Expected send_offsets..() to succeed, got: %s",
1737                     rd_kafka_error_string(error));
1738 
1739         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1740         rd_kafka_topic_partition_list_destroy(offsets);
1741 
1742         /* All done */
1743         rd_kafka_destroy(rk);
1744 
1745         SUB_TEST_PASS();
1746 }
1747 
1748 
1749 /**
1750  * @brief Test auth failure handling.
1751  */
do_test_txn_auth_failure(int16_t ApiKey,rd_kafka_resp_err_t ErrorCode)1752 static void do_test_txn_auth_failure (int16_t ApiKey,
1753                                       rd_kafka_resp_err_t ErrorCode) {
1754         rd_kafka_t *rk;
1755         rd_kafka_mock_cluster_t *mcluster;
1756         rd_kafka_error_t *error;
1757 
1758         SUB_TEST_QUICK("ApiKey=%s ErrorCode=%s",
1759                        rd_kafka_ApiKey2str(ApiKey),
1760                        rd_kafka_err2name(ErrorCode));
1761 
1762         rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
1763 
1764         rd_kafka_mock_push_request_errors(mcluster,
1765                                           ApiKey,
1766                                           1,
1767                                           ErrorCode);
1768 
1769         error = rd_kafka_init_transactions(rk, 5000);
1770         TEST_ASSERT(error, "Expected init_transactions() to fail");
1771 
1772         TEST_SAY("init_transactions() failed: %s: %s\n",
1773                  rd_kafka_err2name(rd_kafka_error_code(error)),
1774                  rd_kafka_error_string(error));
1775         TEST_ASSERT(rd_kafka_error_code(error) == ErrorCode,
1776                     "Expected error %s, not %s",
1777                     rd_kafka_err2name(ErrorCode),
1778                     rd_kafka_err2name(rd_kafka_error_code(error)));
1779         TEST_ASSERT(rd_kafka_error_is_fatal(error),
1780                     "Expected error to be fatal");
1781         TEST_ASSERT(!rd_kafka_error_is_retriable(error),
1782                     "Expected error to not be retriable");
1783         rd_kafka_error_destroy(error);
1784 
1785         /* All done */
1786 
1787         rd_kafka_destroy(rk);
1788 
1789         SUB_TEST_PASS();
1790 }
1791 
1792 
1793 /**
1794  * @brief Issue #3041: Commit fails due to message flush() taking too long,
1795  *        eventually resulting in an unabortable error and failure to
1796  *        re-init the transactional producer.
1797  */
do_test_txn_flush_timeout(void)1798 static void do_test_txn_flush_timeout (void) {
1799         rd_kafka_t *rk;
1800         rd_kafka_mock_cluster_t *mcluster;
1801         rd_kafka_topic_partition_list_t *offsets;
1802         rd_kafka_consumer_group_metadata_t *cgmetadata;
1803         rd_kafka_error_t *error;
1804         const char *txnid = "myTxnId";
1805         const char *topic = "myTopic";
1806         const int32_t coord_id = 2;
1807         int msgcounter = 0;
1808         rd_bool_t is_retry = rd_false;
1809 
1810         SUB_TEST_QUICK();
1811 
1812         rk = create_txn_producer(&mcluster, txnid, 3,
1813                                  "message.timeout.ms", "10000",
1814                                  "transaction.timeout.ms", "10000",
1815                                  /* Speed up coordinator reconnect */
1816                                  "reconnect.backoff.max.ms", "1000",
1817                                  NULL);
1818 
1819 
1820         /* Broker down is not a test-failing error */
1821         test_curr->is_fatal_cb = error_is_fatal_cb;
1822         allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
1823 
1824         rd_kafka_mock_topic_create(mcluster, topic, 2, 3);
1825 
1826         /* Set coordinator so we can disconnect it later */
1827         rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, coord_id);
1828 
1829         /*
1830          * Init transactions
1831          */
1832         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1833 
1834  retry:
1835         if (!is_retry) {
1836                 /* First attempt should fail. */
1837 
1838                 test_curr->ignore_dr_err = rd_true;
1839                 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
1840 
1841                 /* Assign invalid partition leaders for some partitions so
1842                  * that messages will not be delivered. */
1843                 rd_kafka_mock_partition_set_leader(mcluster, topic, 0, -1);
1844                 rd_kafka_mock_partition_set_leader(mcluster, topic, 1, -1);
1845 
1846         } else {
1847                 /* The retry should succeed */
1848                 test_curr->ignore_dr_err = rd_false;
1849                 test_curr->exp_dr_err = is_retry ? RD_KAFKA_RESP_ERR_NO_ERROR :
1850                         RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
1851 
1852                 rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
1853                 rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
1854 
1855         }
1856 
1857 
1858         /*
1859          * Start a transaction
1860          */
1861         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1862 
1863         /*
1864          * Produce some messages to specific partitions and random.
1865          */
1866         test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 100, NULL, 10,
1867                                   &msgcounter);
1868         test_produce_msgs2_nowait(rk, topic, 1, 0, 0, 100, NULL, 10,
1869                                   &msgcounter);
1870         test_produce_msgs2_nowait(rk, topic, RD_KAFKA_PARTITION_UA,
1871                                   0, 0, 100, NULL, 10, &msgcounter);
1872 
1873 
1874         /*
1875          * Send some arbitrary offsets.
1876          */
1877         offsets = rd_kafka_topic_partition_list_new(4);
1878         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
1879         rd_kafka_topic_partition_list_add(offsets, "srctop2", 99)->offset =
1880                 999999111;
1881         rd_kafka_topic_partition_list_add(offsets, "srctopic", 0)->offset = 999;
1882         rd_kafka_topic_partition_list_add(offsets, "srctop2", 3499)->offset =
1883                 123456789;
1884 
1885         cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
1886 
1887         TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
1888                                   rk, offsets,
1889                                   cgmetadata, -1));
1890 
1891         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1892         rd_kafka_topic_partition_list_destroy(offsets);
1893 
1894         rd_sleep(2);
1895 
1896         if (!is_retry) {
1897                 /* Now disconnect the coordinator. */
1898                 TEST_SAY("Disconnecting transaction coordinator %"PRId32"\n",
1899                          coord_id);
1900                 rd_kafka_mock_broker_set_down(mcluster, coord_id);
1901         }
1902 
1903         /*
1904          * Start committing.
1905          */
1906         error = rd_kafka_commit_transaction(rk, -1);
1907 
1908         if (!is_retry) {
1909                 TEST_ASSERT(error != NULL,
1910                             "Expected commit to fail");
1911                 TEST_SAY("commit_transaction() failed (expectedly): %s\n",
1912                          rd_kafka_error_string(error));
1913                 rd_kafka_error_destroy(error);
1914 
1915         } else {
1916                 TEST_ASSERT(!error,
1917                             "Expected commit to succeed, not: %s",
1918                             rd_kafka_error_string(error));
1919         }
1920 
1921         if (!is_retry) {
1922                 /*
1923                  * Bring the coordinator back up.
1924                  */
1925                 rd_kafka_mock_broker_set_up(mcluster, coord_id);
1926                 rd_sleep(2);
1927 
1928                 /*
1929                  * Abort, and try again, this time without error.
1930                  */
1931                 TEST_SAY("Aborting and retrying\n");
1932                 is_retry = rd_true;
1933 
1934                 TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 60000));
1935                 goto retry;
1936         }
1937 
1938         /* All done */
1939 
1940         rd_kafka_destroy(rk);
1941 
1942         SUB_TEST_PASS();
1943 }
1944 
1945 
1946 /**
1947  * @brief ESC-4424: rko is reused in response handler after destroy in coord_req
1948  *        sender due to bad state.
1949  *
1950  * This is somewhat of a race condition so we need to perform a couple of
1951  * iterations before it hits, usually 2 or 3, so we try at least 15 times.
1952  */
do_test_txn_coord_req_destroy(void)1953 static void do_test_txn_coord_req_destroy (void) {
1954         rd_kafka_t *rk;
1955         rd_kafka_mock_cluster_t *mcluster;
1956         int i;
1957         int errcnt = 0;
1958 
1959         SUB_TEST();
1960 
1961         rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
1962 
1963         test_curr->ignore_dr_err = rd_true;
1964 
1965         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1966 
1967         for (i = 0 ; i < 15 ; i++) {
1968                 rd_kafka_error_t *error;
1969                 rd_kafka_resp_err_t err;
1970                 rd_kafka_topic_partition_list_t *offsets;
1971                 rd_kafka_consumer_group_metadata_t *cgmetadata;
1972 
1973                 test_timeout_set(10);
1974 
1975                 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1976 
1977                 /*
1978                  * Inject errors to trigger retries
1979                  */
1980                 rd_kafka_mock_push_request_errors(
1981                         mcluster,
1982                         RD_KAFKAP_AddPartitionsToTxn,
1983                         2,/* first request + number of internal retries */
1984                         RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
1985                         RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
1986 
1987                 rd_kafka_mock_push_request_errors(
1988                         mcluster,
1989                         RD_KAFKAP_AddOffsetsToTxn,
1990                         1,/* first request + number of internal retries */
1991                         RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
1992 
1993                 err = rd_kafka_producev(rk,
1994                                         RD_KAFKA_V_TOPIC("mytopic"),
1995                                         RD_KAFKA_V_VALUE("hi", 2),
1996                                         RD_KAFKA_V_END);
1997                 TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
1998 
1999                 rd_kafka_mock_push_request_errors(
2000                         mcluster,
2001                         RD_KAFKAP_Produce,
2002                         4,
2003                         RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
2004                         RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
2005                         RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
2006                         RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
2007                 /* FIXME: When KIP-360 is supported, add this error:
2008                  *        RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER */
2009 
2010                 err = rd_kafka_producev(rk,
2011                                         RD_KAFKA_V_TOPIC("mytopic"),
2012                                         RD_KAFKA_V_VALUE("hi", 2),
2013                                         RD_KAFKA_V_END);
2014                 TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
2015 
2016 
2017                 /*
2018                  * Send offsets to transaction
2019                  */
2020 
2021                 offsets = rd_kafka_topic_partition_list_new(1);
2022                 rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->
2023                         offset = 12;
2024 
2025                 cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
2026 
2027                 error = rd_kafka_send_offsets_to_transaction(rk, offsets,
2028                                                              cgmetadata, -1);
2029 
2030                 TEST_SAY("send_offsets_to_transaction() #%d: %s\n",
2031                          i, rd_kafka_error_string(error));
2032 
2033                 /* As we can't control the exact timing and sequence
2034                  * of requests this sometimes fails and sometimes succeeds,
2035                  * but we run the test enough times to trigger at least
2036                  * one failure. */
2037                 if (error) {
2038                         TEST_SAY("send_offsets_to_transaction() #%d "
2039                                  "failed (expectedly): %s\n",
2040                                  i, rd_kafka_error_string(error));
2041                         TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
2042                                     "Expected abortable error for #%d", i);
2043                         rd_kafka_error_destroy(error);
2044                         errcnt++;
2045                 }
2046 
2047                 rd_kafka_consumer_group_metadata_destroy(cgmetadata);
2048                 rd_kafka_topic_partition_list_destroy(offsets);
2049 
2050                 /* Allow time for internal retries */
2051                 rd_sleep(2);
2052 
2053                 TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));
2054         }
2055 
2056         TEST_ASSERT(errcnt > 0,
2057                     "Expected at least one send_offets_to_transaction() "
2058                     "failure");
2059 
2060         /* All done */
2061 
2062         rd_kafka_destroy(rk);
2063 }
2064 
2065 
2066 static rd_atomic32_t multi_find_req_cnt;
2067 
2068 static rd_kafka_resp_err_t
multi_find_on_response_received_cb(rd_kafka_t * rk,int sockfd,const char * brokername,int32_t brokerid,int16_t ApiKey,int16_t ApiVersion,int32_t CorrId,size_t size,int64_t rtt,rd_kafka_resp_err_t err,void * ic_opaque)2069 multi_find_on_response_received_cb (rd_kafka_t *rk,
2070                                     int sockfd,
2071                                     const char *brokername,
2072                                     int32_t brokerid,
2073                                     int16_t ApiKey,
2074                                     int16_t ApiVersion,
2075                                     int32_t CorrId,
2076                                     size_t  size,
2077                                     int64_t rtt,
2078                                     rd_kafka_resp_err_t err,
2079                                     void *ic_opaque) {
2080         rd_kafka_mock_cluster_t *mcluster = rd_kafka_handle_mock_cluster(rk);
2081         rd_bool_t done = rd_atomic32_get(&multi_find_req_cnt) > 10000;
2082 
2083         if (ApiKey != RD_KAFKAP_AddOffsetsToTxn || done)
2084                 return RD_KAFKA_RESP_ERR_NO_ERROR;
2085 
2086         TEST_SAY("on_response_received_cb: %s: %s: brokerid %"PRId32
2087                  ", ApiKey %hd, CorrId %d, rtt %.2fms, %s: %s\n",
2088                  rd_kafka_name(rk), brokername, brokerid, ApiKey, CorrId,
2089                  rtt != -1 ? (float)rtt / 1000.0 : 0.0,
2090                  done ? "already done" : "not done yet",
2091                  rd_kafka_err2name(err));
2092 
2093 
2094         if (rd_atomic32_add(&multi_find_req_cnt, 1) == 1) {
2095                 /* Trigger a broker down/up event, which in turns
2096                  * triggers the coord_req_fsm(). */
2097                 rd_kafka_mock_broker_set_down(mcluster, 2);
2098                 rd_kafka_mock_broker_set_up(mcluster, 2);
2099                 return RD_KAFKA_RESP_ERR_NO_ERROR;
2100         }
2101 
2102         /* Trigger a broker down/up event, which in turns
2103          * triggers the coord_req_fsm(). */
2104         rd_kafka_mock_broker_set_down(mcluster, 3);
2105         rd_kafka_mock_broker_set_up(mcluster, 3);
2106 
2107         /* Clear the downed broker's latency so that it reconnects
2108          * quickly, otherwise the ApiVersionRequest will be delayed and
2109          * this will in turn delay the -> UP transition that we need to
2110          * trigger the coord_reqs. */
2111         rd_kafka_mock_broker_set_rtt(mcluster, 3, 0);
2112 
2113         /* Only do this down/up once */
2114         rd_atomic32_add(&multi_find_req_cnt, 10000);
2115 
2116         return RD_KAFKA_RESP_ERR_NO_ERROR;
2117 }
2118 
2119 
2120 /**
2121  * @brief ESC-4444: multiple FindCoordinatorRequests are sent referencing
2122  *        the same coord_req_t, but the first one received will destroy
2123  *        the coord_req_t object and make the subsequent FindCoordingResponses
2124  *        reference a freed object.
2125  *
2126  * What we want to achieve is this sequence:
2127  *  1. AddOffsetsToTxnRequest + Response which..
2128  *  2. Triggers TxnOffsetCommitRequest, but the coordinator is not known, so..
2129  *  3. Triggers a FindCoordinatorRequest
2130  *  4. FindCoordinatorResponse from 3 is received ..
2131  *  5. A TxnOffsetCommitRequest is sent from coord_req_fsm().
2132  *  6. Another broker changing state to Up triggers coord reqs again, which..
2133  *  7. Triggers a second TxnOffsetCommitRequest from coord_req_fsm().
2134  *  7. FindCoordinatorResponse from 5 is received, references the destroyed rko
2135  *     and crashes.
2136  */
do_test_txn_coord_req_multi_find(void)2137 static void do_test_txn_coord_req_multi_find (void) {
2138         rd_kafka_t *rk;
2139         rd_kafka_mock_cluster_t *mcluster;
2140         rd_kafka_error_t *error;
2141         rd_kafka_resp_err_t err;
2142         rd_kafka_topic_partition_list_t *offsets;
2143         rd_kafka_consumer_group_metadata_t *cgmetadata;
2144         const char *txnid = "txnid", *groupid = "mygroupid", *topic = "mytopic";
2145         int i;
2146 
2147         SUB_TEST();
2148 
2149         rd_atomic32_init(&multi_find_req_cnt, 0);
2150 
2151         on_response_received_cb = multi_find_on_response_received_cb;
2152         rk = create_txn_producer(&mcluster, txnid, 3,
2153                                  /* Need connections to all brokers so we
2154                                   * can trigger coord_req_fsm events
2155                                   * by toggling connections. */
2156                                  "enable.sparse.connections", "false",
2157                                  /* Set up on_response_received interceptor */
2158                                  "on_response_received", "", NULL);
2159 
2160         /* Let broker 1 be both txn and group coordinator
2161          * so that the group coordinator connection is up when it is time
2162          * send the TxnOffsetCommitRequest. */
2163         rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 1);
2164         rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
2165 
2166         /* Set broker 1, 2, and 3 as leaders for a partition each and
2167          * later produce to both partitions so we know there's a connection
2168          * to all brokers. */
2169         rd_kafka_mock_topic_create(mcluster, topic, 3, 1);
2170         rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
2171         rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 2);
2172         rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 3);
2173 
2174         /* Broker down is not a test-failing error */
2175         allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
2176         test_curr->is_fatal_cb = error_is_fatal_cb;
2177 
2178         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
2179 
2180         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2181 
2182         for (i = 0 ; i < 3 ; i++) {
2183                 err = rd_kafka_producev(rk,
2184                                         RD_KAFKA_V_TOPIC(topic),
2185                                         RD_KAFKA_V_PARTITION(i),
2186                                         RD_KAFKA_V_VALUE("hi", 2),
2187                                         RD_KAFKA_V_END);
2188                 TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
2189         }
2190 
2191         test_flush(rk, 5000);
2192 
2193         /*
2194          * send_offsets_to_transaction() will query for the group coordinator,
2195          * we need to make those requests slow so that multiple requests are
2196          * sent.
2197          */
2198         for (i = 1 ; i <= 3 ; i++)
2199                 rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 4000);
2200 
2201         /*
2202          * Send offsets to transaction
2203          */
2204 
2205         offsets = rd_kafka_topic_partition_list_new(1);
2206         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->
2207                 offset = 12;
2208 
2209         cgmetadata = rd_kafka_consumer_group_metadata_new(groupid);
2210 
2211         error = rd_kafka_send_offsets_to_transaction(rk, offsets,
2212                                                      cgmetadata, -1);
2213 
2214         TEST_SAY("send_offsets_to_transaction() %s\n",
2215                  rd_kafka_error_string(error));
2216         TEST_ASSERT(!error, "send_offsets_to_transaction() failed: %s",
2217                     rd_kafka_error_string(error));
2218 
2219         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
2220         rd_kafka_topic_partition_list_destroy(offsets);
2221 
2222         /* Clear delay */
2223         for (i = 1 ; i <= 3 ; i++)
2224                 rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 0);
2225 
2226         rd_sleep(5);
2227 
2228         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
2229 
2230         /* All done */
2231 
2232         TEST_ASSERT(rd_atomic32_get(&multi_find_req_cnt) > 10000,
2233                     "on_request_sent interceptor did not trigger properly");
2234 
2235         rd_kafka_destroy(rk);
2236 
2237         on_response_received_cb = NULL;
2238 
2239         SUB_TEST_PASS();
2240 }
2241 
2242 
/**
 * @brief ESC-4410: adding producer partitions gradually will trigger multiple
 *        AddPartitionsToTxn requests. Due to a bug the third partition to be
 *        registered would hang in PEND_TXN state.
 *
 * Trigger this behaviour by having two outstanding AddPartitionsToTxn requests
 * at the same time, followed by a need for a third:
 *
 * 1. Set coordinator broker rtt high (to give us time to produce).
 * 2. Produce to partition 0, will trigger first AddPartitionsToTxn.
 * 3. Produce to partition 1, will trigger second AddPartitionsToTxn.
 * 4. Wait for second AddPartitionsToTxn response.
 * 5. Produce to partition 2, should trigger AddPartitionsToTxn, but the bug
 *    causes it to be stuck in pending state.
 */
2258 
2259 static rd_atomic32_t multi_addparts_resp_cnt;
2260 static rd_kafka_resp_err_t
multi_addparts_response_received_cb(rd_kafka_t * rk,int sockfd,const char * brokername,int32_t brokerid,int16_t ApiKey,int16_t ApiVersion,int32_t CorrId,size_t size,int64_t rtt,rd_kafka_resp_err_t err,void * ic_opaque)2261 multi_addparts_response_received_cb (rd_kafka_t *rk,
2262                                      int sockfd,
2263                                      const char *brokername,
2264                                      int32_t brokerid,
2265                                      int16_t ApiKey,
2266                                      int16_t ApiVersion,
2267                                      int32_t CorrId,
2268                                      size_t  size,
2269                                      int64_t rtt,
2270                                      rd_kafka_resp_err_t err,
2271                                      void *ic_opaque) {
2272 
2273         if (ApiKey == RD_KAFKAP_AddPartitionsToTxn) {
2274                 TEST_SAY("on_response_received_cb: %s: %s: brokerid %"PRId32
2275                          ", ApiKey %hd, CorrId %d, rtt %.2fms, count %"PRId32
2276                          ": %s\n",
2277                          rd_kafka_name(rk), brokername, brokerid,
2278                          ApiKey, CorrId,
2279                          rtt != -1 ? (float)rtt / 1000.0 : 0.0,
2280                          rd_atomic32_get(&multi_addparts_resp_cnt),
2281                          rd_kafka_err2name(err));
2282 
2283                 rd_atomic32_add(&multi_addparts_resp_cnt, 1);
2284         }
2285 
2286         return RD_KAFKA_RESP_ERR_NO_ERROR;
2287 }
2288 
2289 
do_test_txn_addparts_req_multi(void)2290 static void do_test_txn_addparts_req_multi (void) {
2291         rd_kafka_t *rk;
2292         rd_kafka_mock_cluster_t *mcluster;
2293         const char *txnid = "txnid", *topic = "mytopic";
2294         int32_t txn_coord = 2;
2295 
2296         SUB_TEST();
2297 
2298         rd_atomic32_init(&multi_addparts_resp_cnt, 0);
2299 
2300         on_response_received_cb = multi_addparts_response_received_cb;
2301         rk = create_txn_producer(&mcluster, txnid, 3,
2302                                  "linger.ms", "0",
2303                                  "message.timeout.ms", "9000",
2304                                  /* Set up on_response_received interceptor */
2305                                  "on_response_received", "", NULL);
2306 
2307         /* Let broker 1 be txn coordinator. */
2308         rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
2309                                       txn_coord);
2310 
2311         rd_kafka_mock_topic_create(mcluster, topic, 3, 1);
2312 
2313         /* Set partition leaders to non-txn-coord broker so they wont
2314          * be affected by rtt delay */
2315         rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
2316         rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
2317         rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 1);
2318 
2319 
2320 
2321         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
2322 
2323         /*
2324          * Run one transaction first to let the client familiarize with
2325          * the topic, this avoids metadata lookups, etc, when the real
2326          * test is run.
2327          */
2328         TEST_SAY("Running seed transaction\n");
2329         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2330         TEST_CALL_ERR__(rd_kafka_producev(rk,
2331                                           RD_KAFKA_V_TOPIC(topic),
2332                                           RD_KAFKA_V_VALUE("seed", 4),
2333                                           RD_KAFKA_V_END));
2334         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
2335 
2336 
2337         /*
2338          * Now perform test transaction with rtt delays
2339          */
2340         TEST_SAY("Running test transaction\n");
2341 
2342         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2343 
2344         /* Reset counter */
2345         rd_atomic32_set(&multi_addparts_resp_cnt, 0);
2346 
2347         /* Add latency to txn coordinator so we can pace our produce() calls */
2348         rd_kafka_mock_broker_set_rtt(mcluster, txn_coord, 1000);
2349 
2350         /* Produce to partition 0 */
2351         TEST_CALL_ERR__(rd_kafka_producev(rk,
2352                                           RD_KAFKA_V_TOPIC(topic),
2353                                           RD_KAFKA_V_PARTITION(0),
2354                                           RD_KAFKA_V_VALUE("hi", 2),
2355                                           RD_KAFKA_V_END));
2356 
2357         rd_usleep(500*1000, NULL);
2358 
2359         /* Produce to partition 1 */
2360         TEST_CALL_ERR__(rd_kafka_producev(rk,
2361                                           RD_KAFKA_V_TOPIC(topic),
2362                                           RD_KAFKA_V_PARTITION(1),
2363                                           RD_KAFKA_V_VALUE("hi", 2),
2364                                           RD_KAFKA_V_END));
2365 
2366         TEST_SAY("Waiting for two AddPartitionsToTxnResponse\n");
2367         while (rd_atomic32_get(&multi_addparts_resp_cnt) < 2)
2368                 rd_usleep(10*1000, NULL);
2369 
2370         TEST_SAY("%"PRId32" AddPartitionsToTxnResponses seen\n",
2371                  rd_atomic32_get(&multi_addparts_resp_cnt));
2372 
2373         /* Produce to partition 2, this message will hang in
2374          * queue if the bug is not fixed. */
2375         TEST_CALL_ERR__(rd_kafka_producev(rk,
2376                                           RD_KAFKA_V_TOPIC(topic),
2377                                           RD_KAFKA_V_PARTITION(2),
2378                                           RD_KAFKA_V_VALUE("hi", 2),
2379                                           RD_KAFKA_V_END));
2380 
2381         /* Allow some extra time for things to settle before committing
2382          * transaction. */
2383         rd_usleep(1000*1000, NULL);
2384 
2385         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 10*1000));
2386 
2387         /* All done */
2388         rd_kafka_destroy(rk);
2389 
2390         on_response_received_cb = NULL;
2391 
2392         SUB_TEST_PASS();
2393 }
2394 
2395 
2396 
2397 /**
2398  * @brief Test handling of OffsetFetchRequest returning UNSTABLE_OFFSET_COMMIT.
2399  *
2400  * There are two things to test;
2401  *  - OffsetFetch triggered by committed() (and similar code paths)
2402  *  - OffsetFetch triggered by assign()
2403  */
do_test_unstable_offset_commit(void)2404 static void do_test_unstable_offset_commit (void) {
2405         rd_kafka_t *rk, *c;
2406         rd_kafka_conf_t *c_conf;
2407         rd_kafka_mock_cluster_t *mcluster;
2408         rd_kafka_topic_partition_list_t *offsets;
2409         const char *topic = "mytopic";
2410         const int msgcnt = 100;
2411         const int64_t offset_to_commit = msgcnt / 2;
2412         int i;
2413         int remains = 0;
2414 
2415         SUB_TEST_QUICK();
2416 
2417         rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
2418 
2419         test_conf_init(&c_conf, NULL, 0);
2420         test_conf_set(c_conf, "security.protocol", "PLAINTEXT");
2421         test_conf_set(c_conf, "bootstrap.servers",
2422                       rd_kafka_mock_cluster_bootstraps(mcluster));
2423         test_conf_set(c_conf, "enable.partition.eof", "true");
2424         test_conf_set(c_conf, "auto.offset.reset", "error");
2425         c = test_create_consumer("mygroup", NULL, c_conf, NULL);
2426 
2427         rd_kafka_mock_topic_create(mcluster, topic, 2, 3);
2428 
2429         /* Produce some messages to the topic so that the consumer has
2430          * something to read. */
2431         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
2432         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2433         test_produce_msgs2_nowait(rk, topic, 0, 0, 0, msgcnt,
2434                                   NULL, 0, &remains);
2435         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
2436 
2437 
2438         /* Commit offset */
2439         offsets = rd_kafka_topic_partition_list_new(1);
2440         rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
2441                 offset_to_commit;
2442         TEST_CALL_ERR__(rd_kafka_commit(c, offsets, 0/*sync*/));
2443         rd_kafka_topic_partition_list_destroy(offsets);
2444 
2445         /* Retrieve offsets by calling committed().
2446          *
2447          * Have OffsetFetch fail and retry, on the first iteration
2448          * the API timeout is higher than the amount of time the retries will
2449          * take and thus succeed, and on the second iteration the timeout
2450          * will be lower and thus fail. */
2451         for (i = 0 ; i < 2 ; i++) {
2452                 rd_kafka_resp_err_t err;
2453                 rd_kafka_resp_err_t exp_err = i == 0 ?
2454                         RD_KAFKA_RESP_ERR_NO_ERROR :
2455                         RD_KAFKA_RESP_ERR__TIMED_OUT;
2456                 int timeout_ms = exp_err ? 200 : 5*1000;
2457 
2458                 rd_kafka_mock_push_request_errors(
2459                         mcluster,
2460                         RD_KAFKAP_OffsetFetch,
2461                         1+5,/* first request + some retries */
2462                         RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2463                         RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2464                         RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2465                         RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2466                         RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2467                         RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT);
2468 
2469                 offsets = rd_kafka_topic_partition_list_new(1);
2470                 rd_kafka_topic_partition_list_add(offsets, topic, 0);
2471 
2472                 err = rd_kafka_committed(c, offsets, timeout_ms);
2473 
2474                 TEST_SAY("#%d: committed() returned %s (expected %s)\n",
2475                          i,
2476                          rd_kafka_err2name(err),
2477                          rd_kafka_err2name(exp_err));
2478 
2479                 TEST_ASSERT(err == exp_err,
2480                             "#%d: Expected committed() to return %s, not %s",
2481                             i,
2482                             rd_kafka_err2name(exp_err),
2483                             rd_kafka_err2name(err));
2484                 TEST_ASSERT(offsets->cnt == 1,
2485                             "Expected 1 committed offset, not %d",
2486                             offsets->cnt);
2487                 if (!exp_err)
2488                         TEST_ASSERT(offsets->elems[0].offset == offset_to_commit,
2489                                     "Expected committed offset %"PRId64", "
2490                                     "not %"PRId64,
2491                                     offset_to_commit,
2492                                     offsets->elems[0].offset);
2493                 else
2494                         TEST_ASSERT(offsets->elems[0].offset < 0,
2495                                     "Expected no committed offset, "
2496                                     "not %"PRId64,
2497                                     offsets->elems[0].offset);
2498 
2499                 rd_kafka_topic_partition_list_destroy(offsets);
2500         }
2501 
2502         TEST_SAY("Phase 2: OffsetFetch lookup through assignment\n");
2503         offsets = rd_kafka_topic_partition_list_new(1);
2504         rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
2505                 RD_KAFKA_OFFSET_STORED;
2506 
2507         rd_kafka_mock_push_request_errors(
2508                 mcluster,
2509                 RD_KAFKAP_OffsetFetch,
2510                 1+5,/* first request + some retries */
2511                 RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2512                 RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2513                 RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2514                 RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2515                 RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2516                 RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT);
2517 
2518         test_consumer_incremental_assign("assign", c, offsets);
2519         rd_kafka_topic_partition_list_destroy(offsets);
2520 
2521         test_consumer_poll_exact("consume", c, 0,
2522                                  1/*eof*/, 0, msgcnt/2,
2523                                  rd_true/*exact counts*/, NULL);
2524 
2525         /* All done */
2526         rd_kafka_destroy(c);
2527         rd_kafka_destroy(rk);
2528 
2529         SUB_TEST_PASS();
2530 }
2531 
2532 
2533 /**
2534  * @brief If a message times out locally before being attempted to send
2535  *        and commit_transaction() is called, the transaction must not succeed.
2536  *        https://github.com/confluentinc/confluent-kafka-dotnet/issues/1568
2537  */
do_test_commit_after_msg_timeout(void)2538 static void do_test_commit_after_msg_timeout (void) {
2539         rd_kafka_t *rk;
2540         rd_kafka_mock_cluster_t *mcluster;
2541         int32_t coord_id, leader_id;
2542         rd_kafka_resp_err_t err;
2543         rd_kafka_error_t *error;
2544         const char *topic = "test";
2545         const char *transactional_id = "txnid";
2546         int remains = 0;
2547 
2548         SUB_TEST_QUICK();
2549 
2550         /* Assign coordinator and leader to two different brokers */
2551         coord_id = 1;
2552         leader_id = 2;
2553 
2554         rk = create_txn_producer(&mcluster, transactional_id, 3,
2555                                  "message.timeout.ms", "5000",
2556                                  "transaction.timeout.ms", "10000",
2557                                  NULL);
2558 
2559         /* Broker down is not a test-failing error */
2560         allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
2561         test_curr->is_fatal_cb = error_is_fatal_cb;
2562         test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
2563 
2564         err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
2565         TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str (err));
2566 
2567         rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
2568                                       coord_id);
2569         rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);
2570 
2571         /* Start transactioning */
2572         TEST_SAY("Starting transaction\n");
2573         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
2574 
2575         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2576 
2577         TEST_SAY("Bringing down %"PRId32"\n", leader_id);
2578         rd_kafka_mock_broker_set_down(mcluster, leader_id);
2579         rd_kafka_mock_broker_set_down(mcluster, coord_id);
2580 
2581         test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);
2582 
2583         error = rd_kafka_commit_transaction(rk, -1);
2584         TEST_ASSERT(error != NULL, "expected commit_transaciton() to fail");
2585         TEST_SAY("commit_transaction() failed (as expected): %s\n",
2586                  rd_kafka_error_string(error));
2587         TEST_ASSERT(rd_kafka_error_txn_requires_abort (error),
2588                     "Expected txn_requires_abort error");
2589         rd_kafka_error_destroy(error);
2590 
2591         /* Bring the brokers up so the abort can complete */
2592         rd_kafka_mock_broker_set_up(mcluster, coord_id);
2593         rd_kafka_mock_broker_set_up(mcluster, leader_id);
2594 
2595         TEST_SAY("Aborting transaction\n");
2596         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
2597 
2598         TEST_ASSERT(remains == 0,
2599                     "%d message(s) were not flushed\n", remains);
2600 
2601         TEST_SAY("Attempting second transaction, which should succeed\n");
2602         allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
2603         test_curr->is_fatal_cb = error_is_fatal_cb;
2604         test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
2605 
2606         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2607         test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);
2608 
2609         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
2610 
2611         TEST_ASSERT(remains == 0,
2612                     "%d message(s) were not produced\n", remains);
2613 
2614         rd_kafka_destroy(rk);
2615 
2616         test_curr->is_fatal_cb = NULL;
2617 
2618         SUB_TEST_PASS();
2619 }
2620 
2621 
2622 /**
2623  * @brief #3575: Verify that OUT_OF_ORDER_SEQ does not trigger an epoch bump
2624  *        during an ongoing transaction.
2625  *        The transaction should instead enter the abortable state.
2626  */
do_test_out_of_order_seq(void)2627 static void do_test_out_of_order_seq (void) {
2628         rd_kafka_t *rk;
2629         rd_kafka_mock_cluster_t *mcluster;
2630         rd_kafka_error_t *error;
2631         int32_t txn_coord = 1, leader = 2;
2632         const char *txnid = "myTxnId";
2633         test_timing_t timing;
2634         rd_kafka_resp_err_t err;
2635 
2636         SUB_TEST_QUICK();
2637 
2638         rk = create_txn_producer(&mcluster, txnid, 3,
2639                                  "batch.num.messages", "1",
2640                                  NULL);
2641 
2642         rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
2643                                       txn_coord);
2644 
2645         rd_kafka_mock_partition_set_leader(mcluster, "mytopic", 0, leader);
2646 
2647         test_curr->ignore_dr_err = rd_true;
2648         test_curr->is_fatal_cb = NULL;
2649 
2650         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
2651 
2652         /*
2653          * Start a transaction
2654          */
2655         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2656 
2657 
2658 
2659         /* Produce one seeding message first to get the leader up and running */
2660         TEST_CALL_ERR__(rd_kafka_producev(rk,
2661                                           RD_KAFKA_V_TOPIC("mytopic"),
2662                                           RD_KAFKA_V_PARTITION(0),
2663                                           RD_KAFKA_V_VALUE("hi", 2),
2664                                           RD_KAFKA_V_END));
2665         test_flush(rk, -1);
2666 
2667         /* Let partition leader have a latency of 2 seconds
2668          * so that we can have multiple messages in-flight. */
2669         rd_kafka_mock_broker_set_rtt(mcluster, leader, 2*1000);
2670 
2671         /* Produce a message, let it fail with with different errors,
2672          * ending with OUT_OF_ORDER which previously triggered an
2673          * Epoch bump. */
2674         rd_kafka_mock_push_request_errors(
2675                 mcluster,
2676                 RD_KAFKAP_Produce,
2677                 3,
2678                 RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
2679                 RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
2680                 RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER);
2681 
2682         /* Produce three messages that will be delayed
2683          * and have errors injected.*/
2684         TEST_CALL_ERR__(rd_kafka_producev(rk,
2685                                           RD_KAFKA_V_TOPIC("mytopic"),
2686                                           RD_KAFKA_V_PARTITION(0),
2687                                           RD_KAFKA_V_VALUE("hi", 2),
2688                                           RD_KAFKA_V_END));
2689         TEST_CALL_ERR__(rd_kafka_producev(rk,
2690                                           RD_KAFKA_V_TOPIC("mytopic"),
2691                                           RD_KAFKA_V_PARTITION(0),
2692                                           RD_KAFKA_V_VALUE("hi", 2),
2693                                           RD_KAFKA_V_END));
2694         TEST_CALL_ERR__(rd_kafka_producev(rk,
2695                                           RD_KAFKA_V_TOPIC("mytopic"),
2696                                           RD_KAFKA_V_PARTITION(0),
2697                                           RD_KAFKA_V_VALUE("hi", 2),
2698                                           RD_KAFKA_V_END));
2699 
2700         /* Now sleep a short while so that the messages are processed
2701          * by the broker and errors are returned. */
2702         TEST_SAY("Sleeping..\n");
2703         rd_sleep(5);
2704 
2705         rd_kafka_mock_broker_set_rtt(mcluster, leader, 0);
2706 
2707         /* Produce a fifth message, should fail with ERR__STATE since
2708          * the transaction should have entered the abortable state. */
2709         err = rd_kafka_producev(rk,
2710                                 RD_KAFKA_V_TOPIC("mytopic"),
2711                                 RD_KAFKA_V_PARTITION(0),
2712                                 RD_KAFKA_V_VALUE("hi", 2),
2713                                 RD_KAFKA_V_END);
2714         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__STATE,
2715                     "Expected produce() to fail with ERR__STATE, not %s",
2716                     rd_kafka_err2name(err));
2717         TEST_SAY("produce() failed as expected: %s\n",
2718                  rd_kafka_err2str(err));
2719 
2720         /* Commit the transaction, should fail with abortable error. */
2721         TIMING_START(&timing, "commit_transaction(-1)");
2722         error = rd_kafka_commit_transaction(rk, -1);
2723         TIMING_STOP(&timing);
2724         TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");
2725 
2726         TEST_SAY("commit_transaction() failed (expectedly): %s\n",
2727                  rd_kafka_error_string(error));
2728 
2729         TEST_ASSERT(!rd_kafka_error_is_fatal(error),
2730                     "Did not expect fatal error");
2731         TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
2732                     "Expected abortable error");
2733         rd_kafka_error_destroy(error);
2734 
2735         /* Abort the transaction */
2736         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
2737 
2738         /* Run a new transaction without errors to verify that the
2739          * producer can recover. */
2740         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2741 
2742         TEST_CALL_ERR__(rd_kafka_producev(rk,
2743                                           RD_KAFKA_V_TOPIC("mytopic"),
2744                                           RD_KAFKA_V_PARTITION(0),
2745                                           RD_KAFKA_V_VALUE("hi", 2),
2746                                           RD_KAFKA_V_END));
2747 
2748         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
2749 
2750         rd_kafka_destroy(rk);
2751 
2752         SUB_TEST_PASS();
2753 }
2754 
2755 
main_0105_transactions_mock(int argc,char ** argv)2756 int main_0105_transactions_mock (int argc, char **argv) {
2757         if (test_needs_auth()) {
2758                 TEST_SKIP("Mock cluster does not support SSL/SASL\n");
2759                 return 0;
2760         }
2761 
2762         do_test_txn_recoverable_errors();
2763 
2764         do_test_txn_fatal_idempo_errors();
2765 
2766         do_test_txn_fenced_reinit();
2767 
2768         do_test_txn_req_cnt();
2769 
2770         do_test_txn_requires_abort_errors();
2771 
2772         do_test_txn_slow_reinit(rd_false);
2773         do_test_txn_slow_reinit(rd_true);
2774 
2775         /* Just do a subset of tests in quick mode */
2776         if (test_quick)
2777                 return 0;
2778 
2779         do_test_txn_endtxn_errors();
2780 
2781         do_test_txn_endtxn_infinite();
2782 
2783         /* Skip tests for non-infinite commit/abort timeouts
2784          * until they're properly handled by the producer. */
2785         if (0)
2786                 do_test_txn_endtxn_timeout();
2787 
2788         /* Bring down the coordinator */
2789         do_test_txn_broker_down_in_txn(rd_true);
2790 
2791         /* Bring down partition leader */
2792         do_test_txn_broker_down_in_txn(rd_false);
2793 
2794         do_test_txns_not_supported();
2795 
2796         do_test_txns_send_offsets_concurrent_is_retried();
2797 
2798         do_test_txn_coord_req_destroy();
2799 
2800         do_test_txn_coord_req_multi_find();
2801 
2802         do_test_txn_addparts_req_multi();
2803 
2804         do_test_txns_no_timeout_crash();
2805 
2806         do_test_txn_auth_failure(
2807                 RD_KAFKAP_InitProducerId,
2808                 RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);
2809 
2810         do_test_txn_auth_failure(
2811                 RD_KAFKAP_FindCoordinator,
2812                 RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);
2813 
2814         do_test_txn_flush_timeout();
2815 
2816         do_test_unstable_offset_commit();
2817 
2818         do_test_commit_after_msg_timeout();
2819 
2820         do_test_txn_switch_coordinator();
2821 
2822         do_test_txn_switch_coordinator_refresh();
2823 
2824         do_test_out_of_order_seq();
2825 
2826         return 0;
2827 }
2828