1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2019, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 #include "rdkafka.h"
32 
33 #include "../src/rdkafka_proto.h"
34 #include "../src/rdunittest.h"
35 
36 /**
37  * @name Producer transaction tests using the mock cluster
38  *
39  */
40 
41 
42 static int allowed_error;
43 
44 /**
45  * @brief Decide what error_cb's will cause the test to fail.
46  */
error_is_fatal_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,const char * reason)47 static int error_is_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
48                               const char *reason) {
49         if (err == allowed_error) {
50                 TEST_SAY("Ignoring allowed error: %s: %s\n",
51                          rd_kafka_err2name(err), reason);
52                 return 0;
53         }
54         return 1;
55 }
56 
57 
58 
59 /**
60  * @brief Create a transactional producer and a mock cluster.
61  */
create_txn_producer(rd_kafka_mock_cluster_t ** mclusterp,const char * transactional_id,int broker_cnt)62 static rd_kafka_t *create_txn_producer (rd_kafka_mock_cluster_t **mclusterp,
63                                         const char *transactional_id,
64                                         int broker_cnt) {
65         rd_kafka_conf_t *conf;
66         rd_kafka_t *rk;
67         char numstr[8];
68 
69         rd_snprintf(numstr, sizeof(numstr), "%d", broker_cnt);
70 
71         test_conf_init(&conf, NULL, 0);
72 
73         test_conf_set(conf, "transactional.id", transactional_id);
74         test_conf_set(conf, "test.mock.num.brokers", numstr);
75         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
76 
77         test_curr->ignore_dr_err = rd_false;
78 
79         rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
80 
81         if (mclusterp) {
82                 *mclusterp = rd_kafka_handle_mock_cluster(rk);
83                 TEST_ASSERT(*mclusterp, "failed to create mock cluster");
84         }
85 
86         return rk;
87 }
88 
89 
90 /**
91  * @brief Test recoverable errors using mock broker error injections
92  *        and code coverage checks.
93  */
do_test_txn_recoverable_errors(void)94 static void do_test_txn_recoverable_errors (void) {
95         rd_kafka_t *rk;
96         rd_kafka_mock_cluster_t *mcluster;
97         rd_kafka_resp_err_t err;
98         rd_kafka_topic_partition_list_t *offsets;
99         rd_kafka_consumer_group_metadata_t *cgmetadata;
100         const char *groupid = "myGroupId";
101         const char *txnid = "myTxnId";
102 
103         TEST_SAY(_C_MAG "[ %s ]\n", __FUNCTION__);
104 
105         rk = create_txn_producer(&mcluster, txnid, 3);
106 
107         /* Make sure transaction and group coordinators are different.
108          * This verifies that AddOffsetsToTxnRequest isn't sent to the
109          * transaction coordinator but the group coordinator. */
110         rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
111         rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 2);
112 
113         /*
114          * Inject som InitProducerId errors that causes retries
115          */
116         rd_kafka_mock_push_request_errors(
117                 mcluster,
118                 RD_KAFKAP_InitProducerId,
119                 3,
120                 RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
121                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
122                 RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);
123 
124         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
125 
126         (void)RD_UT_COVERAGE_CHECK(0); /* idemp_request_pid_failed(retry) */
127         (void)RD_UT_COVERAGE_CHECK(1); /* txn_idemp_state_change(READY) */
128 
129         /*
130          * Start a transaction
131          */
132         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
133 
134         /*
135          * Produce a message, let it first fail on a fatal idempotent error
136          * that is retryable by the transaction manager, then let it fail with
137          * a non-idempo/non-txn retryable error
138          */
139         rd_kafka_mock_push_request_errors(
140                 mcluster,
141                 RD_KAFKAP_Produce,
142                 1,
143                 RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
144                 RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS);
145 
146         err = rd_kafka_producev(rk,
147                                 RD_KAFKA_V_TOPIC("mytopic"),
148                                 RD_KAFKA_V_VALUE("hi", 2),
149                                 RD_KAFKA_V_END);
150         TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
151 
152         /* Make sure messages are produced */
153         rd_kafka_flush(rk, -1);
154 
155         /*
156          * Send some arbitrary offsets, first with some failures, then
157          * succeed.
158          */
159         offsets = rd_kafka_topic_partition_list_new(4);
160         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
161         rd_kafka_topic_partition_list_add(offsets, "srctop2", 99)->offset =
162                 999999111;
163         rd_kafka_topic_partition_list_add(offsets, "srctopic", 0)->offset = 999;
164         rd_kafka_topic_partition_list_add(offsets, "srctop2", 3499)->offset =
165                 123456789;
166 
167         rd_kafka_mock_push_request_errors(
168                 mcluster,
169                 RD_KAFKAP_AddPartitionsToTxn,
170                 1,
171                 RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);
172 
173         rd_kafka_mock_push_request_errors(
174                 mcluster,
175                 RD_KAFKAP_TxnOffsetCommit,
176                 2,
177                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
178                 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
179 
180         cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
181 
182         TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
183                                   rk, offsets,
184                                   cgmetadata, -1));
185 
186         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
187         rd_kafka_topic_partition_list_destroy(offsets);
188 
189         /*
190          * Commit transaction, first with som failures, then succeed.
191          */
192         rd_kafka_mock_push_request_errors(
193                 mcluster,
194                 RD_KAFKAP_EndTxn,
195                 3,
196                 RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
197                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
198                 RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);
199 
200         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
201 
202         /* All done */
203 
204         rd_kafka_destroy(rk);
205 
206         TEST_SAY(_C_GRN "[ %s PASS ]\n", __FUNCTION__);
207 }
208 
209 
210 /**
211  * @brief Test abortable errors using mock broker error injections
212  *        and code coverage checks.
213  */
do_test_txn_requires_abort_errors(void)214 static void do_test_txn_requires_abort_errors (void) {
215         rd_kafka_t *rk;
216         rd_kafka_mock_cluster_t *mcluster;
217         rd_kafka_error_t *error;
218         rd_kafka_resp_err_t err;
219         rd_kafka_topic_partition_list_t *offsets;
220         rd_kafka_consumer_group_metadata_t *cgmetadata;
221 
222         TEST_SAY(_C_MAG "[ %s ]\n", __FUNCTION__);
223 
224         rk = create_txn_producer(&mcluster, "txnid", 3);
225 
226         test_curr->ignore_dr_err = rd_true;
227 
228         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
229 
230         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
231 
232         /*
233          * 1. Fail on produce
234          */
235         TEST_SAY("1. Fail on produce\n");
236 
237         rd_kafka_mock_push_request_errors(
238                 mcluster,
239                 RD_KAFKAP_Produce,
240                 1,
241                 RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
242 
243         err = rd_kafka_producev(rk,
244                                 RD_KAFKA_V_TOPIC("mytopic"),
245                                 RD_KAFKA_V_VALUE("hi", 2),
246                                 RD_KAFKA_V_END);
247         TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
248 
249         /* Wait for messages to fail */
250         test_flush(rk, 5000);
251 
252         /* Any other transactional API should now raise an error */
253         offsets = rd_kafka_topic_partition_list_new(1);
254         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
255 
256         cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
257 
258         error = rd_kafka_send_offsets_to_transaction(rk, offsets,
259                                                      cgmetadata, -1);
260 
261         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
262         rd_kafka_topic_partition_list_destroy(offsets);
263         TEST_ASSERT(error, "expected error");
264         TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
265                     "expected abortable error, not %s",
266                     rd_kafka_error_string(error));
267         TEST_SAY("Error %s: %s\n",
268                  rd_kafka_error_name(error),
269                  rd_kafka_error_string(error));
270         rd_kafka_error_destroy(error);
271 
272         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
273 
274         /*
275          * 2. Restart transaction and fail on AddPartitionsToTxn
276          */
277         TEST_SAY("2. Fail on AddPartitionsToTxn\n");
278 
279         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
280 
281         rd_kafka_mock_push_request_errors(
282                 mcluster,
283                 RD_KAFKAP_AddPartitionsToTxn,
284                 1,
285                 RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
286 
287         err = rd_kafka_producev(rk,
288                                 RD_KAFKA_V_TOPIC("mytopic"),
289                                 RD_KAFKA_V_VALUE("hi", 2),
290                                 RD_KAFKA_V_END);
291         TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
292 
293         error = rd_kafka_commit_transaction(rk, 5000);
294         TEST_ASSERT(error, "commit_transaction should have failed");
295         TEST_SAY("commit_transaction() error %s: %s\n",
296                  rd_kafka_error_name(error),
297                  rd_kafka_error_string(error));
298         rd_kafka_error_destroy(error);
299 
300         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
301 
302         /*
303         * 3. Restart transaction and fail on AddOffsetsToTxn
304         */
305         TEST_SAY("3. Fail on AddOffsetsToTxn\n");
306 
307         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
308 
309         err = rd_kafka_producev(rk,
310                                 RD_KAFKA_V_TOPIC("mytopic"),
311                                 RD_KAFKA_V_VALUE("hi", 2),
312                                 RD_KAFKA_V_END);
313         TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
314 
315         rd_kafka_mock_push_request_errors(
316                 mcluster,
317                 RD_KAFKAP_AddOffsetsToTxn,
318                 1,
319                 RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED);
320 
321         offsets = rd_kafka_topic_partition_list_new(1);
322         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
323         cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
324 
325         error = rd_kafka_send_offsets_to_transaction(rk, offsets,
326                                                      cgmetadata, -1);
327         TEST_ASSERT(error, "Expected send_offsets..() to fail");
328         TEST_ASSERT(rd_kafka_error_code(error) ==
329                     RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
330                     "expected send_offsets_to_transaction() to fail with "
331                     "group auth error: not %s",
332                     rd_kafka_error_name(error));
333         rd_kafka_error_destroy(error);
334 
335         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
336         rd_kafka_topic_partition_list_destroy(offsets);
337 
338 
339         error = rd_kafka_commit_transaction(rk, 5000);
340         TEST_ASSERT(error, "commit_transaction should have failed");
341         rd_kafka_error_destroy(error);
342 
343         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
344 
345         /* All done */
346 
347         rd_kafka_destroy(rk);
348 
349         TEST_SAY(_C_GRN "[ %s PASS ]\n", __FUNCTION__);
350 }
351 
352 
353 /**
354  * @brief Test error handling and recover for when broker goes down during
355  *        an ongoing transaction.
356  */
do_test_txn_broker_down_in_txn(rd_bool_t down_coord)357 static void do_test_txn_broker_down_in_txn (rd_bool_t down_coord) {
358         rd_kafka_t *rk;
359         rd_kafka_mock_cluster_t *mcluster;
360         int32_t coord_id, leader_id, down_id;
361         const char *down_what;
362         rd_kafka_resp_err_t err;
363         const char *topic = "test";
364         const char *transactional_id = "txnid";
365         int msgcnt = 1000;
366         int remains = 0;
367 
368         /* Assign coordinator and leader to two different brokers */
369         coord_id = 1;
370         leader_id = 2;
371         if (down_coord) {
372                 down_id = coord_id;
373                 down_what = "coordinator";
374         } else {
375                 down_id = leader_id;
376                 down_what = "leader";
377         }
378 
379         TEST_SAY(_C_MAG "[ Test %s down ]\n", down_what);
380 
381         rk = create_txn_producer(&mcluster, transactional_id, 3);
382 
383         /* Broker down is not a test-failing error */
384         allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
385         test_curr->is_fatal_cb = error_is_fatal_cb;
386 
387         err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
388         TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
389 
390         rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
391                                       coord_id);
392         rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);
393 
394         /* Start transactioning */
395         TEST_SAY("Starting transaction\n");
396         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
397 
398         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
399 
400         test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
401                                   0, msgcnt / 2, NULL, 0, &remains);
402 
403         TEST_SAY("Bringing down %s %"PRId32"\n", down_what, down_id);
404         rd_kafka_mock_broker_set_down(mcluster, down_id);
405 
406         rd_kafka_flush(rk, 3000);
407 
408         /* Produce remaining messages */
409         test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
410                                   msgcnt / 2, msgcnt / 2, NULL, 0, &remains);
411 
412         rd_sleep(2);
413 
414         TEST_SAY("Bringing up %s %"PRId32"\n", down_what, down_id);
415         rd_kafka_mock_broker_set_up(mcluster, down_id);
416 
417         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
418 
419         TEST_ASSERT(remains == 0,
420                     "%d message(s) were not produced\n", remains);
421 
422         rd_kafka_destroy(rk);
423 
424         test_curr->is_fatal_cb = NULL;
425 
426         TEST_SAY(_C_GRN "[ Test %s down: PASS ]\n", down_what);
427 
428 }
429 
430 
431 
432 /**
433  * @brief Advance the coord_id to the next broker.
434  */
set_next_coord(rd_kafka_mock_cluster_t * mcluster,const char * transactional_id,int broker_cnt,int32_t * coord_idp)435 static void set_next_coord (rd_kafka_mock_cluster_t *mcluster,
436                             const char *transactional_id, int broker_cnt,
437                             int32_t *coord_idp) {
438         int32_t new_coord_id;
439 
440         new_coord_id = 1 + ((*coord_idp) % (broker_cnt));
441         TEST_SAY("Changing transaction coordinator from %"PRId32
442                  " to %"PRId32"\n", *coord_idp, new_coord_id);
443         rd_kafka_mock_coordinator_set(mcluster, "transaction",
444                                       transactional_id, new_coord_id);
445 
446         *coord_idp = new_coord_id;
447 }
448 
449 /**
450  * @brief Switch coordinator during a transaction.
451  *
452  * @remark Currently fails due to insufficient coord switch handling.
453  */
do_test_txn_switch_coordinator(void)454 static void do_test_txn_switch_coordinator (void) {
455         rd_kafka_t *rk;
456         rd_kafka_mock_cluster_t *mcluster;
457         int32_t coord_id;
458         const char *topic = "test";
459         const char *transactional_id = "txnid";
460         const int broker_cnt = 5;
461         const int iterations = 20;
462         int i;
463 
464         test_timeout_set(iterations * 10);
465 
466         TEST_SAY(_C_MAG "[ Test switching coordinators ]\n");
467 
468         rk = create_txn_producer(&mcluster, transactional_id, broker_cnt);
469 
470         coord_id = 1;
471         rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
472                                       coord_id);
473 
474         /* Start transactioning */
475         TEST_SAY("Starting transaction\n");
476         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
477 
478         for (i = 0 ; i < iterations ; i++) {
479                 const int msgcnt = 100;
480                 int remains = 0;
481 
482                 set_next_coord(mcluster, transactional_id,
483                                broker_cnt, &coord_id);
484 
485                 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
486 
487                 test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA,
488                                    0, msgcnt / 2, NULL, 0);
489 
490                 if (!(i % 3))
491                         set_next_coord(mcluster, transactional_id,
492                                        broker_cnt, &coord_id);
493 
494                 /* Produce remaining messages */
495                 test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
496                                           msgcnt / 2, msgcnt / 2, NULL, 0,
497                                           &remains);
498 
499                 if ((i & 1) || !(i % 8))
500                         set_next_coord(mcluster, transactional_id,
501                                        broker_cnt, &coord_id);
502 
503 
504                 if (!(i % 5)) {
505                         test_curr->ignore_dr_err = rd_false;
506                         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
507 
508                 } else {
509                         test_curr->ignore_dr_err = rd_true;
510                         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
511                 }
512         }
513 
514 
515         rd_kafka_destroy(rk);
516 
517         TEST_SAY(_C_GRN "[ Test switching coordinators: PASS ]\n");
518 }
519 
520 
521 /**
522  * @brief Test fatal error handling when transactions are not supported
523  *        by the broker.
524  */
do_test_txns_not_supported(void)525 static void do_test_txns_not_supported (void) {
526         rd_kafka_t *rk;
527         rd_kafka_conf_t *conf;
528         rd_kafka_mock_cluster_t *mcluster;
529         rd_kafka_error_t *error;
530         rd_kafka_resp_err_t err;
531 
532         TEST_SAY(_C_MAG "[ %s ]\n", __FUNCTION__);
533 
534         test_conf_init(&conf, NULL, 10);
535 
536         test_conf_set(conf, "transactional.id", "myxnid");
537         test_conf_set(conf, "bootstrap.servers", ",");
538         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
539 
540         rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
541 
542         /* Create mock cluster */
543         mcluster = rd_kafka_mock_cluster_new(rk, 3);
544 
545         /* Disable InitProducerId */
546         rd_kafka_mock_set_apiversion(mcluster, 22/*InitProducerId*/, -1, -1);
547 
548 
549         rd_kafka_brokers_add(rk, rd_kafka_mock_cluster_bootstraps(mcluster));
550 
551 
552 
553         error = rd_kafka_init_transactions(rk, 5*1000);
554         TEST_SAY("init_transactions() returned %s: %s\n",
555                  error ? rd_kafka_error_name(error) : "success",
556                  error ? rd_kafka_error_string(error) : "success");
557 
558         TEST_ASSERT(error, "Expected init_transactions() to fail");
559         TEST_ASSERT(rd_kafka_error_code(error) ==
560                     RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
561                     "Expected init_transactions() to fail with %s, not %s: %s",
562                     rd_kafka_err2name(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE),
563                     rd_kafka_error_name(error),
564                     rd_kafka_error_string(error));
565         rd_kafka_error_destroy(error);
566 
567         err = rd_kafka_producev(rk,
568                                 RD_KAFKA_V_TOPIC("test"),
569                                 RD_KAFKA_V_KEY("test", 4),
570                                 RD_KAFKA_V_END);
571         TEST_ASSERT(err == RD_KAFKA_RESP_ERR__FATAL,
572                     "Expected producev() to fail with %s, not %s",
573                     rd_kafka_err2name(RD_KAFKA_RESP_ERR__FATAL),
574                     rd_kafka_err2name(err));
575 
576         rd_kafka_mock_cluster_destroy(mcluster);
577 
578         rd_kafka_destroy(rk);
579 
580         TEST_SAY(_C_GRN "[ %s: PASS ]\n", __FUNCTION__);
581 }
582 
583 
main_0105_transactions_mock(int argc,char ** argv)584 int main_0105_transactions_mock (int argc, char **argv) {
585 
586         do_test_txn_recoverable_errors();
587 
588         do_test_txn_requires_abort_errors();
589 
590         /* Bring down the coordinator */
591         do_test_txn_broker_down_in_txn(rd_true);
592 
593         /* Bring down partition leader */
594         do_test_txn_broker_down_in_txn(rd_false);
595 
596         do_test_txns_not_supported();
597 
598         if (!test_quick) {
599                 /* Switch coordinator */
600                 do_test_txn_switch_coordinator();
601         }
602 
603         return 0;
604 }
605 
606