1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2019, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #include "test.h"
30 
31 #include "rdkafka.h"
32 
33 /**
34  * @name Producer transaction tests
35  *
36  */
37 
38 
39 /**
40  * @brief Produce messages using batch interface.
41  */
/**
 * @brief Produce \p cnt messages to \p topic using the batch interface
 *        and wait for all delivery reports before returning.
 *
 * @param rk        Producer instance.
 * @param topic     Topic to produce to.
 * @param testid    Test id embedded in each message for later verification.
 * @param partition Partition to produce to, or RD_KAFKA_PARTITION_UA.
 * @param msg_base  Sequence number of the first message.
 * @param cnt       Number of messages to produce.
 */
void do_produce_batch (rd_kafka_t *rk, const char *topic, uint64_t testid,
                       int32_t partition, int msg_base, int cnt) {
        rd_kafka_message_t *messages;
        rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, NULL);
        int i;
        int ret;
        int remains = cnt;

        TEST_SAY("Batch-producing %d messages to partition %"PRId32"\n",
                 cnt, partition);

        /* Conventional calloc argument order is (nmemb, size):
         * previously the arguments were swapped, which allocates the same
         * number of bytes but defeats per-element overflow checking. */
        messages = rd_calloc(cnt, sizeof(*messages));
        for (i = 0 ; i < cnt ; i++) {
                char key[128];
                char value[128];

                test_prepare_msg(testid, partition, msg_base + i,
                                 value, sizeof(value),
                                 key, sizeof(key));
                /* With RD_KAFKA_MSG_F_COPY the library copies key/payload,
                 * so we retain ownership of these strdups and free them
                 * after the batch call. */
                messages[i].key = rd_strdup(key);
                messages[i].key_len = strlen(key);
                messages[i].payload = rd_strdup(value);
                messages[i].len = strlen(value);
                /* Per-message opaque: delivery-report countdown counter. */
                messages[i]._private = &remains;
        }

        ret = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_COPY,
                                     messages, cnt);

        rd_kafka_topic_destroy(rkt);

        TEST_ASSERT(ret == cnt,
                    "Failed to batch-produce: %d/%d messages produced",
                    ret, cnt);

        for (i = 0 ; i < cnt ; i++) {
                TEST_ASSERT(!messages[i].err,
                            "Failed to produce message: %s",
                            rd_kafka_err2str(messages[i].err));
                rd_free(messages[i].key);
                rd_free(messages[i].payload);
        }
        rd_free(messages);

        /* Wait for deliveries */
        test_wait_delivery(rk, &remains);
}
89 
90 
91 
92 /**
93  * @brief Basic producer transaction testing without consumed input
94  *        (only consumed output for verification).
95  *        e.g., no consumer offsets to commit with transaction.
96  */
static void do_test_basic_producer_txn (rd_bool_t enable_compression) {
        const char *topic = test_mk_topic_name("0103_transactions", 1);
        const int partition_cnt = 4;
#define _TXNCNT 6
        /* Test-case matrix: one transaction per entry, varying
         * commit/abort, sync/async produce, and batch vs per-message API. */
        struct {
                const char *desc;
                uint64_t testid;     /* Generated per transaction below */
                int msgcnt;
                rd_bool_t abort;     /* Abort instead of commit */
                rd_bool_t sync;      /* Produce synchronously */
                rd_bool_t batch;     /* Use the batch produce API */
                rd_bool_t batch_any; /* Batch to any (UA) partition */
        } txn[_TXNCNT] = {
                { "Commit transaction, sync producing",
                  0, 100, rd_false, rd_true },
                { "Commit transaction, async producing",
                  0, 1000, rd_false, rd_false },
                { "Commit transaction, sync batch producing to any partition",
                  0, 100, rd_false, rd_true, rd_true, rd_true },
                { "Abort transaction, sync producing",
                  0, 500, rd_true, rd_true },
                { "Abort transaction, async producing",
                  0, 5000, rd_true, rd_false },
                { "Abort transaction, sync batch producing to one partition",
                  0, 500, rd_true, rd_true, rd_true, rd_false },

        };
        rd_kafka_t *p, *c;
        rd_kafka_conf_t *conf, *p_conf, *c_conf;
        int i;

        /* Mark one of run modes as quick so we don't run both when
         * in a hurry.*/
        SUB_TEST0(enable_compression /* quick */,
                  "with%s compression", enable_compression ? "" : "out");

        test_conf_init(&conf, NULL, 30);

        /* Create producer */
        p_conf = rd_kafka_conf_dup(conf);
        rd_kafka_conf_set_dr_msg_cb(p_conf, test_dr_msg_cb);
        test_conf_set(p_conf, "transactional.id", topic);
        if (enable_compression)
                test_conf_set(p_conf, "compression.type", "lz4");
        p = test_create_handle(RD_KAFKA_PRODUCER, p_conf);

        // FIXME: add testing where the txn id is reused (and thus fails)

        /* Create topic */
        test_create_topic(p, topic, partition_cnt, 3);

        /* Create consumer.
         * Note: \p conf ownership passes to the consumer here (via c_conf),
         * so it must not be reused or destroyed below. */
        c_conf = conf;
        test_conf_set(conf, "auto.offset.reset", "earliest");
        /* Make sure default isolation.level is transaction aware */
        TEST_ASSERT(!strcmp(test_conf_get(c_conf, "isolation.level"),
                            "read_committed"),
                    "expected isolation.level=read_committed, not %s",
                    test_conf_get(c_conf, "isolation.level"));

        c = test_create_consumer(topic, NULL, c_conf, NULL);

        /* Wait for topic to propagate to avoid test flakyness */
        test_wait_topic_exists(c, topic, tmout_multip(5000));

        /* Subscribe to topic */
        test_consumer_subscribe(c, topic);

        /* Wait for assignment to make sure consumer is fetching messages
         * below, so we can use the poll_no_msgs() timeout to
         * determine that messages were indeed aborted. */
        test_consumer_wait_assignment(c, rd_true);

        /* Init transactions */
        TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30*1000));

        for (i = 0 ; i < _TXNCNT ; i++) {
                int wait_msgcnt = 0;

                TEST_SAY(_C_BLU "txn[%d]: Begin transaction: %s\n" _C_CLR,
                         i, txn[i].desc);

                /* Begin a transaction */
                TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));

                /* If the transaction is aborted it is okay if
                 * messages fail producing, since they'll be
                 * purged from queues. */
                test_curr->ignore_dr_err = txn[i].abort;

                /* Produce messages */
                txn[i].testid = test_id_generate();
                TEST_SAY("txn[%d]: Produce %d messages %ssynchronously "
                         "with testid %"PRIu64"\n",
                         i, txn[i].msgcnt,
                         txn[i].sync ? "" : "a",
                         txn[i].testid);

                if (!txn[i].batch) {
                        if (txn[i].sync)
                                test_produce_msgs2(p, topic, txn[i].testid,
                                                   RD_KAFKA_PARTITION_UA, 0,
                                                   txn[i].msgcnt, NULL, 0);
                        else
                                test_produce_msgs2_nowait(p, topic,
                                                          txn[i].testid,
                                                          RD_KAFKA_PARTITION_UA,
                                                          0,
                                                          txn[i].msgcnt,
                                                          NULL, 0,
                                                          &wait_msgcnt);
                } else if (txn[i].batch_any) {
                        /* Batch: use any partition */
                        do_produce_batch(p, topic, txn[i].testid,
                                         RD_KAFKA_PARTITION_UA,
                                         0, txn[i].msgcnt);
                } else {
                        /* Batch: specific partition */
                        do_produce_batch(p, topic, txn[i].testid,
                                         1 /* partition */,
                                         0, txn[i].msgcnt);
                }


                /* Abort or commit transaction */
                TEST_SAY("txn[%d]: %s" _C_CLR " transaction\n",
                         i, txn[i].abort ? _C_RED "Abort" : _C_GRN "Commit");
                if (txn[i].abort) {
                        test_curr->ignore_dr_err = rd_true;
                        TEST_CALL_ERROR__(rd_kafka_abort_transaction(p,
                                                                     30*1000));
                } else {
                        test_curr->ignore_dr_err = rd_false;
                        TEST_CALL_ERROR__(rd_kafka_commit_transaction(p,
                                                                      30*1000));
                }

                if (!txn[i].sync)
                        /* Wait for delivery reports */
                        test_wait_delivery(p, &wait_msgcnt);

                /* Consume messages: aborted transactions must yield no
                 * messages (within the timeout), committed ones must
                 * yield exactly msgcnt. */
                if (txn[i].abort)
                        test_consumer_poll_no_msgs(txn[i].desc, c,
                                                   txn[i].testid, 3000);
                else
                        test_consumer_poll(txn[i].desc, c,
                                           txn[i].testid, partition_cnt, 0,
                                           txn[i].msgcnt, NULL);

                TEST_SAY(_C_GRN "txn[%d]: Finished successfully: %s\n" _C_CLR,
                         i, txn[i].desc);
        }

        rd_kafka_destroy(p);

        test_consumer_close(c);
        rd_kafka_destroy(c);

        SUB_TEST_PASS();
}
258 
259 
260 /**
261  * @brief Consumes \p cnt messages and returns them in the provided array
262  *        which must be pre-allocated.
263  */
/**
 * @brief Consume exactly \p msgcnt proper messages from \p c into the
 *        caller-provided, pre-allocated \p msgs array.
 *
 * Error events are logged, destroyed and skipped; polling continues
 * until the requested count has been collected.
 */
static void consume_messages (rd_kafka_t *c,
                              rd_kafka_message_t **msgs, int msgcnt) {
        int cnt;

        for (cnt = 0 ; cnt < msgcnt ; ) {
                rd_kafka_message_t *rkm = rd_kafka_consumer_poll(c, 1000);

                if (!rkm)
                        continue;

                if (rkm->err) {
                        TEST_SAY("%s consumer error: %s\n",
                                 rd_kafka_name(c),
                                 rd_kafka_message_errstr(rkm));
                        rd_kafka_message_destroy(rkm);
                        continue;
                }

                TEST_SAYL(3, "%s: consumed message %s [%d] @ %"PRId64"\n",
                          rd_kafka_name(c),
                          rd_kafka_topic_name(rkm->rkt),
                          rkm->partition, rkm->offset);


                msgs[cnt++] = rkm;
        }
}
289 
/**
 * @brief Destroy the first \p msgcnt messages in \p msgs,
 *        last element first.
 */
static void destroy_messages (rd_kafka_message_t **msgs, int msgcnt) {
        int i;

        for (i = msgcnt - 1 ; i >= 0 ; i--)
                rd_kafka_message_destroy(msgs[i]);
}
294 
295 
296 /**
297  * @brief Test a transactional consumer + transactional producer combo,
298  *        mimicing a streams job.
299  *
300  * One input topic produced to by transactional producer 1,
301  * consumed by transactional consumer 1, which forwards messages
302  * to transactional producer 2 that writes messages to output topic,
303  * which is consumed and verified by transactional consumer 2.
304  *
305  * Every 3rd transaction is aborted.
306  */
void do_test_consumer_producer_txn (void) {
        char *input_topic =
                rd_strdup(test_mk_topic_name("0103-transactions-input", 1));
        char *output_topic =
                rd_strdup(test_mk_topic_name("0103-transactions-output", 1));
        const char *c1_groupid = input_topic;
        const char *c2_groupid = output_topic;
        rd_kafka_t *p1, *p2, *c1, *c2;
        rd_kafka_conf_t *conf, *tmpconf, *c1_conf;
        uint64_t testid;
#define _MSGCNT (10 * 30)
        const int txncnt = 10;
        const int msgcnt = _MSGCNT;
        int txn;
        int committed_msgcnt = 0; /* Messages expected on the output topic */
        test_msgver_t expect_mv, actual_mv;

        SUB_TEST_QUICK("transactional test with %d transactions", txncnt);

        test_conf_init(&conf, NULL, 30);

        testid = test_id_generate();

        /*
         *
         * Producer 1
         *     |
         *     v
         * input topic
         *     |
         *     v
         * Consumer 1    }
         *     |         } transactional streams job
         *     v         }
         * Producer 2    }
         *     |
         *     v
         * output topic
         *     |
         *     v
         * Consumer 2
         */


        /* Create Producer 1 and seed input topic */
        tmpconf = rd_kafka_conf_dup(conf);
        test_conf_set(tmpconf, "transactional.id", input_topic);
        rd_kafka_conf_set_dr_msg_cb(tmpconf, test_dr_msg_cb);
        p1 = test_create_handle(RD_KAFKA_PRODUCER, tmpconf);

        /* Create input and output topics */
        test_create_topic(p1, input_topic, 4, 3);
        test_create_topic(p1, output_topic, 4, 3);

        /* Seed input topic with messages */
        TEST_CALL_ERROR__(rd_kafka_init_transactions(p1, 30*1000));
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(p1));
        test_produce_msgs2(p1, input_topic, testid, RD_KAFKA_PARTITION_UA,
                           0, msgcnt, NULL, 0);
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(p1, 30*1000));

        rd_kafka_destroy(p1);

        /* Create Consumer 1: reading msgs from input_topic (Producer 1).
         * Keep a duplicate of its conf (c1_conf) so the consumer can be
         * recreated mid-test with identical settings. */
        tmpconf = rd_kafka_conf_dup(conf);
        test_conf_set(tmpconf, "isolation.level", "read_committed");
        test_conf_set(tmpconf, "auto.offset.reset", "earliest");
        test_conf_set(tmpconf, "enable.auto.commit", "false");
        c1_conf = rd_kafka_conf_dup(tmpconf);
        c1 = test_create_consumer(c1_groupid, NULL, tmpconf, NULL);
        test_consumer_subscribe(c1, input_topic);

        /* Create Producer 2 */
        tmpconf = rd_kafka_conf_dup(conf);
        test_conf_set(tmpconf, "transactional.id", output_topic);
        rd_kafka_conf_set_dr_msg_cb(tmpconf, test_dr_msg_cb);
        p2 = test_create_handle(RD_KAFKA_PRODUCER, tmpconf);
        TEST_CALL_ERROR__(rd_kafka_init_transactions(p2, 30*1000));

        /* Create Consumer 2: reading msgs from output_topic (Producer 2) */
        tmpconf = rd_kafka_conf_dup(conf);
        test_conf_set(tmpconf, "isolation.level", "read_committed");
        test_conf_set(tmpconf, "auto.offset.reset", "earliest");
        c2 = test_create_consumer(c2_groupid, NULL, tmpconf, NULL);
        test_consumer_subscribe(c2, output_topic);

        rd_kafka_conf_destroy(conf);

        /* Keep track of what messages to expect on the output topic */
        test_msgver_init(&expect_mv, testid);

        for (txn = 0 ; txn < txncnt ; txn++) {
                int msgcnt2 = 10 * (1 + (txn % 3));
                rd_kafka_message_t *msgs[_MSGCNT];
                int i;
                /* Every 3rd transaction (txn 0, 3, 6, 9) is aborted. */
                rd_bool_t do_abort = !(txn % 3);
                /* Recreate consumer 1 once (after aborted txn #3) to
                 * verify it resumes from the committed offsets. */
                rd_bool_t recreate_consumer = do_abort && txn == 3;
                rd_kafka_topic_partition_list_t *offsets;
                rd_kafka_resp_err_t err;
                rd_kafka_consumer_group_metadata_t *c1_cgmetadata;
                int remains = msgcnt2; /* Outstanding delivery reports */

                TEST_SAY(_C_BLU "Begin transaction #%d/%d "
                         "(msgcnt=%d, do_abort=%s, recreate_consumer=%s)\n",
                         txn, txncnt, msgcnt2,
                         do_abort ? "true":"false",
                         recreate_consumer ? "true":"false");

                consume_messages(c1, msgs, msgcnt2);

                TEST_CALL_ERROR__(rd_kafka_begin_transaction(p2));

                for (i = 0 ; i < msgcnt2 ; i++) {
                        rd_kafka_message_t *msg = msgs[i];

                        if (!do_abort) {
                                /* The expected msgver based on the input topic
                                 * will be compared to the actual msgver based
                                 * on the output topic, so we need to
                                 * override the topic name to match
                                 * the actual msgver's output topic. */
                                test_msgver_add_msg0(__FUNCTION__, __LINE__,
                                                     rd_kafka_name(p2),
                                                     &expect_mv, msg,
                                                     output_topic);
                                committed_msgcnt++;
                        }

                        err = rd_kafka_producev(p2,
                                                RD_KAFKA_V_TOPIC(output_topic),
                                                RD_KAFKA_V_KEY(msg->key,
                                                               msg->key_len),
                                                RD_KAFKA_V_VALUE(msg->payload,
                                                                 msg->len),
                                                RD_KAFKA_V_MSGFLAGS(
                                                        RD_KAFKA_MSG_F_COPY),
                                                RD_KAFKA_V_OPAQUE(&remains),
                                                RD_KAFKA_V_END);
                        TEST_ASSERT(!err, "produce failed: %s",
                                    rd_kafka_err2str(err));

                        rd_kafka_poll(p2, 0);
                }

                destroy_messages(msgs, msgcnt2);

                /* Forward consumer 1's current position to the transaction
                 * so the input offsets commit/abort atomically with the
                 * produced output messages. */
                err = rd_kafka_assignment(c1, &offsets);
                TEST_ASSERT(!err, "failed to get consumer assignment: %s",
                            rd_kafka_err2str(err));

                err = rd_kafka_position(c1, offsets);
                TEST_ASSERT(!err, "failed to get consumer position: %s",
                            rd_kafka_err2str(err));

                c1_cgmetadata = rd_kafka_consumer_group_metadata(c1);
                TEST_ASSERT(c1_cgmetadata != NULL,
                            "failed to get consumer group metadata");

                TEST_CALL_ERROR__(
                        rd_kafka_send_offsets_to_transaction(
                                p2, offsets, c1_cgmetadata, -1));


                rd_kafka_consumer_group_metadata_destroy(c1_cgmetadata);

                rd_kafka_topic_partition_list_destroy(offsets);


                if (do_abort) {
                        test_curr->ignore_dr_err = rd_true;
                        TEST_CALL_ERROR__(rd_kafka_abort_transaction(
                                                  p2, 30*1000));
                } else {
                        test_curr->ignore_dr_err = rd_false;
                        TEST_CALL_ERROR__(rd_kafka_commit_transaction(
                                                  p2, 30*1000));
                }

                TEST_ASSERT(remains == 0,
                            "expected no remaining messages "
                            "in-flight/in-queue, got %d", remains);


                if (recreate_consumer) {
                        /* Recreate the consumer to pick up
                         * on the committed offset. */
                        TEST_SAY("Recreating consumer 1\n");
                        rd_kafka_consumer_close(c1);
                        rd_kafka_destroy(c1);

                        c1 = test_create_consumer(c1_groupid, NULL, c1_conf,
                                                  NULL);
                        test_consumer_subscribe(c1, input_topic);
                }
        }

        /* Verify that the output topic contains exactly the messages from
         * the committed transactions. */
        test_msgver_init(&actual_mv, testid);

        test_consumer_poll("Verify output topic", c2, testid,
                           -1, 0, committed_msgcnt, &actual_mv);

        test_msgver_verify_compare("Verify output topic",
                                   &actual_mv, &expect_mv,
                                   TEST_MSGVER_ALL);

        test_msgver_clear(&actual_mv);
        test_msgver_clear(&expect_mv);

        rd_kafka_consumer_close(c1);
        rd_kafka_consumer_close(c2);
        rd_kafka_destroy(c1);
        rd_kafka_destroy(c2);
        rd_kafka_destroy(p2);

        rd_free(input_topic);
        rd_free(output_topic);

        SUB_TEST_PASS();
}
526 
527 
528 /**
529  * @brief Testing misuse of the transaction API.
530  */
static void do_test_misuse_txn (void) {
        const char *topic = test_mk_topic_name("0103-test_misuse_txn", 1);
        rd_kafka_t *p;
        rd_kafka_conf_t *conf;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t fatal_err;
        char errstr[512];
        int i;

        /*
         * transaction.timeout.ms out of range (from broker's point of view)
         */
        SUB_TEST_QUICK();

        test_conf_init(&conf, NULL, 10);

        test_conf_set(conf, "transactional.id", topic);
        /* INT32_MAX ms: exceeds the broker's transaction.max.timeout.ms */
        test_conf_set(conf, "transaction.timeout.ms", "2147483647");

        p = test_create_handle(RD_KAFKA_PRODUCER, conf);

        error = rd_kafka_init_transactions(p, 10*1000);
        TEST_ASSERT(error, "Expected init_transactions() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) ==
                    RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
                    "Expected error ERR_INVALID_TRANSACTION_TIMEOUT, "
                    "not %s: %s",
                    rd_kafka_error_name(error),
                    error ? rd_kafka_error_string(error) : "");
        TEST_ASSERT(rd_kafka_error_is_fatal(error),
                    "Expected error to have is_fatal() set");
        rd_kafka_error_destroy(error);
        /* Check that a fatal error is raised */
        fatal_err = rd_kafka_fatal_error(p, errstr, sizeof(errstr));
        TEST_ASSERT(fatal_err == RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT,
                    "Expected fatal error ERR_INVALID_TRANSACTION_TIMEOUT, "
                    "not %s: %s",
                    rd_kafka_err2name(fatal_err),
                    fatal_err ? errstr : "");

        rd_kafka_destroy(p);


        /*
         * Multiple calls to init_transactions(): finish on first.
         * Subsequent calls must fail with ERR__STATE.
         */
        TEST_SAY("[ Test multiple init_transactions(): finish on first ]\n");
        test_conf_init(&conf, NULL, 10);

        test_conf_set(conf, "transactional.id", topic);

        p = test_create_handle(RD_KAFKA_PRODUCER, conf);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30*1000));

        error = rd_kafka_init_transactions(p, 1);
        TEST_ASSERT(error, "Expected init_transactions() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__STATE,
                    "Expected ERR__STATE error, not %s",
                    rd_kafka_error_name(error));
        rd_kafka_error_destroy(error);

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));

        /* init_transactions() while a transaction is ongoing must also
         * fail with ERR__STATE. */
        error = rd_kafka_init_transactions(p, 3*1000);
        TEST_ASSERT(error, "Expected init_transactions() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__STATE,
                    "Expected ERR__STATE error, not %s",
                    rd_kafka_error_name(error));
        rd_kafka_error_destroy(error);

        rd_kafka_destroy(p);


        /*
         * Multiple calls to init_transactions(): timeout on first.
         * A timed-out call is retriable and a later retry must succeed.
         */
        TEST_SAY("[ Test multiple init_transactions(): timeout on first ]\n");
        test_conf_init(&conf, NULL, 10);

        test_conf_set(conf, "transactional.id", topic);

        p = test_create_handle(RD_KAFKA_PRODUCER, conf);

        error = rd_kafka_init_transactions(p, 1);
        TEST_ASSERT(error, "Expected init_transactions() to fail");
        TEST_SAY("error: %s, %d\n", rd_kafka_error_string(error),
                 rd_kafka_error_is_retriable(error));
        TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
                    "Expected ERR__TIMED_OUT, not %s: %s",
                    rd_kafka_error_name(error),
                    rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_is_retriable(error),
                    "Expected error to be retriable");
        rd_kafka_error_destroy(error);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30*1000));

        rd_kafka_destroy(p);


        /*
         * Multiple calls to init_transactions(): hysterical amounts
         */
        TEST_SAY("[ Test multiple init_transactions(): hysterical amounts ]\n");
        test_conf_init(&conf, NULL, 10);

        test_conf_set(conf, "transactional.id", topic);

        p = test_create_handle(RD_KAFKA_PRODUCER, conf);

        /* Call until init succeeds */
        for (i = 0 ; i < 5000 ; i++) {
                if (!(error = rd_kafka_init_transactions(p, 1)))
                        break;

                TEST_ASSERT(rd_kafka_error_is_retriable(error),
                            "Expected error to be retriable");
                rd_kafka_error_destroy(error);

                /* begin_transaction() must fail while init is still
                 * in progress. */
                error = rd_kafka_begin_transaction(p);
                TEST_ASSERT(error, "Expected begin_transactions() to fail");
                TEST_ASSERT(rd_kafka_error_code(error) ==
                            RD_KAFKA_RESP_ERR__STATE,
                            "Expected begin_transactions() to fail "
                            "with STATE, not %s",
                            rd_kafka_error_name(error));

                rd_kafka_error_destroy(error);
        }

        TEST_SAY("init_transactions() succeeded after %d call(s)\n", i+1);

        /* Make sure a sub-sequent init call fails. */
        error = rd_kafka_init_transactions(p, 5*1000);
        TEST_ASSERT(error, "Expected init_transactions() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__STATE,
                    "Expected init_transactions() to fail with STATE, not %s",
                    rd_kafka_error_name(error));
        rd_kafka_error_destroy(error);

        /* But begin.. should work now */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));

        rd_kafka_destroy(p);

        SUB_TEST_PASS();
}
679 
680 
681 /**
682  * @brief is_fatal_cb for fenced_txn test.
683  */
/**
 * @brief is_fatal_cb for fenced_txn test.
 *
 * @returns 0 (don't treat as fatal) for the expected _FENCED error,
 *          else 1 (fatal).
 */
static int fenced_txn_is_fatal_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                                   const char *reason) {
        TEST_SAY("is_fatal?: %s: %s\n", rd_kafka_err2str(err), reason);

        if (err != RD_KAFKA_RESP_ERR__FENCED)
                return 1;

        TEST_SAY("Saw the expected fatal error\n");
        return 0;
}
693 
694 
695 /**
696  * @brief Check that transaction fencing is handled correctly.
697  */
/**
 * @brief Check that transaction fencing is handled correctly.
 *
 * Two producers share the same transactional.id; initializing
 * transactions on the second must fence off the first, whose
 * commit_transaction() must then fail with a fatal _FENCED error.
 *
 * @param produce_after_fence If true, also produce on the fenced
 *                            producer after the fence to trigger the
 *                            epoch-bump failure path.
 */
static void do_test_fenced_txn (rd_bool_t produce_after_fence) {
        const char *topic = test_mk_topic_name("0103_fenced_txn", 1);
        rd_kafka_conf_t *conf;
        rd_kafka_t *p1, *p2;
        rd_kafka_error_t *error;
        uint64_t testid;

        SUB_TEST_QUICK("%sproduce after fence",
                       produce_after_fence ? "" : "do not ");

        if (produce_after_fence)
                test_curr->is_fatal_cb = fenced_txn_is_fatal_cb;

        test_curr->ignore_dr_err = rd_false;

        testid = test_id_generate();

        test_conf_init(&conf, NULL, 30);

        test_conf_set(conf, "transactional.id", topic);
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);

        /* Both producers share the same transactional.id. */
        p1 = test_create_handle(RD_KAFKA_PRODUCER, rd_kafka_conf_dup(conf));
        p2 = test_create_handle(RD_KAFKA_PRODUCER, rd_kafka_conf_dup(conf));
        rd_kafka_conf_destroy(conf);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(p1, 30*1000));

        /* Begin a transaction */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(p1));

        /* Produce some messages */
        test_produce_msgs2(p1, topic, testid, RD_KAFKA_PARTITION_UA,
                           0, 10, NULL, 0);

        /* Initialize transactions on producer 2, this should
         * fence off producer 1. */
        TEST_CALL_ERROR__(rd_kafka_init_transactions(p2, 30*1000));

        if (produce_after_fence) {
                /* This will fail hard since the epoch was bumped. */
                TEST_SAY("Producing after producing fencing\n");
                test_curr->ignore_dr_err = rd_true;
                test_produce_msgs2(p1, topic, testid, RD_KAFKA_PARTITION_UA,
                                   0, 10, NULL, 0);
        }


        error = rd_kafka_commit_transaction(p1, 30*1000);

        /* Removed a duplicated "Expected commit to fail" assertion that
         * checked the same condition twice. */
        TEST_ASSERT(error, "Expected commit_transaction() to fail");
        TEST_ASSERT(rd_kafka_fatal_error(p1, NULL, 0),
                    "Expected a fatal error to have been raised");
        TEST_ASSERT(rd_kafka_error_is_fatal(error),
                    "Expected commit_transaction() to return a "
                    "fatal error");
        TEST_ASSERT(!rd_kafka_error_txn_requires_abort(error),
                    "Expected commit_transaction() not to return an "
                    "abortable error");
        TEST_ASSERT(!rd_kafka_error_is_retriable(error),
                    "Expected commit_transaction() not to return a "
                    "retriable error");
        TEST_ASSERT(rd_kafka_error_code(error) ==
                    RD_KAFKA_RESP_ERR__FENCED,
                    "Expected commit_transaction() to return %s, "
                    "not %s: %s",
                    rd_kafka_err2name(RD_KAFKA_RESP_ERR__FENCED),
                    rd_kafka_error_name(error),
                    rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);

        rd_kafka_destroy(p1);
        rd_kafka_destroy(p2);

        /* Make sure no messages were committed. */
        test_consume_txn_msgs_easy(topic, topic, testid,
                                   test_get_partition_count(NULL, topic,
                                                            10*1000),
                                   0, NULL);

        SUB_TEST_PASS();
}
781 
782 
783 
784 /**
785  * @brief Check that fatal idempotent producer errors are also fatal
786  *        transactional errors when KIP-360 is not supported.
787  */
do_test_fatal_idempo_error_without_kip360(void)788 static void do_test_fatal_idempo_error_without_kip360 (void) {
789         const char *topic = test_mk_topic_name("0103_fatal_idempo", 1);
790         const int32_t partition = 0;
791         rd_kafka_conf_t *conf, *c_conf;
792         rd_kafka_t *p, *c;
793         rd_kafka_error_t *error;
794         uint64_t testid;
795         const int msgcnt[3] = { 6, 4, 1 };
796         rd_kafka_topic_partition_list_t *records;
797         test_msgver_t expect_mv, actual_mv;
798         /* This test triggers UNKNOWN_PRODUCER_ID on AK <2.4 and >2.4, but
799          * not on AK 2.4.
800          * On AK <2.5 (pre KIP-360) these errors are unrecoverable,
801          * on AK >2.5 (with KIP-360) we can recover.
802          * Since 2.4 is not behaving as the other releases we skip it here. */
803         rd_bool_t expect_fail = test_broker_version < TEST_BRKVER(2,5,0,0);
804 
805         SUB_TEST_QUICK("%s",
806                        expect_fail ?
807                        "expecting failure since broker is < 2.5" :
808                        "not expecting failure since broker is >= 2.5");
809 
810         if (test_broker_version >= TEST_BRKVER(2,4,0,0) &&
811             test_broker_version < TEST_BRKVER(2,5,0,0))
812                 SUB_TEST_SKIP("can't trigger UNKNOWN_PRODUCER_ID on AK 2.4");
813 
814         if (expect_fail)
815                 test_curr->is_fatal_cb = test_error_is_not_fatal_cb;
816         test_curr->ignore_dr_err = expect_fail;
817 
818         testid = test_id_generate();
819 
820         /* Keep track of what messages to expect on the output topic */
821         test_msgver_init(&expect_mv, testid);
822 
823         test_conf_init(&conf, NULL, 30);
824 
825         test_conf_set(conf, "transactional.id", topic);
826         test_conf_set(conf, "batch.num.messages", "1");
827         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
828 
829         p = test_create_handle(RD_KAFKA_PRODUCER, conf);
830 
831         test_create_topic(p, topic, 1, 3);
832 
833 
834         TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30*1000));
835 
836         /*
837          * 3 transactions:
838          *  1. Produce some messages, commit.
839          *  2. Produce some messages, then delete the messages from txn 1 and
840          *     then produce some more messages: UNKNOWN_PRODUCER_ID should be
841          *     raised as a fatal error.
842          *  3. Start a new transaction, produce and commit some new messages.
843          *     (this step is only performed when expect_fail is false).
844          */
845 
846         /*
847          * Transaction 1
848          */
849         TEST_SAY(_C_BLU "Transaction 1: %d msgs\n", msgcnt[0]);
850         TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));
851         test_produce_msgs2(p, topic, testid, partition, 0,
852                            msgcnt[0], NULL, 0);
853         TEST_CALL_ERROR__(rd_kafka_commit_transaction(p, -1));
854 
855 
856         /*
857          * Transaction 2
858          */
859         TEST_SAY(_C_BLU "Transaction 2: %d msgs\n", msgcnt[1]);
860         TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));
861 
862         /* Now delete the messages from txn1 */
863         TEST_SAY("Deleting records < %s [%"PRId32"] offset %d+1\n",
864                  topic, partition, msgcnt[0]);
865         records = rd_kafka_topic_partition_list_new(1);
866         rd_kafka_topic_partition_list_add(records, topic, partition)->offset =
867                 msgcnt[0]; /* include the control message too */
868 
869         TEST_CALL_ERR__(test_DeleteRecords_simple(p,
870                                                   NULL,
871                                                   records,
872                                                   NULL));
873         rd_kafka_topic_partition_list_destroy(records);
874 
875         /* Wait for deletes to propagate */
876         rd_sleep(2);
877 
878         if (!expect_fail)
879                 test_curr->dr_mv = &expect_mv;
880 
881         /* Produce more messages, should now fail */
882         test_produce_msgs2(p, topic, testid, partition, 0,
883                            msgcnt[1], NULL, 0);
884 
885         error = rd_kafka_commit_transaction(p, -1);
886 
887         TEST_SAY_ERROR(error, "commit_transaction() returned: ");
888 
889         if (expect_fail) {
890                 TEST_ASSERT(error != NULL,
891                             "Expected transaction to fail");
892                 TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
893                             "Expected abortable error");
894                 rd_kafka_error_destroy(error);
895 
896                 /* Now abort transaction, which should raise the fatal error
897                  * since it is the abort that performs the PID reinitialization.
898                  */
899                 error = rd_kafka_abort_transaction(p, -1);
900                 TEST_SAY_ERROR(error, "abort_transaction() returned: ");
901                 TEST_ASSERT(error != NULL,
902                             "Expected abort to fail");
903                 TEST_ASSERT(rd_kafka_error_is_fatal(error),
904                             "Expecting fatal error");
905                 TEST_ASSERT(!rd_kafka_error_is_retriable(error),
906                             "Did not expect retriable error");
907                 TEST_ASSERT(!rd_kafka_error_txn_requires_abort(error),
908                             "Did not expect abortable error");
909 
910                 rd_kafka_error_destroy(error);
911 
912         } else {
913                 TEST_ASSERT(!error, "Did not expect commit to fail: %s",
914                             rd_kafka_error_string(error));
915         }
916 
917 
918         if (!expect_fail) {
919                 /*
920                  * Transaction 3
921                  */
922                 TEST_SAY(_C_BLU "Transaction 3: %d msgs\n", msgcnt[2]);
923                 test_curr->dr_mv = &expect_mv;
924                 TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));
925                 test_produce_msgs2(p, topic, testid, partition, 0,
926                                    msgcnt[2], NULL, 0);
927                 TEST_CALL_ERROR__(rd_kafka_commit_transaction(p, -1));
928         }
929 
930         rd_kafka_destroy(p);
931 
932         /* Consume messages.
933          * On AK<2.5 (expect_fail=true) we do not expect to see any messages
934          * since the producer will have failed with a fatal error.
935          * On AK>=2.5 (expect_fail=false) we should only see messages from
936          * txn 3 which are sent after the producer has recovered.
937          */
938 
939         test_conf_init(&c_conf, NULL, 0);
940         test_conf_set(c_conf, "enable.partition.eof", "true");
941         c = test_create_consumer(topic, NULL, c_conf, NULL);
942         test_consumer_assign_partition("consume",
943                                        c, topic, partition,
944                                        RD_KAFKA_OFFSET_BEGINNING);
945 
946         test_msgver_init(&actual_mv, testid);
947         test_msgver_ignore_eof(&actual_mv);
948 
949         test_consumer_poll("Verify output topic", c, testid,
950                            1, 0, -1, &actual_mv);
951 
952         test_msgver_verify_compare("Verify output topic",
953                                    &actual_mv, &expect_mv,
954                                    TEST_MSGVER_ALL);
955 
956         test_msgver_clear(&actual_mv);
957         test_msgver_clear(&expect_mv);
958 
959         rd_kafka_destroy(c);
960 
961         SUB_TEST_PASS();
962 }
963 
964 
965 /**
966  * @brief Check that empty transactions, with no messages produced, work
967  *        as expected.
968  */
do_test_empty_txn(rd_bool_t send_offsets,rd_bool_t do_commit)969 static void do_test_empty_txn (rd_bool_t send_offsets, rd_bool_t do_commit) {
970         const char *topic = test_mk_topic_name("0103_empty_txn", 1);
971         rd_kafka_conf_t *conf, *c_conf;
972         rd_kafka_t *p, *c;
973         uint64_t testid;
974         const int msgcnt = 10;
975         rd_kafka_topic_partition_list_t *committed;
976         int64_t offset;
977 
978         SUB_TEST_QUICK("%ssend offsets, %s",
979                        send_offsets ? "" : "don't ",
980                        do_commit ? "commit" : "abort");
981 
982         testid = test_id_generate();
983 
984         test_conf_init(&conf, NULL, 30);
985         c_conf = rd_kafka_conf_dup(conf);
986 
987         test_conf_set(conf, "transactional.id", topic);
988         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
989         p = test_create_handle(RD_KAFKA_PRODUCER, conf);
990 
991         test_create_topic(p, topic, 1, 3);
992 
993         /* Produce some non-txnn messages for the consumer to read and commit */
994         test_produce_msgs_easy(topic, testid, 0, msgcnt);
995 
996         /* Create consumer and subscribe to the topic */
997         test_conf_set(c_conf, "auto.offset.reset", "earliest");
998         test_conf_set(c_conf, "enable.auto.commit", "false");
999         c = test_create_consumer(topic, NULL, c_conf, NULL);
1000         test_consumer_subscribe(c, topic);
1001         test_consumer_wait_assignment(c, rd_false);
1002 
1003         TEST_CALL_ERROR__(rd_kafka_init_transactions(p, -1));
1004 
1005         TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));
1006 
1007         /* send_offsets? Consume messages and send those offsets to the txn */
1008         if (send_offsets) {
1009                 rd_kafka_topic_partition_list_t *offsets;
1010                 rd_kafka_consumer_group_metadata_t *cgmetadata;
1011 
1012                 test_consumer_poll("consume", c, testid, -1, 0, msgcnt, NULL);
1013 
1014                 TEST_CALL_ERR__(rd_kafka_assignment(c, &offsets));
1015                 TEST_CALL_ERR__(rd_kafka_position(c, offsets));
1016 
1017                 cgmetadata = rd_kafka_consumer_group_metadata(c);
1018                 TEST_ASSERT(cgmetadata != NULL,
1019                             "failed to get consumer group metadata");
1020 
1021                 TEST_CALL_ERROR__(
1022                         rd_kafka_send_offsets_to_transaction(
1023                                 p, offsets, cgmetadata, -1));
1024 
1025                 rd_kafka_consumer_group_metadata_destroy(cgmetadata);
1026 
1027                 rd_kafka_topic_partition_list_destroy(offsets);
1028         }
1029 
1030 
1031         if (do_commit)
1032                 TEST_CALL_ERROR__(rd_kafka_commit_transaction(p, -1));
1033         else
1034                 TEST_CALL_ERROR__(rd_kafka_abort_transaction(p, -1));
1035 
1036         /* Get the committed offsets */
1037         TEST_CALL_ERR__(rd_kafka_assignment(c, &committed));
1038         TEST_CALL_ERR__(rd_kafka_committed(c, committed, 10*1000));
1039 
1040         TEST_ASSERT(committed->cnt == 1,
1041                     "expected one committed offset, not %d",
1042                     committed->cnt);
1043         offset = committed->elems[0].offset;
1044         TEST_SAY("Committed offset is %"PRId64"\n", offset);
1045 
1046         if (do_commit && send_offsets)
1047                 TEST_ASSERT(offset >= msgcnt,
1048                             "expected committed offset >= %d, got %"PRId64,
1049                             msgcnt, offset);
1050         else
1051                 TEST_ASSERT(offset < 0,
1052                             "expected no committed offset, got %"PRId64,
1053                             offset);
1054 
1055         rd_kafka_topic_partition_list_destroy(committed);
1056 
1057         rd_kafka_destroy(c);
1058         rd_kafka_destroy(p);
1059 
1060         SUB_TEST_PASS();
1061 }
1062 
1063 /**
1064  * @returns the high watermark for the given partition.
1065  */
query_hi_wmark0(int line,rd_kafka_t * c,const char * topic,int32_t partition)1066 int64_t query_hi_wmark0 (int line,
1067                          rd_kafka_t *c, const char *topic, int32_t partition) {
1068         rd_kafka_resp_err_t err;
1069         int64_t lo = -1, hi = -1;
1070 
1071         err = rd_kafka_query_watermark_offsets(c, topic, partition, &lo, &hi,
1072                                                tmout_multip(5*1000));
1073         TEST_ASSERT(!err,
1074                     "%d: query_watermark_offsets(%s) failed: %s",
1075                     line, topic, rd_kafka_err2str(err));
1076 
1077         return hi;
1078 }
1079 #define query_hi_wmark(c,topic,part) query_hi_wmark0(__LINE__,c,topic,part)
1080 
1081 /**
1082  * @brief Check that isolation.level works as expected for query_watermark..().
1083  */
do_test_wmark_isolation_level(void)1084 static void do_test_wmark_isolation_level (void) {
1085         const char *topic = test_mk_topic_name("0103_wmark_isol", 1);
1086         rd_kafka_conf_t *conf, *c_conf;
1087         rd_kafka_t *p, *c1, *c2;
1088         uint64_t testid;
1089         int64_t hw_uncommitted, hw_committed;
1090 
1091         SUB_TEST_QUICK();
1092 
1093         testid = test_id_generate();
1094 
1095         test_conf_init(&conf, NULL, 30);
1096         c_conf = rd_kafka_conf_dup(conf);
1097 
1098         test_conf_set(conf, "transactional.id", topic);
1099         rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
1100         p = test_create_handle(RD_KAFKA_PRODUCER, rd_kafka_conf_dup(conf));
1101 
1102         test_create_topic(p, topic, 1, 3);
1103 
1104         /* Produce some non-txn messages to avoid 0 as the committed hwmark */
1105         test_produce_msgs_easy(topic, testid, 0, 100);
1106 
1107         /* Create consumer and subscribe to the topic */
1108         test_conf_set(c_conf, "isolation.level", "read_committed");
1109         c1 = test_create_consumer(topic, NULL, rd_kafka_conf_dup(c_conf), NULL);
1110         test_conf_set(c_conf, "isolation.level", "read_uncommitted");
1111         c2 = test_create_consumer(topic, NULL, c_conf, NULL);
1112 
1113         TEST_CALL_ERROR__(rd_kafka_init_transactions(p, -1));
1114 
1115         TEST_CALL_ERROR__(rd_kafka_begin_transaction(p));
1116 
1117         /* Produce some txn messages */
1118         test_produce_msgs2(p, topic, testid, 0, 0, 100, NULL, 0);
1119 
1120         test_flush(p, 10*1000);
1121 
1122         hw_committed = query_hi_wmark(c1, topic, 0);
1123         hw_uncommitted = query_hi_wmark(c2, topic, 0);
1124 
1125         TEST_SAY("Pre-commit hwmarks: committed %"PRId64
1126                  ", uncommitted %"PRId64"\n",
1127                  hw_committed, hw_uncommitted);
1128 
1129         TEST_ASSERT(hw_committed > 0 && hw_committed < hw_uncommitted,
1130                     "Committed hwmark %"PRId64" should be lower than "
1131                     "uncommitted hwmark %"PRId64" for %s [0]",
1132                     hw_committed, hw_uncommitted, topic);
1133 
1134         TEST_CALL_ERROR__(rd_kafka_commit_transaction(p, -1));
1135 
1136         /* Re-create the producer and re-init transactions to make
1137          * sure the transaction is fully committed in the cluster. */
1138         rd_kafka_destroy(p);
1139         p = test_create_handle(RD_KAFKA_PRODUCER, conf);
1140         TEST_CALL_ERROR__(rd_kafka_init_transactions(p, -1));
1141         rd_kafka_destroy(p);
1142 
1143 
1144         /* Now query wmarks again */
1145         hw_committed = query_hi_wmark(c1, topic, 0);
1146         hw_uncommitted = query_hi_wmark(c2, topic, 0);
1147 
1148         TEST_SAY("Post-commit hwmarks: committed %"PRId64
1149                  ", uncommitted %"PRId64"\n",
1150                  hw_committed, hw_uncommitted);
1151 
1152         TEST_ASSERT(hw_committed == hw_uncommitted,
1153                     "Committed hwmark %"PRId64" should be equal to "
1154                     "uncommitted hwmark %"PRId64" for %s [0]",
1155                     hw_committed, hw_uncommitted, topic);
1156 
1157         rd_kafka_destroy(c1);
1158         rd_kafka_destroy(c2);
1159 
1160         SUB_TEST_PASS();
1161 }
1162 
1163 
1164 
/**
 * @brief Transaction tests that require a broker.
 */
int main_0103_transactions (int argc, char **argv) {

        do_test_misuse_txn();
        do_test_basic_producer_txn(rd_false /* without compression */);
        do_test_basic_producer_txn(rd_true /* with compression */);
        do_test_consumer_producer_txn();
        do_test_fenced_txn(rd_false /* no produce after fencing */);
        do_test_fenced_txn(rd_true /* produce after fencing */);
        do_test_fatal_idempo_error_without_kip360();
        /* All four combinations of send_offsets x commit/abort. */
        do_test_empty_txn(rd_false/*don't send offsets*/, rd_true/*commit*/);
        do_test_empty_txn(rd_false/*don't send offsets*/, rd_false/*abort*/);
        do_test_empty_txn(rd_true/*send offsets*/, rd_true/*commit*/);
        do_test_empty_txn(rd_true/*send offsets*/, rd_false/*abort*/);
        do_test_wmark_isolation_level();
        return 0;
}
1181 
1182 
1183 
1184 /**
1185  * @brief Transaction tests that don't require a broker.
1186  */
do_test_txn_local(void)1187 static void do_test_txn_local (void) {
1188         rd_kafka_conf_t *conf;
1189         rd_kafka_t *p;
1190         rd_kafka_error_t *error;
1191         test_timing_t t_init;
1192         int timeout_ms = 7 * 1000;
1193 
1194         SUB_TEST_QUICK();
1195 
1196         /*
1197          * No transactional.id, init_transactions() should fail.
1198          */
1199         test_conf_init(&conf, NULL, 0);
1200         test_conf_set(conf, "bootstrap.servers", NULL);
1201 
1202         p = test_create_handle(RD_KAFKA_PRODUCER, conf);
1203 
1204         error = rd_kafka_init_transactions(p, 10);
1205         TEST_ASSERT(error, "Expected init_transactions() to fail");
1206         TEST_ASSERT(rd_kafka_error_code(error) ==
1207                     RD_KAFKA_RESP_ERR__NOT_CONFIGURED,
1208                     "Expected ERR__NOT_CONFIGURED, not %s",
1209                     rd_kafka_error_name(error));
1210         rd_kafka_error_destroy(error);
1211 
1212         rd_kafka_destroy(p);
1213 
1214 
1215         /*
1216          * No brokers, init_transactions() should time out according
1217          * to the timeout.
1218          */
1219         test_conf_init(&conf, NULL, 0);
1220         test_conf_set(conf, "bootstrap.servers", NULL);
1221         test_conf_set(conf, "transactional.id", "test");
1222         p = test_create_handle(RD_KAFKA_PRODUCER, conf);
1223 
1224         TEST_SAY("Waiting for init_transactions() timeout %d ms\n",
1225                  timeout_ms);
1226 
1227         test_timeout_set((timeout_ms + 2000) / 1000);
1228 
1229         TIMING_START(&t_init, "init_transactions()");
1230         error = rd_kafka_init_transactions(p, timeout_ms);
1231         TIMING_STOP(&t_init);
1232         TEST_ASSERT(error, "Expected init_transactions() to fail");
1233         TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
1234                     "Expected RD_KAFKA_RESP_ERR__TIMED_OUT, "
1235                     "not %s: %s",
1236                     rd_kafka_error_name(error),
1237                     rd_kafka_error_string(error));
1238 
1239         TEST_SAY("init_transactions() failed as expected: %s\n",
1240                  rd_kafka_error_string(error));
1241 
1242         rd_kafka_error_destroy(error);
1243 
1244         TIMING_ASSERT(&t_init, timeout_ms - 2000, timeout_ms + 5000);
1245 
1246         rd_kafka_destroy(p);
1247 
1248         SUB_TEST_PASS();
1249 }
1250 
1251 
/**
 * @brief Broker-less (local) transaction tests entry point.
 */
int main_0103_transactions_local (int argc, char **argv) {

        do_test_txn_local();

        return 0;
}
1258