1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2019, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 #include "rdkafka.h"
32 
33 #include "../src/rdkafka_proto.h"
34 #include "../src/rdstring.h"
35 #include "../src/rdunittest.h"
36 
37 #include <stdarg.h>
38 
39 
40 /**
41  * @name Producer transaction tests using the mock cluster
42  *
43  */
44 
45 
/* Error code that error_is_fatal_cb() tolerates for the currently running
 * sub-test. Zero (no error) means nothing is tolerated; each sub-test that
 * sets it should reset it before returning. */
static int allowed_error = 0;
47 
48 /**
49  * @brief Decide what error_cb's will cause the test to fail.
50  */
error_is_fatal_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,const char * reason)51 static int error_is_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
52                               const char *reason) {
53         if (err == allowed_error ||
54             /* If transport errors are allowed then it is likely
55              * that we'll also see ALL_BROKERS_DOWN. */
56             (allowed_error == RD_KAFKA_RESP_ERR__TRANSPORT &&
57              err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)) {
58                 TEST_SAY("Ignoring allowed error: %s: %s\n",
59                          rd_kafka_err2name(err), reason);
60                 return 0;
61         }
62         return 1;
63 }
64 
65 
66 static rd_kafka_resp_err_t (*on_response_received_cb) (rd_kafka_t *rk,
67                                                        int sockfd,
68                                                        const char *brokername,
69                                                        int32_t brokerid,
70                                                        int16_t ApiKey,
71                                                        int16_t ApiVersion,
72                                                        int32_t CorrId,
73                                                        size_t  size,
74                                                        int64_t rtt,
75                                                        rd_kafka_resp_err_t err,
76                                                        void *ic_opaque);
77 
78 /**
79  * @brief Simple on_response_received interceptor that simply calls the
80  *        sub-test's on_response_received_cb function, if set.
81  */
82 static rd_kafka_resp_err_t
on_response_received_trampoline(rd_kafka_t * rk,int sockfd,const char * brokername,int32_t brokerid,int16_t ApiKey,int16_t ApiVersion,int32_t CorrId,size_t size,int64_t rtt,rd_kafka_resp_err_t err,void * ic_opaque)83 on_response_received_trampoline (rd_kafka_t *rk,
84                                  int sockfd,
85                                  const char *brokername,
86                                  int32_t brokerid,
87                                  int16_t ApiKey,
88                                  int16_t ApiVersion,
89                                  int32_t CorrId,
90                                  size_t  size,
91                                  int64_t rtt,
92                                  rd_kafka_resp_err_t err,
93                                  void *ic_opaque) {
94         TEST_ASSERT(on_response_received_cb != NULL, "");
95         return on_response_received_cb(rk, sockfd, brokername, brokerid,
96                                        ApiKey, ApiVersion,
97                                        CorrId, size, rtt, err, ic_opaque);
98 }
99 
100 
101 /**
102  * @brief on_new interceptor to add an on_response_received interceptor.
103  */
on_new_producer(rd_kafka_t * rk,const rd_kafka_conf_t * conf,void * ic_opaque,char * errstr,size_t errstr_size)104 static rd_kafka_resp_err_t on_new_producer (rd_kafka_t *rk,
105                                             const rd_kafka_conf_t *conf,
106                                             void *ic_opaque,
107                                             char *errstr, size_t errstr_size) {
108         rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
109 
110         if (on_response_received_cb)
111                 err = rd_kafka_interceptor_add_on_response_received(
112                         rk, "on_response_received",
113                         on_response_received_trampoline, ic_opaque);
114 
115         return err;
116 }
117 
118 
119 /**
120  * @brief Create a transactional producer and a mock cluster.
121  *
122  * The var-arg list is a NULL-terminated list of
123  * (const char *key, const char *value) config properties.
124  *
125  * Special keys:
126  *   "on_response_received", "" - enable the on_response_received_cb
127  *                                interceptor,
128  *                                which must be assigned prior to
129  *                                calling create_tnx_producer().
130  */
create_txn_producer(rd_kafka_mock_cluster_t ** mclusterp,const char * transactional_id,int broker_cnt,...)131 static rd_kafka_t *create_txn_producer (rd_kafka_mock_cluster_t **mclusterp,
132                                         const char *transactional_id,
133                                         int broker_cnt, ...) {
134         rd_kafka_conf_t *conf;
135         rd_kafka_t *rk;
136         char numstr[8];
137         va_list ap;
138         const char *key;
139         rd_bool_t add_interceptors = rd_false;
140 
141         rd_snprintf(numstr, sizeof(numstr), "%d", broker_cnt);
142 
143         test_conf_init(&conf, NULL, 60);
144 
145         test_conf_set(conf, "transactional.id", transactional_id);
146         /* Speed up reconnects */
147         test_conf_set(conf, "reconnect.backoff.max.ms", "2000");
148         test_conf_set(conf, "test.mock.num.brokers", numstr);
149         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
150 
151         test_curr->ignore_dr_err = rd_false;
152 
153         va_start(ap, broker_cnt);
154         while ((key = va_arg(ap, const char *))) {
155                 if (!strcmp(key, "on_response_received")) {
156                         add_interceptors = rd_true;
157                         (void)va_arg(ap, const char *);
158                 } else {
159                         test_conf_set(conf, key, va_arg(ap, const char *));
160                 }
161         }
162         va_end(ap);
163 
164         /* Add an on_.. interceptors */
165         if (add_interceptors)
166                 rd_kafka_conf_interceptor_add_on_new(
167                         conf,
168                         "on_new_producer",
169                         on_new_producer, NULL);
170 
171         rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
172 
173         if (mclusterp) {
174                 *mclusterp = rd_kafka_handle_mock_cluster(rk);
175                 TEST_ASSERT(*mclusterp, "failed to create mock cluster");
176         }
177 
178         return rk;
179 }
180 
181 
182 /**
183  * @brief Test recoverable errors using mock broker error injections
184  *        and code coverage checks.
185  */
do_test_txn_recoverable_errors(void)186 static void do_test_txn_recoverable_errors (void) {
187         rd_kafka_t *rk;
188         rd_kafka_mock_cluster_t *mcluster;
189         rd_kafka_topic_partition_list_t *offsets;
190         rd_kafka_consumer_group_metadata_t *cgmetadata;
191         const char *groupid = "myGroupId";
192         const char *txnid = "myTxnId";
193 
194         SUB_TEST_QUICK();
195 
196         rk = create_txn_producer(&mcluster, txnid, 3,
197                                  "batch.num.messages", "1",
198                                  NULL);
199 
200         /* Make sure transaction and group coordinators are different.
201          * This verifies that AddOffsetsToTxnRequest isn't sent to the
202          * transaction coordinator but the group coordinator. */
203         rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
204         rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 2);
205 
206         /*
207          * Inject som InitProducerId errors that causes retries
208          */
209         rd_kafka_mock_push_request_errors(
210                 mcluster,
211                 RD_KAFKAP_InitProducerId,
212                 3,
213                 RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
214                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
215                 RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);
216 
217         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
218 
219         (void)RD_UT_COVERAGE_CHECK(0); /* idemp_request_pid_failed(retry) */
220         (void)RD_UT_COVERAGE_CHECK(1); /* txn_idemp_state_change(READY) */
221 
222         /*
223          * Start a transaction
224          */
225         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
226 
227 
228         /* Produce a message without error first */
229         TEST_CALL_ERR__(rd_kafka_producev(rk,
230                                           RD_KAFKA_V_TOPIC("mytopic"),
231                                           RD_KAFKA_V_PARTITION(0),
232                                           RD_KAFKA_V_VALUE("hi", 2),
233                                           RD_KAFKA_V_END));
234 
235         /*
236          * Produce a message, let it fail with a non-idempo/non-txn
237          * retryable error
238          */
239         rd_kafka_mock_push_request_errors(
240                 mcluster,
241                 RD_KAFKAP_Produce,
242                 1,
243                 RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS);
244 
245         TEST_CALL_ERR__(rd_kafka_producev(rk,
246                                           RD_KAFKA_V_TOPIC("mytopic"),
247                                           RD_KAFKA_V_PARTITION(0),
248                                           RD_KAFKA_V_VALUE("hi", 2),
249                                           RD_KAFKA_V_END));
250 
251         /* Make sure messages are produced */
252         rd_kafka_flush(rk, -1);
253 
254         /*
255          * Send some arbitrary offsets, first with some failures, then
256          * succeed.
257          */
258         offsets = rd_kafka_topic_partition_list_new(4);
259         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
260         rd_kafka_topic_partition_list_add(offsets, "srctop2", 99)->offset =
261                 999999111;
262         rd_kafka_topic_partition_list_add(offsets, "srctopic", 0)->offset = 999;
263         rd_kafka_topic_partition_list_add(offsets, "srctop2", 3499)->offset =
264                 123456789;
265 
266         rd_kafka_mock_push_request_errors(
267                 mcluster,
268                 RD_KAFKAP_AddPartitionsToTxn,
269                 1,
270                 RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);
271 
272         rd_kafka_mock_push_request_errors(
273                 mcluster,
274                 RD_KAFKAP_TxnOffsetCommit,
275                 2,
276                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
277                 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
278 
279         cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
280 
281         TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
282                                   rk, offsets,
283                                   cgmetadata, -1));
284 
285         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
286         rd_kafka_topic_partition_list_destroy(offsets);
287 
288         /*
289          * Commit transaction, first with som failures, then succeed.
290          */
291         rd_kafka_mock_push_request_errors(
292                 mcluster,
293                 RD_KAFKAP_EndTxn,
294                 3,
295                 RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
296                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
297                 RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);
298 
299         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
300 
301         /* All done */
302 
303         rd_kafka_destroy(rk);
304 
305         SUB_TEST_PASS();
306 }
307 
308 
/**
 * @brief KIP-360: Test that fatal idempotence errors trigger abortable
 *        transaction errors and that the producer can recover.
 *
 * A Produce request is failed with UNKNOWN_PRODUCER_ID. The following
 * commit_transaction() must fail with a non-fatal, abortable error and
 * abort_transaction() must succeed. A new transaction must then commit
 * without errors.
 */
static void do_test_txn_fatal_idempo_errors (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        const char *txnid = "myTxnId";

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3,
                                 "batch.num.messages", "1",
                                 NULL);

        /* The injected UNKNOWN_PRODUCER_ID is expected: don't fail on
         * delivery errors, and let error_is_fatal_cb() ignore error
         * callbacks carrying exactly this error code. */
        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb = error_is_fatal_cb;
        allowed_error = RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        /* Produce a message, let it fail with a fatal idempo error.
         * The error is pushed for the next Produce request on the
         * cluster. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_Produce,
                1,
                RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        /* Commit the transaction, should fail with an abortable
         * (non-fatal) error. */
        error = rd_kafka_commit_transaction(rk, -1);
        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));

        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected abortable error");
        rd_kafka_error_destroy(error);

        /* Abort the transaction */
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /* Run a new transaction without errors to verify that the
         * producer can recover. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        /* All done */

        rd_kafka_destroy(rk);

        /* Stop tolerating the injected error for subsequent sub-tests. */
        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
393 
394 
/**
 * @brief KIP-360: Test that fatal idempotence errors trigger abortable
 *        transaction errors, but let the broker-side bumping of the
 *        producer PID take longer than the remaining transaction timeout
 *        which should raise a retriable error from abort_transaction().
 *
 * The transaction coordinator's next InitProducerId response is delayed
 * by 10 seconds. The first abort_transaction() uses a 100 ms timeout and
 * must fail with a retriable, non-fatal error. A retried abort with an
 * infinite timeout must then succeed, followed by a clean transaction.
 *
 * @param with_sleep After the first abort sleep longer than it takes to
 *                   re-init the pid so that the internal state automatically
 *                   transitions.
 */
static void do_test_txn_slow_reinit (rd_bool_t with_sleep) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t txn_coord = 2;
        const char *txnid = "myTxnId";
        test_timing_t timing;

        SUB_TEST("%s sleep", with_sleep ? "with": "without");

        rk = create_txn_producer(&mcluster, txnid, 3,
                                 "batch.num.messages", "1",
                                 NULL);

        /* Pin the transaction coordinator so the delayed InitProducerId
         * below is injected on the right broker. */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        /* Ignore delivery errors from the failed/aborted transaction and
         * use the default error callback handling (no is_fatal_cb). */
        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb = NULL;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Set transaction coordinator latency higher than
         * the abort_transaction() call timeout so that the automatic
         * re-initpid takes longer than abort_transaction().
         * (One InitProducerId request, no error, 10 s round-trip.) */
        rd_kafka_mock_broker_push_request_error_rtts(
                mcluster,
                txn_coord,
                RD_KAFKAP_InitProducerId,
                1,
                RD_KAFKA_RESP_ERR_NO_ERROR, 10000/*10s*/);

        /* Produce a message, let it fail with a fatal idempo error. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_Produce,
                1,
                RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));


        /* Commit the transaction, should fail */
        TIMING_START(&timing, "commit_transaction(-1)");
        error = rd_kafka_commit_transaction(rk, -1);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));

        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected abortable error");
        rd_kafka_error_destroy(error);

        /* Abort the transaction, should fail with retriable (timeout) error
         * since the 100 ms timeout is shorter than the 10 s InitProducerId
         * latency injected above. */
        TIMING_START(&timing, "abort_transaction(100)");
        error = rd_kafka_abort_transaction(rk, 100);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");

        TEST_SAY("First abort_transaction() failed: %s\n",
                 rd_kafka_error_string(error));
        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_is_retriable(error),
                    "Expected retriable error");
        rd_kafka_error_destroy(error);

        /* Sleep 12 s, i.e., longer than the injected 10 s latency, so the
         * re-init completes before the abort is retried. */
        if (with_sleep)
                rd_sleep(12);

        /* Retry abort, should now finish. */
        TEST_SAY("Retrying abort\n");
        TIMING_START(&timing, "abort_transaction(-1)");
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
        TIMING_STOP(&timing);

        /* Run a new transaction without errors to verify that the
         * producer can recover. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        /* All done */

        rd_kafka_destroy(rk);

        /* This sub-test does not set allowed_error; reset it regardless. */
        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
524 
525 
526 
/**
 * @brief KIP-360: Test that fatal idempotence errors trigger abortable
 *        transaction errors, but let the broker-side bumping of the
 *        producer PID fail with a fencing error.
 *        Should raise a fatal error.
 *
 * After a Produce request fails with UNKNOWN_PRODUCER_ID, the coordinator's
 * next InitProducerId response is INVALID_PRODUCER_EPOCH. The following
 * abort_transaction() must return a fatal error, and rd_kafka_fatal_error()
 * must report that a fatal error was raised.
 */
static void do_test_txn_fenced_reinit (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t txn_coord = 2;
        const char *txnid = "myTxnId";
        char errstr[512];
        rd_kafka_resp_err_t fatal_err;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3,
                                 "batch.num.messages", "1",
                                 NULL);

        /* Pin the transaction coordinator so the InitProducerId error
         * below is injected on the right broker. */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        /* Ignore delivery errors, and let error_is_fatal_cb() ignore error
         * callbacks for __FENCED, the code this test expects the
         * INVALID_PRODUCER_EPOCH failure to surface as. */
        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb = error_is_fatal_cb;
        allowed_error = RD_KAFKA_RESP_ERR__FENCED;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Fail the PID reinit: the coordinator's next InitProducerId
         * request gets INVALID_PRODUCER_EPOCH, with no added latency. */
        rd_kafka_mock_broker_push_request_error_rtts(
                mcluster,
                txn_coord,
                RD_KAFKAP_InitProducerId,
                1,
                RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH, 0);

        /* Produce a message, let it fail with a fatal idempo error. */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_Produce,
                1,
                RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(rk,
                                          RD_KAFKA_V_TOPIC("mytopic"),
                                          RD_KAFKA_V_PARTITION(0),
                                          RD_KAFKA_V_VALUE("hi", 2),
                                          RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Abort the transaction, should fail with a fatal error */
        error = rd_kafka_abort_transaction(rk, -1);
        TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");

        TEST_SAY("abort_transaction() failed: %s\n",
                 rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_is_fatal(error),
                    "Expected a fatal error");
        rd_kafka_error_destroy(error);

        /* The instance-level fatal error must also be set. */
        fatal_err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
        TEST_ASSERT(fatal_err,
                    "Expected a fatal error to have been raised");
        TEST_SAY("Fatal error: %s: %s\n",
                 rd_kafka_err2name(fatal_err), errstr);

        /* All done */

        rd_kafka_destroy(rk);

        /* Stop tolerating __FENCED for subsequent sub-tests. */
        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}
619 
620 
621 /**
622  * @brief Test EndTxn errors.
623  */
do_test_txn_endtxn_errors(void)624 static void do_test_txn_endtxn_errors (void) {
625         rd_kafka_t *rk = NULL;
626         rd_kafka_mock_cluster_t *mcluster = NULL;
627         rd_kafka_resp_err_t err;
628         struct {
629                 size_t error_cnt;
630                 rd_kafka_resp_err_t errors[4];
631                 rd_kafka_resp_err_t exp_err;
632                 rd_bool_t exp_retriable;
633                 rd_bool_t exp_abortable;
634                 rd_bool_t exp_fatal;
635         } scenario[] = {
636                 /* This list of errors is from the EndTxnResponse handler in
637                  * AK clients/.../TransactionManager.java */
638                 { /* #0 */
639                         2,
640                         { RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
641                           RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE },
642                         /* Should auto-recover */
643                         RD_KAFKA_RESP_ERR_NO_ERROR,
644                 },
645                 { /* #1 */
646                         2,
647                         { RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
648                           RD_KAFKA_RESP_ERR_NOT_COORDINATOR },
649                         /* Should auto-recover */
650                         RD_KAFKA_RESP_ERR_NO_ERROR,
651                 },
652                 { /* #2 */
653                         1,
654                         { RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS },
655                         /* Should auto-recover */
656                         RD_KAFKA_RESP_ERR_NO_ERROR,
657                 },
658                 { /* #3 */
659                         3,
660                         { RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
661                           RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
662                           RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS },
663                         /* Should auto-recover */
664                         RD_KAFKA_RESP_ERR_NO_ERROR,
665                 },
666                 { /* #4 */
667                         1,
668                         { RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID },
669                         RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
670                         rd_false /* !retriable */,
671                         rd_true /* abortable */,
672                         rd_false /* !fatal */
673                 },
674                 { /* #5 */
675                         1,
676                         { RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING },
677                         RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
678                         rd_false /* !retriable */,
679                         rd_true /* abortable */,
680                         rd_false /* !fatal */
681                 },
682                 { /* #6 */
683                         1,
684                         { RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH },
685                         /* This error is normalized */
686                         RD_KAFKA_RESP_ERR__FENCED,
687                         rd_false /* !retriable */,
688                         rd_false /* !abortable */,
689                         rd_true /* fatal */
690                 },
691                 { /* #7 */
692                         1,
693                         { RD_KAFKA_RESP_ERR_PRODUCER_FENCED },
694                         /* This error is normalized */
695                         RD_KAFKA_RESP_ERR__FENCED,
696                         rd_false /* !retriable */,
697                         rd_false /* !abortable */,
698                         rd_true /* fatal */
699                 },
700                 { /* #8 */
701                         1,
702                         { RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED },
703                         RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
704                         rd_false /* !retriable */,
705                         rd_false /* !abortable */,
706                         rd_true /* fatal */
707                 },
708                 { /* #9 */
709                         1,
710                         { RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED },
711                         RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
712                         rd_false /* !retriable */,
713                         rd_true /* abortable */,
714                         rd_false /* !fatal */
715                 },
716                 { /* #10 */
717                         /* Any other error should raise a fatal error */
718                         1,
719                         { RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE },
720                         RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
721                         rd_false /* !retriable */,
722                         rd_true /* abortable */,
723                         rd_false /* !fatal */,
724                 },
725                 { 0 },
726         };
727         int i;
728 
729         SUB_TEST_QUICK();
730 
731         for (i = 0 ; scenario[i].error_cnt > 0 ; i++) {
732                 int j;
733                 /* For each scenario, test:
734                  *   commit_transaction()
735                  *   flush() + commit_transaction()
736                  *   abort_transaction()
737                  *   flush() + abort_transaction()
738                  */
739                 for (j = 0 ; j < (2+2) ; j++) {
740                         rd_bool_t commit = j < 2;
741                         rd_bool_t with_flush = j & 1;
742                         const char *commit_str =
743                                 commit ?
744                                 (with_flush ? "commit&flush" : "commit") :
745                                 (with_flush ? "abort&flush" : "abort");
746                         rd_kafka_topic_partition_list_t *offsets;
747                         rd_kafka_consumer_group_metadata_t *cgmetadata;
748                         rd_kafka_error_t *error;
749                         test_timing_t t_call;
750 
751                         TEST_SAY("Testing scenario #%d %s with %"PRIusz
752                                  " injected erorrs, expecting %s\n",
753                                  i, commit_str,
754                                  scenario[i].error_cnt,
755                                  rd_kafka_err2name(scenario[i].exp_err));
756 
757                         if (!rk) {
758                                 const char *txnid = "myTxnId";
759                                 rk = create_txn_producer(&mcluster, txnid,
760                                                          3, NULL);
761                                 TEST_CALL_ERROR__(rd_kafka_init_transactions(
762                                                           rk, 5000));
763                         }
764 
765                         /*
766                          * Start transaction
767                          */
768                         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
769 
770                         /* Transaction aborts will cause DR errors:
771                          * ignore them. */
772                         test_curr->ignore_dr_err = !commit;
773 
774                         /*
775                          * Produce a message.
776                          */
777                         err = rd_kafka_producev(rk,
778                                                 RD_KAFKA_V_TOPIC("mytopic"),
779                                                 RD_KAFKA_V_VALUE("hi", 2),
780                                                 RD_KAFKA_V_END);
781                         TEST_ASSERT(!err, "produce failed: %s",
782                                     rd_kafka_err2str(err));
783 
784                         if (with_flush)
785                                 test_flush(rk, -1);
786 
787                         /*
788                          * Send some arbitrary offsets.
789                          */
790                         offsets = rd_kafka_topic_partition_list_new(4);
791                         rd_kafka_topic_partition_list_add(offsets, "srctopic",
792                                                           3)->offset = 12;
793                         rd_kafka_topic_partition_list_add(offsets, "srctop2",
794                                                           99)->offset = 99999;
795 
796                         cgmetadata = rd_kafka_consumer_group_metadata_new(
797                                 "mygroupid");
798 
799                         TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
800                                                   rk, offsets,
801                                                   cgmetadata, -1));
802 
803                         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
804                         rd_kafka_topic_partition_list_destroy(offsets);
805 
806                         /*
807                          * Commit transaction, first with som failures,
808                          * then succeed.
809                          */
810                         rd_kafka_mock_push_request_errors_array(
811                                 mcluster,
812                                 RD_KAFKAP_EndTxn,
813                                 scenario[i].error_cnt,
814                                 scenario[i].errors);
815 
816                         TIMING_START(&t_call, "%s", commit_str);
817                         if (commit)
818                                 error = rd_kafka_commit_transaction(
819                                         rk, tmout_multip(5000));
820                         else
821                                 error = rd_kafka_abort_transaction(
822                                         rk, tmout_multip(5000));
823                         TIMING_STOP(&t_call);
824 
825                         if (error)
826                                 TEST_SAY("Scenario #%d %s failed: %s: %s "
827                                          "(retriable=%s, req_abort=%s, "
828                                          "fatal=%s)\n",
829                                          i, commit_str,
830                                          rd_kafka_error_name(error),
831                                          rd_kafka_error_string(error),
832                                          RD_STR_ToF(rd_kafka_error_is_retriable(error)),
833                                          RD_STR_ToF(rd_kafka_error_txn_requires_abort(error)),
834                                          RD_STR_ToF(rd_kafka_error_is_fatal(error)));
835                         else
836                                 TEST_SAY("Scenario #%d %s succeeded\n",
837                                          i, commit_str);
838 
839                         if (!scenario[i].exp_err) {
840                                 TEST_ASSERT(!error,
841                                             "Expected #%d %s to succeed, "
842                                             "got %s",
843                                             i, commit_str,
844                                             rd_kafka_error_string(error));
845                                 continue;
846                         }
847 
848 
849                         TEST_ASSERT(error != NULL,
850                                     "Expected #%d %s to fail",
851                                     i, commit_str);
852                         TEST_ASSERT(scenario[i].exp_err ==
853                                     rd_kafka_error_code(error),
854                                     "Scenario #%d: expected %s, not %s",
855                                     i,
856                                     rd_kafka_err2name(scenario[i].exp_err),
857                                     rd_kafka_error_name(error));
858                         TEST_ASSERT(scenario[i].exp_retriable ==
859                                     (rd_bool_t)
860                                     rd_kafka_error_is_retriable(error),
861                                     "Scenario #%d: retriable mismatch",
862                                     i);
863                         TEST_ASSERT(scenario[i].exp_abortable ==
864                                     (rd_bool_t)
865                                     rd_kafka_error_txn_requires_abort(error),
866                                     "Scenario #%d: abortable mismatch",
867                                     i);
868                         TEST_ASSERT(scenario[i].exp_fatal ==
869                                     (rd_bool_t)rd_kafka_error_is_fatal(error),
870                                     "Scenario #%d: fatal mismatch", i);
871 
872                         /* Handle errors according to the error flags */
873                         if (rd_kafka_error_is_fatal(error)) {
874                                 TEST_SAY("Fatal error, destroying producer\n");
875                                 rd_kafka_error_destroy(error);
876                                 rd_kafka_destroy(rk);
877                                 rk = NULL; /* Will be re-created on the next
878                                             * loop iteration. */
879 
880                         } else if (rd_kafka_error_txn_requires_abort(error)) {
881                                 rd_kafka_error_destroy(error);
882                                 TEST_SAY("Abortable error, "
883                                          "aborting transaction\n");
884                                 TEST_CALL_ERROR__(
885                                         rd_kafka_abort_transaction(rk, -1));
886 
887                         } else if (rd_kafka_error_is_retriable(error)) {
888                                 rd_kafka_error_destroy(error);
889                                 TEST_SAY("Retriable error, retrying %s once\n",
890                                          commit_str);
891                                 if (commit)
892                                         TEST_CALL_ERROR__(
893                                                 rd_kafka_commit_transaction(
894                                                         rk, 5000));
895                                 else
896                                         TEST_CALL_ERROR__(
897                                                 rd_kafka_abort_transaction(
898                                                         rk, 5000));
899                         } else {
900                                 TEST_FAIL("Scenario #%d %s: "
901                                           "Permanent error without enough "
902                                           "hints to proceed: %s\n",
903                                           i, commit_str,
904                                           rd_kafka_error_string(error));
905                         }
906                 }
907         }
908 
909         /* All done */
910         if (rk)
911                 rd_kafka_destroy(rk);
912 
913         SUB_TEST_PASS();
914 }
915 
916 
917 /**
918  * @brief Test that the commit/abort works properly with infinite timeout.
919  */
do_test_txn_endtxn_infinite(void)920 static void do_test_txn_endtxn_infinite (void) {
921         rd_kafka_t *rk;
922         rd_kafka_mock_cluster_t *mcluster = NULL;
923         const char *txnid = "myTxnId";
924         int i;
925 
926         SUB_TEST_QUICK();
927 
928         rk = create_txn_producer(&mcluster, txnid, 3, NULL);
929 
930         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
931 
932         for (i = 0 ; i < 2 ; i++) {
933                 rd_bool_t commit = i == 0;
934                 const char *commit_str = commit ? "commit" : "abort";
935                 rd_kafka_error_t *error;
936                 test_timing_t t_call;
937 
938                 /* Messages will fail on as the transaction fails,
939                  * ignore the DR error */
940                 test_curr->ignore_dr_err = rd_true;
941 
942                 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
943 
944                 TEST_CALL_ERR__(rd_kafka_producev(rk,
945                                                   RD_KAFKA_V_TOPIC("mytopic"),
946                                                   RD_KAFKA_V_VALUE("hi", 2),
947                                                   RD_KAFKA_V_END));
948 
949                 /*
950                  * Commit/abort transaction, first with som retriable failures,
951                  * then success.
952                  */
953                 rd_kafka_mock_push_request_errors(
954                         mcluster,
955                         RD_KAFKAP_EndTxn,
956                         10,
957                         RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
958                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
959                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
960                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
961                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
962                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
963                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
964                         RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
965                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
966                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
967 
968                 rd_sleep(1);
969 
970                 TIMING_START(&t_call, "%s_transaction()", commit_str);
971                 if (commit)
972                         error = rd_kafka_commit_transaction(rk, -1);
973                 else
974                         error = rd_kafka_abort_transaction(rk, -1);
975                 TIMING_STOP(&t_call);
976 
977                 TEST_SAY("%s returned %s\n",
978                          commit_str,
979                          error ? rd_kafka_error_string(error) : "success");
980 
981                 TEST_ASSERT(!error,
982                             "Expected %s to succeed, got %s",
983                             commit_str, rd_kafka_error_string(error));
984 
985         }
986 
987         /* All done */
988 
989         rd_kafka_destroy(rk);
990 
991         SUB_TEST_PASS();
992 }
993 
994 
995 
996 /**
997  * @brief Test that the commit/abort user timeout is honoured.
998  */
do_test_txn_endtxn_timeout(void)999 static void do_test_txn_endtxn_timeout (void) {
1000         rd_kafka_t *rk;
1001         rd_kafka_mock_cluster_t *mcluster = NULL;
1002         const char *txnid = "myTxnId";
1003         int i;
1004 
1005         SUB_TEST_QUICK();
1006 
1007         rk = create_txn_producer(&mcluster, txnid, 3, NULL);
1008 
1009         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1010 
1011         for (i = 0 ; i < 2 ; i++) {
1012                 rd_bool_t commit = i == 0;
1013                 const char *commit_str = commit ? "commit" : "abort";
1014                 rd_kafka_error_t *error;
1015                 test_timing_t t_call;
1016 
1017                 /* Messages will fail on as the transaction fails,
1018                  * ignore the DR error */
1019                 test_curr->ignore_dr_err = rd_true;
1020 
1021                 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1022 
1023                 TEST_CALL_ERR__(rd_kafka_producev(rk,
1024                                                   RD_KAFKA_V_TOPIC("mytopic"),
1025                                                   RD_KAFKA_V_VALUE("hi", 2),
1026                                                   RD_KAFKA_V_END));
1027 
1028                 /*
1029                  * Commit/abort transaction, first with som retriable failures
1030                  * whos retries exceed the user timeout.
1031                  */
1032                 rd_kafka_mock_push_request_errors(
1033                         mcluster,
1034                         RD_KAFKAP_EndTxn,
1035                         10,
1036                         RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
1037                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1038                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1039                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1040                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1041                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1042                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1043                         RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
1044                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
1045                         RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
1046 
1047                 rd_sleep(1);
1048 
1049                 TIMING_START(&t_call, "%s_transaction()", commit_str);
1050                 if (commit)
1051                         error = rd_kafka_commit_transaction(rk, 100);
1052                 else
1053                         error = rd_kafka_abort_transaction(rk, 100);
1054                 TIMING_STOP(&t_call);
1055 
1056                 TEST_SAY("%s returned %s\n",
1057                          commit_str,
1058                          error ? rd_kafka_error_string(error) : "success");
1059 
1060                 TEST_ASSERT(error != NULL,
1061                             "Expected %s to fail", commit_str);
1062 
1063                 TEST_ASSERT(rd_kafka_error_code(error) ==
1064                             RD_KAFKA_RESP_ERR__TIMED_OUT,
1065                             "Expected %s to fail with timeout, not %s: %s",
1066                             commit_str,
1067                             rd_kafka_error_name(error),
1068                             rd_kafka_error_string(error));
1069 
1070                 if (!commit)
1071                         TEST_ASSERT(!rd_kafka_error_txn_requires_abort(error),
1072                                     "abort_transaction() failure should raise "
1073                                     "a txn_requires_abort error");
1074                 else {
1075                         TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
1076                                     "commit_transaction() failure should raise "
1077                                     "a txn_requires_abort error");
1078                         TEST_SAY("Aborting transaction as instructed by "
1079                                  "error flag\n");
1080                         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
1081                 }
1082 
1083                 rd_kafka_error_destroy(error);
1084 
1085                 TIMING_ASSERT(&t_call, 99, 199);
1086         }
1087 
1088         /* All done */
1089 
1090         rd_kafka_destroy(rk);
1091 
1092         SUB_TEST_PASS();
1093 }
1094 
1095 
1096 /**
1097  * @brief Test that EndTxn is properly sent for aborted transactions
1098  *        even if AddOffsetsToTxnRequest was retried.
1099  *        This is a check for a txn_req_cnt bug.
1100  */
do_test_txn_req_cnt(void)1101 static void do_test_txn_req_cnt (void) {
1102         rd_kafka_t *rk;
1103         rd_kafka_mock_cluster_t *mcluster;
1104         rd_kafka_topic_partition_list_t *offsets;
1105         rd_kafka_consumer_group_metadata_t *cgmetadata;
1106         const char *txnid = "myTxnId";
1107 
1108         SUB_TEST_QUICK();
1109 
1110         rk = create_txn_producer(&mcluster, txnid, 3, NULL);
1111 
1112         /* Messages will fail on abort(), ignore the DR error */
1113         test_curr->ignore_dr_err = rd_true;
1114 
1115         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1116 
1117         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1118 
1119         /*
1120          * Send some arbitrary offsets, first with some failures, then
1121          * succeed.
1122          */
1123         offsets = rd_kafka_topic_partition_list_new(2);
1124         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
1125         rd_kafka_topic_partition_list_add(offsets, "srctop2", 99)->offset =
1126                 999999111;
1127 
1128         rd_kafka_mock_push_request_errors(
1129                 mcluster,
1130                 RD_KAFKAP_AddOffsetsToTxn,
1131                 2,
1132                 RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
1133                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
1134 
1135         rd_kafka_mock_push_request_errors(
1136                 mcluster,
1137                 RD_KAFKAP_TxnOffsetCommit,
1138                 2,
1139                 RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
1140                 RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);
1141 
1142         cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
1143 
1144         TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
1145                                   rk, offsets,
1146                                   cgmetadata, -1));
1147 
1148         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1149         rd_kafka_topic_partition_list_destroy(offsets);
1150 
1151         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));
1152 
1153         /* All done */
1154 
1155         rd_kafka_destroy(rk);
1156 
1157         SUB_TEST_PASS();
1158 }
1159 
1160 
1161 /**
1162  * @brief Test abortable errors using mock broker error injections
1163  *        and code coverage checks.
1164  */
do_test_txn_requires_abort_errors(void)1165 static void do_test_txn_requires_abort_errors (void) {
1166         rd_kafka_t *rk;
1167         rd_kafka_mock_cluster_t *mcluster;
1168         rd_kafka_error_t *error;
1169         rd_kafka_resp_err_t err;
1170         rd_kafka_topic_partition_list_t *offsets;
1171         rd_kafka_consumer_group_metadata_t *cgmetadata;
1172         int r;
1173 
1174         SUB_TEST_QUICK();
1175 
1176         rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
1177 
1178         test_curr->ignore_dr_err = rd_true;
1179 
1180         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1181 
1182         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1183 
1184         /*
1185          * 1. Fail on produce
1186          */
1187         TEST_SAY("1. Fail on produce\n");
1188 
1189         rd_kafka_mock_push_request_errors(
1190                 mcluster,
1191                 RD_KAFKAP_Produce,
1192                 1,
1193                 RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
1194 
1195         err = rd_kafka_producev(rk,
1196                                 RD_KAFKA_V_TOPIC("mytopic"),
1197                                 RD_KAFKA_V_VALUE("hi", 2),
1198                                 RD_KAFKA_V_END);
1199         TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
1200 
1201         /* Wait for messages to fail */
1202         test_flush(rk, 5000);
1203 
1204         /* Any other transactional API should now raise an error */
1205         offsets = rd_kafka_topic_partition_list_new(1);
1206         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
1207 
1208         cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
1209 
1210         error = rd_kafka_send_offsets_to_transaction(rk, offsets,
1211                                                      cgmetadata, -1);
1212 
1213         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1214         rd_kafka_topic_partition_list_destroy(offsets);
1215         TEST_ASSERT(error, "expected error");
1216         TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
1217                     "expected abortable error, not %s",
1218                     rd_kafka_error_string(error));
1219         TEST_SAY("Error %s: %s\n",
1220                  rd_kafka_error_name(error),
1221                  rd_kafka_error_string(error));
1222         rd_kafka_error_destroy(error);
1223 
1224         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
1225 
1226         /*
1227          * 2. Restart transaction and fail on AddPartitionsToTxn
1228          */
1229         TEST_SAY("2. Fail on AddPartitionsToTxn\n");
1230 
1231         /* First refresh proper Metadata to clear the topic's auth error,
1232          * otherwise the produce() below will fail immediately. */
1233         r = test_get_partition_count(rk, "mytopic", 5000);
1234         TEST_ASSERT(r > 0, "Expected topic %s to exist", "mytopic");
1235 
1236         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1237 
1238         rd_kafka_mock_push_request_errors(
1239                 mcluster,
1240                 RD_KAFKAP_AddPartitionsToTxn,
1241                 1,
1242                 RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
1243 
1244         err = rd_kafka_producev(rk,
1245                                 RD_KAFKA_V_TOPIC("mytopic"),
1246                                 RD_KAFKA_V_VALUE("hi", 2),
1247                                 RD_KAFKA_V_END);
1248         TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
1249 
1250         error = rd_kafka_commit_transaction(rk, 5000);
1251         TEST_ASSERT(error, "commit_transaction should have failed");
1252         TEST_SAY("commit_transaction() error %s: %s\n",
1253                  rd_kafka_error_name(error),
1254                  rd_kafka_error_string(error));
1255         rd_kafka_error_destroy(error);
1256 
1257         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
1258 
1259         /*
1260         * 3. Restart transaction and fail on AddOffsetsToTxn
1261         */
1262         TEST_SAY("3. Fail on AddOffsetsToTxn\n");
1263 
1264         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1265 
1266         err = rd_kafka_producev(rk,
1267                                 RD_KAFKA_V_TOPIC("mytopic"),
1268                                 RD_KAFKA_V_VALUE("hi", 2),
1269                                 RD_KAFKA_V_END);
1270         TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
1271 
1272         rd_kafka_mock_push_request_errors(
1273                 mcluster,
1274                 RD_KAFKAP_AddOffsetsToTxn,
1275                 1,
1276                 RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED);
1277 
1278         offsets = rd_kafka_topic_partition_list_new(1);
1279         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
1280         cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
1281 
1282         error = rd_kafka_send_offsets_to_transaction(rk, offsets,
1283                                                      cgmetadata, -1);
1284         TEST_ASSERT(error, "Expected send_offsets..() to fail");
1285         TEST_ASSERT(rd_kafka_error_code(error) ==
1286                     RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
1287                     "expected send_offsets_to_transaction() to fail with "
1288                     "group auth error: not %s",
1289                     rd_kafka_error_name(error));
1290         rd_kafka_error_destroy(error);
1291 
1292         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1293         rd_kafka_topic_partition_list_destroy(offsets);
1294 
1295 
1296         error = rd_kafka_commit_transaction(rk, 5000);
1297         TEST_ASSERT(error, "commit_transaction should have failed");
1298         rd_kafka_error_destroy(error);
1299 
1300         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
1301 
1302         /* All done */
1303 
1304         rd_kafka_destroy(rk);
1305 
1306         SUB_TEST_PASS();
1307 }
1308 
1309 
1310 /**
1311  * @brief Test error handling and recover for when broker goes down during
1312  *        an ongoing transaction.
1313  */
do_test_txn_broker_down_in_txn(rd_bool_t down_coord)1314 static void do_test_txn_broker_down_in_txn (rd_bool_t down_coord) {
1315         rd_kafka_t *rk;
1316         rd_kafka_mock_cluster_t *mcluster;
1317         int32_t coord_id, leader_id, down_id;
1318         const char *down_what;
1319         rd_kafka_resp_err_t err;
1320         const char *topic = "test";
1321         const char *transactional_id = "txnid";
1322         int msgcnt = 1000;
1323         int remains = 0;
1324 
1325         /* Assign coordinator and leader to two different brokers */
1326         coord_id = 1;
1327         leader_id = 2;
1328         if (down_coord) {
1329                 down_id = coord_id;
1330                 down_what = "coordinator";
1331         } else {
1332                 down_id = leader_id;
1333                 down_what = "leader";
1334         }
1335 
1336         SUB_TEST_QUICK("Test %s down", down_what);
1337 
1338         rk = create_txn_producer(&mcluster, transactional_id, 3, NULL);
1339 
1340         /* Broker down is not a test-failing error */
1341         allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
1342         test_curr->is_fatal_cb = error_is_fatal_cb;
1343 
1344         err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
1345         TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
1346 
1347         rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
1348                                       coord_id);
1349         rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);
1350 
1351         /* Start transactioning */
1352         TEST_SAY("Starting transaction\n");
1353         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1354 
1355         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1356 
1357         test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
1358                                   0, msgcnt / 2, NULL, 0, &remains);
1359 
1360         TEST_SAY("Bringing down %s %"PRId32"\n", down_what, down_id);
1361         rd_kafka_mock_broker_set_down(mcluster, down_id);
1362 
1363         rd_kafka_flush(rk, 3000);
1364 
1365         /* Produce remaining messages */
1366         test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
1367                                   msgcnt / 2, msgcnt / 2, NULL, 0, &remains);
1368 
1369         rd_sleep(2);
1370 
1371         TEST_SAY("Bringing up %s %"PRId32"\n", down_what, down_id);
1372         rd_kafka_mock_broker_set_up(mcluster, down_id);
1373 
1374         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
1375 
1376         TEST_ASSERT(remains == 0,
1377                     "%d message(s) were not produced\n", remains);
1378 
1379         rd_kafka_destroy(rk);
1380 
1381         test_curr->is_fatal_cb = NULL;
1382 
1383         SUB_TEST_PASS();
1384 
1385 }
1386 
1387 
1388 
1389 /**
1390  * @brief Advance the coord_id to the next broker.
1391  */
set_next_coord(rd_kafka_mock_cluster_t * mcluster,const char * transactional_id,int broker_cnt,int32_t * coord_idp)1392 static void set_next_coord (rd_kafka_mock_cluster_t *mcluster,
1393                             const char *transactional_id, int broker_cnt,
1394                             int32_t *coord_idp) {
1395         int32_t new_coord_id;
1396 
1397         new_coord_id = 1 + ((*coord_idp) % (broker_cnt));
1398         TEST_SAY("Changing transaction coordinator from %"PRId32
1399                  " to %"PRId32"\n", *coord_idp, new_coord_id);
1400         rd_kafka_mock_coordinator_set(mcluster, "transaction",
1401                                       transactional_id, new_coord_id);
1402 
1403         *coord_idp = new_coord_id;
1404 }
1405 
1406 /**
1407  * @brief Switch coordinator during a transaction.
1408  *
1409  * @remark Currently fails due to insufficient coord switch handling.
1410  */
do_test_txn_switch_coordinator(void)1411 static void do_test_txn_switch_coordinator (void) {
1412         rd_kafka_t *rk;
1413         rd_kafka_mock_cluster_t *mcluster;
1414         int32_t coord_id;
1415         const char *topic = "test";
1416         const char *transactional_id = "txnid";
1417         const int broker_cnt = 5;
1418         const int iterations = 20;
1419         int i;
1420 
1421         test_timeout_set(iterations * 10);
1422 
1423         SUB_TEST("Test switching coordinators");
1424 
1425         rk = create_txn_producer(&mcluster, transactional_id, broker_cnt, NULL);
1426 
1427         coord_id = 1;
1428         rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
1429                                       coord_id);
1430 
1431         /* Start transactioning */
1432         TEST_SAY("Starting transaction\n");
1433         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1434 
1435         for (i = 0 ; i < iterations ; i++) {
1436                 const int msgcnt = 100;
1437                 int remains = 0;
1438 
1439                 set_next_coord(mcluster, transactional_id,
1440                                broker_cnt, &coord_id);
1441 
1442                 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1443 
1444                 test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA,
1445                                    0, msgcnt / 2, NULL, 0);
1446 
1447                 if (!(i % 3))
1448                         set_next_coord(mcluster, transactional_id,
1449                                        broker_cnt, &coord_id);
1450 
1451                 /* Produce remaining messages */
1452                 test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
1453                                           msgcnt / 2, msgcnt / 2, NULL, 0,
1454                                           &remains);
1455 
1456                 if ((i & 1) || !(i % 8))
1457                         set_next_coord(mcluster, transactional_id,
1458                                        broker_cnt, &coord_id);
1459 
1460 
1461                 if (!(i % 5)) {
1462                         test_curr->ignore_dr_err = rd_false;
1463                         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
1464 
1465                 } else {
1466                         test_curr->ignore_dr_err = rd_true;
1467                         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
1468                 }
1469         }
1470 
1471 
1472         rd_kafka_destroy(rk);
1473 
1474         SUB_TEST_PASS();
1475 }
1476 
1477 
1478 /**
1479  * @brief Test fatal error handling when transactions are not supported
1480  *        by the broker.
1481  */
static void do_test_txns_not_supported (void) {
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t err;

        SUB_TEST_QUICK();

        test_conf_init(&conf, NULL, 10);

        test_conf_set(conf, "transactional.id", "myxnid");
        /* Effectively empty broker list: the mock cluster's bootstrap
         * servers are added with rd_kafka_brokers_add() further down,
         * once the cluster has been created. */
        test_conf_set(conf, "bootstrap.servers", ",");
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        /* Create mock cluster (second argument presumably the broker
         * count, matching create_txn_producer()'s broker_cnt) */
        mcluster = rd_kafka_mock_cluster_new(rk, 3);

        /* Disable InitProducerId (ApiKey 22) by giving it a -1..-1
         * supported version range on the mock brokers. */
        rd_kafka_mock_set_apiversion(mcluster, 22/*InitProducerId*/, -1, -1);


        rd_kafka_brokers_add(rk, rd_kafka_mock_cluster_bootstraps(mcluster));



        /* Without InitProducerId the test expects init_transactions()
         * to fail with __UNSUPPORTED_FEATURE. */
        error = rd_kafka_init_transactions(rk, 5*1000);
        TEST_SAY("init_transactions() returned %s: %s\n",
                 error ? rd_kafka_error_name(error) : "success",
                 error ? rd_kafka_error_string(error) : "success");

        TEST_ASSERT(error, "Expected init_transactions() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) ==
                    RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
                    "Expected init_transactions() to fail with %s, not %s: %s",
                    rd_kafka_err2name(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE),
                    rd_kafka_error_name(error),
                    rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);

        /* After the failed init the test expects the producer to reject
         * any further produce attempt with __FATAL. */
        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC("test"),
                                RD_KAFKA_V_KEY("test", 4),
                                RD_KAFKA_V_END);
        TEST_ASSERT(err == RD_KAFKA_RESP_ERR__FATAL,
                    "Expected producev() to fail with %s, not %s",
                    rd_kafka_err2name(RD_KAFKA_RESP_ERR__FATAL),
                    rd_kafka_err2name(err));

        /* The mock cluster was created on top of rk, so it is destroyed
         * before the handle. */
        rd_kafka_mock_cluster_destroy(mcluster);

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
1539 
1540 
1541 /**
1542  * @brief CONCURRENT_TRANSACTION on AddOffsets.. should be retried.
1543  */
static void do_test_txns_send_offsets_concurrent_is_retried (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        /* Delivery report errors are not treated as test failures. */
        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* A single message so the transaction has produced data. */
        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2),
                                RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        /* Wait for messages to be delivered */
        test_flush(rk, 5000);


        /*
         * Have AddOffsetsToTxn fail but eventually succeed due to
         * infinite retries.
         * Six CONCURRENT_TRANSACTIONS errors are queued: one for the
         * initial request and five for the retries that follow it.
         */
        rd_kafka_mock_push_request_errors(
                mcluster,
                RD_KAFKAP_AddOffsetsToTxn,
                1+5,/* first request + some retries */
                RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);

        /* One arbitrary offset for a made-up source partition. */
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        /* With a -1 timeout the call is expected to keep retrying
         * through all injected errors and then succeed. */
        TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(rk, offsets,
                                                               cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
1605 
1606 
1607 /**
1608  * @brief Verify that request timeouts don't cause crash (#2913).
1609  */
static void do_test_txns_no_timeout_crash (void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        SUB_TEST_QUICK();

        /* socket.timeout.ms (1000) is deliberately shorter than the
         * 2000 broker RTT configured below, so that requests sent while
         * the RTT is raised time out. */
        rk = create_txn_producer(&mcluster, "txnid", 3,
                                 "socket.timeout.ms", "1000",
                                 "transaction.timeout.ms", "5000",
                                 NULL);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        err = rd_kafka_producev(rk,
                                RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2),
                                RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        /* Get the message out before the brokers are slowed down. */
        test_flush(rk, -1);

        /* Delay all broker connections (brokers 1, 2 and 3) */
        if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 2000)) ||
            (err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 2000)) ||
            (err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 2000)))
                TEST_FAIL("Failed to set broker RTT: %s",
                          rd_kafka_err2str(err));

        /* send_offsets..() should now time out */
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        /* The regression (#2913) was a crash on request timeout; here
         * the timeout must instead surface as a retriable __TIMED_OUT
         * error. */
        error = rd_kafka_send_offsets_to_transaction(rk, offsets,
                                                     cgmetadata, -1);
        TEST_ASSERT(error, "Expected send_offsets..() to fail");
        TEST_SAY("send_offsets..() failed with %serror: %s\n",
                 rd_kafka_error_is_retriable(error) ? "retriable " : "",
                 rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_code(error) ==
                    RD_KAFKA_RESP_ERR__TIMED_OUT,
                    "expected send_offsets_to_transaction() to fail with "
                    "timeout, not %s",
                    rd_kafka_error_name(error));
        TEST_ASSERT(rd_kafka_error_is_retriable(error),
                    "expected send_offsets_to_transaction() to fail with "
                    "a retriable error");
        rd_kafka_error_destroy(error);

        /* Reset delay and try again */
        if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 0)) ||
            (err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 0)) ||
            (err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 0)))
                TEST_FAIL("Failed to reset broker RTT: %s",
                          rd_kafka_err2str(err));

        /* The same offsets and group metadata are reused: a retriable
         * error means the call may simply be repeated. */
        TEST_SAY("Retrying send_offsets..()\n");
        error = rd_kafka_send_offsets_to_transaction(rk, offsets,
                                                     cgmetadata, -1);
        TEST_ASSERT(!error, "Expected send_offsets..() to succeed, got: %s",
                    rd_kafka_error_string(error));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        /* All done. The transaction is neither committed nor aborted;
         * the producer is destroyed with it still open. */
        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
1686 
1687 
1688 /**
1689  * @brief Test auth failure handling.
1690  */
static void do_test_txn_auth_failure (int16_t ApiKey,
                                      rd_kafka_resp_err_t ErrorCode) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;

        SUB_TEST_QUICK("ApiKey=%s ErrorCode=%s",
                       rd_kafka_ApiKey2str(ApiKey),
                       rd_kafka_err2name(ErrorCode));

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        /* Fail exactly one request of type ApiKey with ErrorCode.
         * NOTE(review): the test assumes ApiKey is a request issued
         * during init_transactions(); otherwise the error is never hit. */
        rd_kafka_mock_push_request_errors(mcluster,
                                          ApiKey,
                                          1,
                                          ErrorCode);

        error = rd_kafka_init_transactions(rk, 5000);
        TEST_ASSERT(error, "Expected init_transactions() to fail");

        /* The injected error must be reported verbatim, as a fatal,
         * non-retriable error. */
        TEST_SAY("init_transactions() failed: %s: %s\n",
                 rd_kafka_err2name(rd_kafka_error_code(error)),
                 rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_code(error) == ErrorCode,
                    "Expected error %s, not %s",
                    rd_kafka_err2name(ErrorCode),
                    rd_kafka_err2name(rd_kafka_error_code(error)));
        TEST_ASSERT(rd_kafka_error_is_fatal(error),
                    "Expected error to be fatal");
        TEST_ASSERT(!rd_kafka_error_is_retriable(error),
                    "Expected error to not be retriable");
        rd_kafka_error_destroy(error);

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}
1730 
1731 
1732 /**
1733  * @brief Issue #3041: Commit fails due to message flush() taking too long,
1734  *        eventually resulting in an unabortable error and failure to
1735  *        re-init the transactional producer.
1736  */
do_test_txn_flush_timeout(void)1737 static void do_test_txn_flush_timeout (void) {
1738         rd_kafka_t *rk;
1739         rd_kafka_mock_cluster_t *mcluster;
1740         rd_kafka_topic_partition_list_t *offsets;
1741         rd_kafka_consumer_group_metadata_t *cgmetadata;
1742         rd_kafka_error_t *error;
1743         const char *txnid = "myTxnId";
1744         const char *topic = "myTopic";
1745         const int32_t coord_id = 2;
1746         int msgcounter = 0;
1747         rd_bool_t is_retry = rd_false;
1748 
1749         SUB_TEST_QUICK();
1750 
1751         rk = create_txn_producer(&mcluster, txnid, 3,
1752                                  "message.timeout.ms", "10000",
1753                                  "transaction.timeout.ms", "10000",
1754                                  /* Speed up coordinator reconnect */
1755                                  "reconnect.backoff.max.ms", "1000",
1756                                  NULL);
1757 
1758 
1759         /* Broker down is not a test-failing error */
1760         test_curr->is_fatal_cb = error_is_fatal_cb;
1761         allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
1762 
1763         rd_kafka_mock_topic_create(mcluster, topic, 2, 3);
1764 
1765         /* Set coordinator so we can disconnect it later */
1766         rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, coord_id);
1767 
1768         /*
1769          * Init transactions
1770          */
1771         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1772 
1773  retry:
1774         if (!is_retry) {
1775                 /* First attempt should fail. */
1776 
1777                 test_curr->ignore_dr_err = rd_true;
1778                 test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
1779 
1780                 /* Assign invalid partition leaders for some partitions so
1781                  * that messages will not be delivered. */
1782                 rd_kafka_mock_partition_set_leader(mcluster, topic, 0, -1);
1783                 rd_kafka_mock_partition_set_leader(mcluster, topic, 1, -1);
1784 
1785         } else {
1786                 /* The retry should succeed */
1787                 test_curr->ignore_dr_err = rd_false;
1788                 test_curr->exp_dr_err = is_retry ? RD_KAFKA_RESP_ERR_NO_ERROR :
1789                         RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
1790 
1791                 rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
1792                 rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
1793 
1794         }
1795 
1796 
1797         /*
1798          * Start a transaction
1799          */
1800         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1801 
1802         /*
1803          * Produce some messages to specific partitions and random.
1804          */
1805         test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 100, NULL, 10,
1806                                   &msgcounter);
1807         test_produce_msgs2_nowait(rk, topic, 1, 0, 0, 100, NULL, 10,
1808                                   &msgcounter);
1809         test_produce_msgs2_nowait(rk, topic, RD_KAFKA_PARTITION_UA,
1810                                   0, 0, 100, NULL, 10, &msgcounter);
1811 
1812 
1813         /*
1814          * Send some arbitrary offsets.
1815          */
1816         offsets = rd_kafka_topic_partition_list_new(4);
1817         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->offset = 12;
1818         rd_kafka_topic_partition_list_add(offsets, "srctop2", 99)->offset =
1819                 999999111;
1820         rd_kafka_topic_partition_list_add(offsets, "srctopic", 0)->offset = 999;
1821         rd_kafka_topic_partition_list_add(offsets, "srctop2", 3499)->offset =
1822                 123456789;
1823 
1824         cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
1825 
1826         TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
1827                                   rk, offsets,
1828                                   cgmetadata, -1));
1829 
1830         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1831         rd_kafka_topic_partition_list_destroy(offsets);
1832 
1833         rd_sleep(2);
1834 
1835         if (!is_retry) {
1836                 /* Now disconnect the coordinator. */
1837                 TEST_SAY("Disconnecting transaction coordinator %"PRId32"\n",
1838                          coord_id);
1839                 rd_kafka_mock_broker_set_down(mcluster, coord_id);
1840         }
1841 
1842         /*
1843          * Start committing.
1844          */
1845         error = rd_kafka_commit_transaction(rk, -1);
1846 
1847         if (!is_retry) {
1848                 TEST_ASSERT(error != NULL,
1849                             "Expected commit to fail");
1850                 TEST_SAY("commit_transaction() failed (expectedly): %s\n",
1851                          rd_kafka_error_string(error));
1852                 rd_kafka_error_destroy(error);
1853 
1854         } else {
1855                 TEST_ASSERT(!error,
1856                             "Expected commit to succeed, not: %s",
1857                             rd_kafka_error_string(error));
1858         }
1859 
1860         if (!is_retry) {
1861                 /*
1862                  * Bring the coordinator back up.
1863                  */
1864                 rd_kafka_mock_broker_set_up(mcluster, coord_id);
1865                 rd_sleep(2);
1866 
1867                 /*
1868                  * Abort, and try again, this time without error.
1869                  */
1870                 TEST_SAY("Aborting and retrying\n");
1871                 is_retry = rd_true;
1872 
1873                 TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 60000));
1874                 goto retry;
1875         }
1876 
1877         /* All done */
1878 
1879         rd_kafka_destroy(rk);
1880 
1881         SUB_TEST_PASS();
1882 }
1883 
1884 
1885 /**
1886  * @brief ESC-4424: rko is reused in response handler after destroy in coord_req
1887  *        sender due to bad state.
1888  *
1889  * This is somewhat of a race condition so we need to perform a couple of
1890  * iterations before it hits, usually 2 or 3, so we try at least 15 times.
1891  */
do_test_txn_coord_req_destroy(void)1892 static void do_test_txn_coord_req_destroy (void) {
1893         rd_kafka_t *rk;
1894         rd_kafka_mock_cluster_t *mcluster;
1895         int i;
1896         int errcnt = 0;
1897 
1898         SUB_TEST();
1899 
1900         rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
1901 
1902         test_curr->ignore_dr_err = rd_true;
1903 
1904         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
1905 
1906         for (i = 0 ; i < 15 ; i++) {
1907                 rd_kafka_error_t *error;
1908                 rd_kafka_resp_err_t err;
1909                 rd_kafka_topic_partition_list_t *offsets;
1910                 rd_kafka_consumer_group_metadata_t *cgmetadata;
1911 
1912                 test_timeout_set(10);
1913 
1914                 TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
1915 
1916                 /*
1917                  * Inject errors to trigger retries
1918                  */
1919                 rd_kafka_mock_push_request_errors(
1920                         mcluster,
1921                         RD_KAFKAP_AddPartitionsToTxn,
1922                         2,/* first request + number of internal retries */
1923                         RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
1924                         RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
1925 
1926                 rd_kafka_mock_push_request_errors(
1927                         mcluster,
1928                         RD_KAFKAP_AddOffsetsToTxn,
1929                         1,/* first request + number of internal retries */
1930                         RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
1931 
1932                 err = rd_kafka_producev(rk,
1933                                         RD_KAFKA_V_TOPIC("mytopic"),
1934                                         RD_KAFKA_V_VALUE("hi", 2),
1935                                         RD_KAFKA_V_END);
1936                 TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
1937 
1938                 rd_kafka_mock_push_request_errors(
1939                         mcluster,
1940                         RD_KAFKAP_Produce,
1941                         4,
1942                         RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
1943                         RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
1944                         RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
1945                         RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
1946                 /* FIXME: When KIP-360 is supported, add this error:
1947                  *        RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER */
1948 
1949                 err = rd_kafka_producev(rk,
1950                                         RD_KAFKA_V_TOPIC("mytopic"),
1951                                         RD_KAFKA_V_VALUE("hi", 2),
1952                                         RD_KAFKA_V_END);
1953                 TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
1954 
1955 
1956                 /*
1957                  * Send offsets to transaction
1958                  */
1959 
1960                 offsets = rd_kafka_topic_partition_list_new(1);
1961                 rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->
1962                         offset = 12;
1963 
1964                 cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
1965 
1966                 error = rd_kafka_send_offsets_to_transaction(rk, offsets,
1967                                                              cgmetadata, -1);
1968 
1969                 TEST_SAY("send_offsets_to_transaction() #%d: %s\n",
1970                          i, rd_kafka_error_string(error));
1971 
1972                 /* As we can't control the exact timing and sequence
1973                  * of requests this sometimes fails and sometimes succeeds,
1974                  * but we run the test enough times to trigger at least
1975                  * one failure. */
1976                 if (error) {
1977                         TEST_SAY("send_offsets_to_transaction() #%d "
1978                                  "failed (expectedly): %s\n",
1979                                  i, rd_kafka_error_string(error));
1980                         TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
1981                                     "Expected abortable error for #%d", i);
1982                         rd_kafka_error_destroy(error);
1983                         errcnt++;
1984                 }
1985 
1986                 rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1987                 rd_kafka_topic_partition_list_destroy(offsets);
1988 
1989                 /* Allow time for internal retries */
1990                 rd_sleep(2);
1991 
1992                 TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));
1993         }
1994 
1995         TEST_ASSERT(errcnt > 0,
1996                     "Expected at least one send_offets_to_transaction() "
1997                     "failure");
1998 
1999         /* All done */
2000 
2001         rd_kafka_destroy(rk);
2002 }
2003 
2004 
/* Progress counter for multi_find_on_response_received_cb(): incremented
 * once per handled AddOffsetsToTxn response, and bumped by 10000 after
 * the second broker down/up toggle so that all later invocations bail out
 * early (see `done` below). Reset by do_test_txn_coord_req_multi_find(). */
static rd_atomic32_t multi_find_req_cnt;

/**
 * @brief on_response_received interceptor used by
 *        do_test_txn_coord_req_multi_find().
 *
 * Only AddOffsetsToTxn responses are acted upon:
 *  - the first one toggles mock broker 2 down and up;
 *  - the next one toggles mock broker 3 down and up, clears broker 3's
 *    RTT and marks the sequence as done.
 * Both toggles are meant to trigger the producer's coord_req_fsm().
 *
 * @returns always RD_KAFKA_RESP_ERR_NO_ERROR.
 */
static rd_kafka_resp_err_t
multi_find_on_response_received_cb (rd_kafka_t *rk,
                                    int sockfd,
                                    const char *brokername,
                                    int32_t brokerid,
                                    int16_t ApiKey,
                                    int16_t ApiVersion,
                                    int32_t CorrId,
                                    size_t  size,
                                    int64_t rtt,
                                    rd_kafka_resp_err_t err,
                                    void *ic_opaque) {
        rd_kafka_mock_cluster_t *mcluster = rd_kafka_handle_mock_cluster(rk);
        /* Only true once the 10000 bump at the end has happened. */
        rd_bool_t done = rd_atomic32_get(&multi_find_req_cnt) > 10000;

        if (ApiKey != RD_KAFKAP_AddOffsetsToTxn || done)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        /* NOTE(review): `done` is always false at this point (the early
         * return above), so "already done" is never printed.
         * rtt is divided by 1000 for the "ms" field, so it is presumably
         * reported in microseconds. */
        TEST_SAY("on_response_received_cb: %s: %s: brokerid %"PRId32
                 ", ApiKey %hd, CorrId %d, rtt %.2fms, %s: %s\n",
                 rd_kafka_name(rk), brokername, brokerid, ApiKey, CorrId,
                 rtt != -1 ? (float)rtt / 1000.0 : 0.0,
                 done ? "already done" : "not done yet",
                 rd_kafka_err2name(err));


        /* rd_atomic32_add() returns the new value: 1 means this is the
         * first AddOffsetsToTxn response handled. */
        if (rd_atomic32_add(&multi_find_req_cnt, 1) == 1) {
                /* Trigger a broker down/up event, which in turn
                 * triggers the coord_req_fsm(). */
                rd_kafka_mock_broker_set_down(mcluster, 2);
                rd_kafka_mock_broker_set_up(mcluster, 2);
                return RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        /* Trigger a broker down/up event, which in turn
         * triggers the coord_req_fsm(). */
        rd_kafka_mock_broker_set_down(mcluster, 3);
        rd_kafka_mock_broker_set_up(mcluster, 3);

        /* Clear the downed broker's latency so that it reconnects
         * quickly, otherwise the ApiVersionRequest will be delayed and
         * this will in turn delay the -> UP transition that we need to
         * trigger the coord_reqs. */
        rd_kafka_mock_broker_set_rtt(mcluster, 3, 0);

        /* Only do this down/up once */
        rd_atomic32_add(&multi_find_req_cnt, 10000);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
2057 
2058 
2059 /**
2060  * @brief ESC-4444: multiple FindCoordinatorRequests are sent referencing
2061  *        the same coord_req_t, but the first one received will destroy
 *        the coord_req_t object and make the subsequent FindCoordinatorResponses
2063  *        reference a freed object.
2064  *
2065  * What we want to achieve is this sequence:
2066  *  1. AddOffsetsToTxnRequest + Response which..
2067  *  2. Triggers TxnOffsetCommitRequest, but the coordinator is not known, so..
2068  *  3. Triggers a FindCoordinatorRequest
2069  *  4. FindCoordinatorResponse from 3 is received ..
2070  *  5. A TxnOffsetCommitRequest is sent from coord_req_fsm().
2071  *  6. Another broker changing state to Up triggers coord reqs again, which..
2072  *  7. Triggers a second TxnOffsetCommitRequest from coord_req_fsm().
 *  8. FindCoordinatorResponse from 5 is received, references the destroyed rko
2074  *     and crashes.
2075  */
do_test_txn_coord_req_multi_find(void)2076 static void do_test_txn_coord_req_multi_find (void) {
2077         rd_kafka_t *rk;
2078         rd_kafka_mock_cluster_t *mcluster;
2079         rd_kafka_error_t *error;
2080         rd_kafka_resp_err_t err;
2081         rd_kafka_topic_partition_list_t *offsets;
2082         rd_kafka_consumer_group_metadata_t *cgmetadata;
2083         const char *txnid = "txnid", *groupid = "mygroupid", *topic = "mytopic";
2084         int i;
2085 
2086         SUB_TEST();
2087 
2088         rd_atomic32_init(&multi_find_req_cnt, 0);
2089 
2090         on_response_received_cb = multi_find_on_response_received_cb;
2091         rk = create_txn_producer(&mcluster, txnid, 3,
2092                                  /* Need connections to all brokers so we
2093                                   * can trigger coord_req_fsm events
2094                                   * by toggling connections. */
2095                                  "enable.sparse.connections", "false",
2096                                  /* Set up on_response_received interceptor */
2097                                  "on_response_received", "", NULL);
2098 
2099         /* Let broker 1 be both txn and group coordinator
2100          * so that the group coordinator connection is up when it is time
2101          * send the TxnOffsetCommitRequest. */
2102         rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 1);
2103         rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
2104 
2105         /* Set broker 1, 2, and 3 as leaders for a partition each and
2106          * later produce to both partitions so we know there's a connection
2107          * to all brokers. */
2108         rd_kafka_mock_topic_create(mcluster, topic, 3, 1);
2109         rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
2110         rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 2);
2111         rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 3);
2112 
2113         /* Broker down is not a test-failing error */
2114         allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
2115         test_curr->is_fatal_cb = error_is_fatal_cb;
2116 
2117         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
2118 
2119         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2120 
2121         for (i = 0 ; i < 3 ; i++) {
2122                 err = rd_kafka_producev(rk,
2123                                         RD_KAFKA_V_TOPIC(topic),
2124                                         RD_KAFKA_V_PARTITION(i),
2125                                         RD_KAFKA_V_VALUE("hi", 2),
2126                                         RD_KAFKA_V_END);
2127                 TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
2128         }
2129 
2130         test_flush(rk, 5000);
2131 
2132         /*
2133          * send_offsets_to_transaction() will query for the group coordinator,
2134          * we need to make those requests slow so that multiple requests are
2135          * sent.
2136          */
2137         for (i = 1 ; i <= 3 ; i++)
2138                 rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 4000);
2139 
2140         /*
2141          * Send offsets to transaction
2142          */
2143 
2144         offsets = rd_kafka_topic_partition_list_new(1);
2145         rd_kafka_topic_partition_list_add(offsets, "srctopic", 3)->
2146                 offset = 12;
2147 
2148         cgmetadata = rd_kafka_consumer_group_metadata_new(groupid);
2149 
2150         error = rd_kafka_send_offsets_to_transaction(rk, offsets,
2151                                                      cgmetadata, -1);
2152 
2153         TEST_SAY("send_offsets_to_transaction() %s\n",
2154                  rd_kafka_error_string(error));
2155         TEST_ASSERT(!error, "send_offsets_to_transaction() failed: %s",
2156                     rd_kafka_error_string(error));
2157 
2158         rd_kafka_consumer_group_metadata_destroy(cgmetadata);
2159         rd_kafka_topic_partition_list_destroy(offsets);
2160 
2161         /* Clear delay */
2162         for (i = 1 ; i <= 3 ; i++)
2163                 rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 0);
2164 
2165         rd_sleep(5);
2166 
2167         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
2168 
2169         /* All done */
2170 
2171         TEST_ASSERT(rd_atomic32_get(&multi_find_req_cnt) > 10000,
2172                     "on_request_sent interceptor did not trigger properly");
2173 
2174         rd_kafka_destroy(rk);
2175 
2176         on_response_received_cb = NULL;
2177 
2178         SUB_TEST_PASS();
2179 }
2180 
2181 
2182 /**
2183  * @brief ESC-4410: adding producer partitions gradually will trigger multiple
2184  *        AddPartitionsToTxn requests. Due to a bug the third partition to be
2185  *        registered would hang in PEND_TXN state.
2186  *
2187  * Trigger this behaviour by having two outstanding AddPartitionsToTxn requests
2188  * at the same time, followed by a need for a third:
2189  *
2190  * 1. Set coordinator broker rtt high (to give us time to produce).
2191  * 2. Produce to partition 0, will trigger first AddPartitionsToTxn.
2192  * 3. Produce to partition 1, will trigger second AddPartitionsToTxn.
2193  * 4. Wait for second AddPartitionsToTxn response.
 * 5. Produce to partition 2, should trigger AddPartitionsToTxn, but a bug
 *    causes it to be stuck in pending state.
2196  */
2197 
2198 static rd_atomic32_t multi_addparts_resp_cnt;
2199 static rd_kafka_resp_err_t
multi_addparts_response_received_cb(rd_kafka_t * rk,int sockfd,const char * brokername,int32_t brokerid,int16_t ApiKey,int16_t ApiVersion,int32_t CorrId,size_t size,int64_t rtt,rd_kafka_resp_err_t err,void * ic_opaque)2200 multi_addparts_response_received_cb (rd_kafka_t *rk,
2201                                      int sockfd,
2202                                      const char *brokername,
2203                                      int32_t brokerid,
2204                                      int16_t ApiKey,
2205                                      int16_t ApiVersion,
2206                                      int32_t CorrId,
2207                                      size_t  size,
2208                                      int64_t rtt,
2209                                      rd_kafka_resp_err_t err,
2210                                      void *ic_opaque) {
2211 
2212         if (ApiKey == RD_KAFKAP_AddPartitionsToTxn) {
2213                 TEST_SAY("on_response_received_cb: %s: %s: brokerid %"PRId32
2214                          ", ApiKey %hd, CorrId %d, rtt %.2fms, count %"PRId32
2215                          ": %s\n",
2216                          rd_kafka_name(rk), brokername, brokerid,
2217                          ApiKey, CorrId,
2218                          rtt != -1 ? (float)rtt / 1000.0 : 0.0,
2219                          rd_atomic32_get(&multi_addparts_resp_cnt),
2220                          rd_kafka_err2name(err));
2221 
2222                 rd_atomic32_add(&multi_addparts_resp_cnt, 1);
2223         }
2224 
2225         return RD_KAFKA_RESP_ERR_NO_ERROR;
2226 }
2227 
2228 
do_test_txn_addparts_req_multi(void)2229 static void do_test_txn_addparts_req_multi (void) {
2230         rd_kafka_t *rk;
2231         rd_kafka_mock_cluster_t *mcluster;
2232         const char *txnid = "txnid", *topic = "mytopic";
2233         int32_t txn_coord = 2;
2234 
2235         SUB_TEST();
2236 
2237         rd_atomic32_init(&multi_addparts_resp_cnt, 0);
2238 
2239         on_response_received_cb = multi_addparts_response_received_cb;
2240         rk = create_txn_producer(&mcluster, txnid, 3,
2241                                  "linger.ms", "0",
2242                                  "message.timeout.ms", "9000",
2243                                  /* Set up on_response_received interceptor */
2244                                  "on_response_received", "", NULL);
2245 
2246         /* Let broker 1 be txn coordinator. */
2247         rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
2248                                       txn_coord);
2249 
2250         rd_kafka_mock_topic_create(mcluster, topic, 3, 1);
2251 
2252         /* Set partition leaders to non-txn-coord broker so they wont
2253          * be affected by rtt delay */
2254         rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
2255         rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
2256         rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 1);
2257 
2258 
2259 
2260         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
2261 
2262         /*
2263          * Run one transaction first to let the client familiarize with
2264          * the topic, this avoids metadata lookups, etc, when the real
2265          * test is run.
2266          */
2267         TEST_SAY("Running seed transaction\n");
2268         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2269         TEST_CALL_ERR__(rd_kafka_producev(rk,
2270                                           RD_KAFKA_V_TOPIC(topic),
2271                                           RD_KAFKA_V_VALUE("seed", 4),
2272                                           RD_KAFKA_V_END));
2273         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
2274 
2275 
2276         /*
2277          * Now perform test transaction with rtt delays
2278          */
2279         TEST_SAY("Running test transaction\n");
2280 
2281         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2282 
2283         /* Reset counter */
2284         rd_atomic32_set(&multi_addparts_resp_cnt, 0);
2285 
2286         /* Add latency to txn coordinator so we can pace our produce() calls */
2287         rd_kafka_mock_broker_set_rtt(mcluster, txn_coord, 1000);
2288 
2289         /* Produce to partition 0 */
2290         TEST_CALL_ERR__(rd_kafka_producev(rk,
2291                                           RD_KAFKA_V_TOPIC(topic),
2292                                           RD_KAFKA_V_PARTITION(0),
2293                                           RD_KAFKA_V_VALUE("hi", 2),
2294                                           RD_KAFKA_V_END));
2295 
2296         rd_usleep(500*1000, NULL);
2297 
2298         /* Produce to partition 1 */
2299         TEST_CALL_ERR__(rd_kafka_producev(rk,
2300                                           RD_KAFKA_V_TOPIC(topic),
2301                                           RD_KAFKA_V_PARTITION(1),
2302                                           RD_KAFKA_V_VALUE("hi", 2),
2303                                           RD_KAFKA_V_END));
2304 
2305         TEST_SAY("Waiting for two AddPartitionsToTxnResponse\n");
2306         while (rd_atomic32_get(&multi_addparts_resp_cnt) < 2)
2307                 rd_usleep(10*1000, NULL);
2308 
2309         TEST_SAY("%"PRId32" AddPartitionsToTxnResponses seen\n",
2310                  rd_atomic32_get(&multi_addparts_resp_cnt));
2311 
2312         /* Produce to partition 2, this message will hang in
2313          * queue if the bug is not fixed. */
2314         TEST_CALL_ERR__(rd_kafka_producev(rk,
2315                                           RD_KAFKA_V_TOPIC(topic),
2316                                           RD_KAFKA_V_PARTITION(2),
2317                                           RD_KAFKA_V_VALUE("hi", 2),
2318                                           RD_KAFKA_V_END));
2319 
2320         /* Allow some extra time for things to settle before committing
2321          * transaction. */
2322         rd_usleep(1000*1000, NULL);
2323 
2324         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 10*1000));
2325 
2326         /* All done */
2327         rd_kafka_destroy(rk);
2328 
2329         on_response_received_cb = NULL;
2330 
2331         SUB_TEST_PASS();
2332 }
2333 
2334 
2335 
2336 /**
2337  * @brief Test handling of OffsetFetchRequest returning UNSTABLE_OFFSET_COMMIT.
2338  *
2339  * There are two things to test;
2340  *  - OffsetFetch triggered by committed() (and similar code paths)
2341  *  - OffsetFetch triggered by assign()
2342  */
do_test_unstable_offset_commit(void)2343 static void do_test_unstable_offset_commit (void) {
2344         rd_kafka_t *rk, *c;
2345         rd_kafka_conf_t *c_conf;
2346         rd_kafka_mock_cluster_t *mcluster;
2347         rd_kafka_topic_partition_list_t *offsets;
2348         const char *topic = "mytopic";
2349         const int msgcnt = 100;
2350         const int64_t offset_to_commit = msgcnt / 2;
2351         int i;
2352         int remains = 0;
2353 
2354         SUB_TEST_QUICK();
2355 
2356         rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
2357 
2358         test_conf_init(&c_conf, NULL, 0);
2359         test_conf_set(c_conf, "security.protocol", "PLAINTEXT");
2360         test_conf_set(c_conf, "bootstrap.servers",
2361                       rd_kafka_mock_cluster_bootstraps(mcluster));
2362         test_conf_set(c_conf, "enable.partition.eof", "true");
2363         test_conf_set(c_conf, "auto.offset.reset", "error");
2364         c = test_create_consumer("mygroup", NULL, c_conf, NULL);
2365 
2366         rd_kafka_mock_topic_create(mcluster, topic, 2, 3);
2367 
2368         /* Produce some messages to the topic so that the consumer has
2369          * something to read. */
2370         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
2371         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2372         test_produce_msgs2_nowait(rk, topic, 0, 0, 0, msgcnt,
2373                                   NULL, 0, &remains);
2374         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
2375 
2376 
2377         /* Commit offset */
2378         offsets = rd_kafka_topic_partition_list_new(1);
2379         rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
2380                 offset_to_commit;
2381         TEST_CALL_ERR__(rd_kafka_commit(c, offsets, 0/*sync*/));
2382         rd_kafka_topic_partition_list_destroy(offsets);
2383 
2384         /* Retrieve offsets by calling committed().
2385          *
2386          * Have OffsetFetch fail and retry, on the first iteration
2387          * the API timeout is higher than the amount of time the retries will
2388          * take and thus succeed, and on the second iteration the timeout
2389          * will be lower and thus fail. */
2390         for (i = 0 ; i < 2 ; i++) {
2391                 rd_kafka_resp_err_t err;
2392                 rd_kafka_resp_err_t exp_err = i == 0 ?
2393                         RD_KAFKA_RESP_ERR_NO_ERROR :
2394                         RD_KAFKA_RESP_ERR__TIMED_OUT;
2395                 int timeout_ms = exp_err ? 200 : 5*1000;
2396 
2397                 rd_kafka_mock_push_request_errors(
2398                         mcluster,
2399                         RD_KAFKAP_OffsetFetch,
2400                         1+5,/* first request + some retries */
2401                         RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2402                         RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2403                         RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2404                         RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2405                         RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2406                         RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT);
2407 
2408                 offsets = rd_kafka_topic_partition_list_new(1);
2409                 rd_kafka_topic_partition_list_add(offsets, topic, 0);
2410 
2411                 err = rd_kafka_committed(c, offsets, timeout_ms);
2412 
2413                 TEST_SAY("#%d: committed() returned %s (expected %s)\n",
2414                          i,
2415                          rd_kafka_err2name(err),
2416                          rd_kafka_err2name(exp_err));
2417 
2418                 TEST_ASSERT(err == exp_err,
2419                             "#%d: Expected committed() to return %s, not %s",
2420                             i,
2421                             rd_kafka_err2name(exp_err),
2422                             rd_kafka_err2name(err));
2423                 TEST_ASSERT(offsets->cnt == 1,
2424                             "Expected 1 committed offset, not %d",
2425                             offsets->cnt);
2426                 if (!exp_err)
2427                         TEST_ASSERT(offsets->elems[0].offset == offset_to_commit,
2428                                     "Expected committed offset %"PRId64", "
2429                                     "not %"PRId64,
2430                                     offset_to_commit,
2431                                     offsets->elems[0].offset);
2432                 else
2433                         TEST_ASSERT(offsets->elems[0].offset < 0,
2434                                     "Expected no committed offset, "
2435                                     "not %"PRId64,
2436                                     offsets->elems[0].offset);
2437 
2438                 rd_kafka_topic_partition_list_destroy(offsets);
2439         }
2440 
2441         TEST_SAY("Phase 2: OffsetFetch lookup through assignment\n");
2442         offsets = rd_kafka_topic_partition_list_new(1);
2443         rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
2444                 RD_KAFKA_OFFSET_STORED;
2445 
2446         rd_kafka_mock_push_request_errors(
2447                 mcluster,
2448                 RD_KAFKAP_OffsetFetch,
2449                 1+5,/* first request + some retries */
2450                 RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2451                 RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2452                 RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2453                 RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2454                 RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
2455                 RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT);
2456 
2457         test_consumer_incremental_assign("assign", c, offsets);
2458         rd_kafka_topic_partition_list_destroy(offsets);
2459 
2460         test_consumer_poll_exact("consume", c, 0,
2461                                  1/*eof*/, 0, msgcnt/2,
2462                                  rd_true/*exact counts*/, NULL);
2463 
2464         /* All done */
2465         rd_kafka_destroy(c);
2466         rd_kafka_destroy(rk);
2467 
2468         SUB_TEST_PASS();
2469 }
2470 
2471 
2472 /**
2473  * @brief If a message times out locally before being attempted to send
2474  *        and commit_transaction() is called, the transaction must not succeed.
2475  *        https://github.com/confluentinc/confluent-kafka-dotnet/issues/1568
2476  */
do_test_commit_after_msg_timeout(void)2477 static void do_test_commit_after_msg_timeout (void) {
2478         rd_kafka_t *rk;
2479         rd_kafka_mock_cluster_t *mcluster;
2480         int32_t coord_id, leader_id;
2481         rd_kafka_resp_err_t err;
2482         rd_kafka_error_t *error;
2483         const char *topic = "test";
2484         const char *transactional_id = "txnid";
2485         int remains = 0;
2486 
2487         SUB_TEST_QUICK();
2488 
2489         /* Assign coordinator and leader to two different brokers */
2490         coord_id = 1;
2491         leader_id = 2;
2492 
2493         rk = create_txn_producer(&mcluster, transactional_id, 3,
2494                                  "message.timeout.ms", "5000",
2495                                  "transaction.timeout.ms", "10000",
2496                                  NULL);
2497 
2498         /* Broker down is not a test-failing error */
2499         allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
2500         test_curr->is_fatal_cb = error_is_fatal_cb;
2501         test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
2502 
2503         err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
2504         TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str (err));
2505 
2506         rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
2507                                       coord_id);
2508         rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);
2509 
2510         /* Start transactioning */
2511         TEST_SAY("Starting transaction\n");
2512         TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
2513 
2514         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2515 
2516         TEST_SAY("Bringing down %"PRId32"\n", leader_id);
2517         rd_kafka_mock_broker_set_down(mcluster, leader_id);
2518         rd_kafka_mock_broker_set_down(mcluster, coord_id);
2519 
2520         test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);
2521 
2522         error = rd_kafka_commit_transaction(rk, -1);
2523         TEST_ASSERT(error != NULL, "expected commit_transaciton() to fail");
2524         TEST_SAY("commit_transaction() failed (as expected): %s\n",
2525                  rd_kafka_error_string(error));
2526         TEST_ASSERT(rd_kafka_error_txn_requires_abort (error),
2527                     "Expected txn_requires_abort error");
2528         rd_kafka_error_destroy(error);
2529 
2530         /* Bring the brokers up so the abort can complete */
2531         rd_kafka_mock_broker_set_up(mcluster, coord_id);
2532         rd_kafka_mock_broker_set_up(mcluster, leader_id);
2533 
2534         TEST_SAY("Aborting transaction\n");
2535         TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
2536 
2537         TEST_ASSERT(remains == 0,
2538                     "%d message(s) were not flushed\n", remains);
2539 
2540         TEST_SAY("Attempting second transaction, which should succeed\n");
2541         allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
2542         test_curr->is_fatal_cb = error_is_fatal_cb;
2543         test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
2544 
2545         TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
2546         test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);
2547 
2548         TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
2549 
2550         TEST_ASSERT(remains == 0,
2551                     "%d message(s) were not produced\n", remains);
2552 
2553         rd_kafka_destroy(rk);
2554 
2555         test_curr->is_fatal_cb = NULL;
2556 
2557         SUB_TEST_PASS();
2558 }
2559 
main_0105_transactions_mock(int argc,char ** argv)2560 int main_0105_transactions_mock (int argc, char **argv) {
2561         if (test_needs_auth()) {
2562                 TEST_SKIP("Mock cluster does not support SSL/SASL\n");
2563                 return 0;
2564         }
2565 
2566         do_test_txn_recoverable_errors();
2567 
2568         do_test_txn_fatal_idempo_errors();
2569 
2570         do_test_txn_fenced_reinit();
2571 
2572         do_test_txn_req_cnt();
2573 
2574         do_test_txn_requires_abort_errors();
2575 
2576         do_test_txn_slow_reinit(rd_false);
2577         do_test_txn_slow_reinit(rd_true);
2578 
2579         /* Just do a subset of tests in quick mode */
2580         if (test_quick)
2581                 return 0;
2582 
2583         do_test_txn_endtxn_errors();
2584 
2585         do_test_txn_endtxn_infinite();
2586 
2587         /* Skip tests for non-infinite commit/abort timeouts
2588          * until they're properly handled by the producer. */
2589         if (0)
2590                 do_test_txn_endtxn_timeout();
2591 
2592         /* Bring down the coordinator */
2593         do_test_txn_broker_down_in_txn(rd_true);
2594 
2595         /* Bring down partition leader */
2596         do_test_txn_broker_down_in_txn(rd_false);
2597 
2598         do_test_txns_not_supported();
2599 
2600         do_test_txns_send_offsets_concurrent_is_retried();
2601 
2602         do_test_txn_coord_req_destroy();
2603 
2604         do_test_txn_coord_req_multi_find();
2605 
2606         do_test_txn_addparts_req_multi();
2607 
2608         do_test_txns_no_timeout_crash();
2609 
2610         do_test_txn_auth_failure(
2611                 RD_KAFKAP_InitProducerId,
2612                 RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);
2613 
2614         do_test_txn_auth_failure(
2615                 RD_KAFKAP_FindCoordinator,
2616                 RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);
2617 
2618         do_test_txn_flush_timeout();
2619 
2620         do_test_unstable_offset_commit();
2621 
2622         do_test_commit_after_msg_timeout();
2623 
2624         do_test_txn_switch_coordinator();
2625 
2626         return 0;
2627 }
2628