1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2019 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 /**
30  * @name Transaction Manager
31  *
32  */
33 
34 #include <stdarg.h>
35 
36 #include "rd.h"
37 #include "rdkafka_int.h"
38 #include "rdkafka_txnmgr.h"
39 #include "rdkafka_idempotence.h"
40 #include "rdkafka_request.h"
41 #include "rdkafka_error.h"
42 #include "rdunittest.h"
43 #include "rdrand.h"
44 
45 
46 static void
47 rd_kafka_txn_curr_api_reply_error (rd_kafka_q_t *rkq, rd_kafka_error_t *error);
48 
49 
50 /**
51  * @brief Ensure client is configured as a transactional producer,
52  *        else return error.
53  *
54  * @locality application thread
55  * @locks none
56  */
57 static RD_INLINE rd_kafka_error_t *
rd_kafka_ensure_transactional(const rd_kafka_t * rk)58 rd_kafka_ensure_transactional (const rd_kafka_t *rk) {
59         if (unlikely(rk->rk_type != RD_KAFKA_PRODUCER))
60                 return rd_kafka_error_new(
61                         RD_KAFKA_RESP_ERR__INVALID_ARG,
62                         "The Transactional API can only be used "
63                         "on producer instances");
64 
65         if (unlikely(!rk->rk_conf.eos.transactional_id))
66                 return rd_kafka_error_new(
67                         RD_KAFKA_RESP_ERR__NOT_CONFIGURED,
68                         "The Transactional API requires "
69                         "transactional.id to be configured");
70 
71         return NULL;
72 }
73 
74 
75 
76 /**
77  * @brief Ensure transaction state is one of \p states.
78  *
79  * @param the required states, ended by a -1 sentinel.
80  *
81  * @locks rd_kafka_*lock(rk) MUST be held
82  * @locality any
83  */
84 static RD_INLINE rd_kafka_error_t *
rd_kafka_txn_require_states0(rd_kafka_t * rk,rd_kafka_txn_state_t states[])85 rd_kafka_txn_require_states0 (rd_kafka_t *rk,
86                               rd_kafka_txn_state_t states[]) {
87         rd_kafka_error_t *error;
88         size_t i;
89 
90         if (unlikely((error = rd_kafka_ensure_transactional(rk)) != NULL))
91                 return error;
92 
93         for (i = 0 ; (int)states[i] != -1 ; i++)
94                 if (rk->rk_eos.txn_state == states[i])
95                         return NULL;
96 
97         error = rd_kafka_error_new(
98                 RD_KAFKA_RESP_ERR__STATE,
99                 "Operation not valid in state %s",
100                 rd_kafka_txn_state2str(rk->rk_eos.txn_state));
101 
102 
103         if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_FATAL_ERROR)
104                 rd_kafka_error_set_fatal(error);
105         else if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_ABORTABLE_ERROR)
106                 rd_kafka_error_set_txn_requires_abort(error);
107 
108         return error;
109 }
110 
/**
 * @brief Variadic convenience wrapper for rd_kafka_txn_require_states0():
 *        the acceptable states are passed as \p ... and the terminating
 *        -1 sentinel is appended automatically.
 */
#define rd_kafka_txn_require_state(rk,...)                              \
        rd_kafka_txn_require_states0(                                   \
                (rk),                                                   \
                (rd_kafka_txn_state_t[]){ __VA_ARGS__, -1 })
116 
117 
118 
119 /**
120  * @param ignore Will be set to true if the state transition should be
121  *               completely ignored.
122  * @returns true if the state transition is valid, else false.
123  */
124 static rd_bool_t
rd_kafka_txn_state_transition_is_valid(rd_kafka_txn_state_t curr,rd_kafka_txn_state_t new_state,rd_bool_t * ignore)125 rd_kafka_txn_state_transition_is_valid (rd_kafka_txn_state_t curr,
126                                         rd_kafka_txn_state_t new_state,
127                                         rd_bool_t *ignore) {
128 
129         *ignore = rd_false;
130 
131         switch (new_state)
132         {
133         case RD_KAFKA_TXN_STATE_INIT:
134                 /* This is the initialized value and this transition will
135                  * never happen. */
136                 return rd_false;
137 
138         case RD_KAFKA_TXN_STATE_WAIT_PID:
139                 return curr == RD_KAFKA_TXN_STATE_INIT;
140 
141         case RD_KAFKA_TXN_STATE_READY_NOT_ACKED:
142                 return curr == RD_KAFKA_TXN_STATE_WAIT_PID;
143 
144         case RD_KAFKA_TXN_STATE_READY:
145                 return curr == RD_KAFKA_TXN_STATE_READY_NOT_ACKED ||
146                         curr == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION ||
147                         curr == RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION;
148 
149         case RD_KAFKA_TXN_STATE_IN_TRANSACTION:
150                 return curr == RD_KAFKA_TXN_STATE_READY;
151 
152         case RD_KAFKA_TXN_STATE_BEGIN_COMMIT:
153                 return curr == RD_KAFKA_TXN_STATE_IN_TRANSACTION;
154 
155         case RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION:
156                 return curr == RD_KAFKA_TXN_STATE_BEGIN_COMMIT;
157 
158         case RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION:
159                 return curr == RD_KAFKA_TXN_STATE_IN_TRANSACTION ||
160                         curr == RD_KAFKA_TXN_STATE_ABORTABLE_ERROR;
161 
162         case RD_KAFKA_TXN_STATE_ABORTABLE_ERROR:
163                 if (curr == RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION ||
164                     curr == RD_KAFKA_TXN_STATE_FATAL_ERROR) {
165                         /* Ignore sub-sequent abortable errors in
166                          * these states. */
167                         *ignore = rd_true;
168                         return 1;
169                 }
170 
171                 return curr == RD_KAFKA_TXN_STATE_IN_TRANSACTION ||
172                         curr == RD_KAFKA_TXN_STATE_BEGIN_COMMIT ||
173                         curr == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION;
174 
175         case RD_KAFKA_TXN_STATE_FATAL_ERROR:
176                 /* Any state can transition to a fatal error */
177                 return rd_true;
178 
179         default:
180                 RD_NOTREACHED();
181                 return rd_false;
182         }
183 }
184 
185 
186 /**
187  * @brief Transition the transaction state to \p new_state.
188  *
189  * @returns 0 on success or an error code if the state transition
190  *          was invalid.
191  *
192  * @locality rdkafka main thread
193  * @locks rd_kafka_wrlock MUST be held
194  */
rd_kafka_txn_set_state(rd_kafka_t * rk,rd_kafka_txn_state_t new_state)195 static void rd_kafka_txn_set_state (rd_kafka_t *rk,
196                                     rd_kafka_txn_state_t new_state) {
197         rd_bool_t ignore;
198 
199         if (rk->rk_eos.txn_state == new_state)
200                 return;
201 
202         /* Check if state transition is valid */
203         if (!rd_kafka_txn_state_transition_is_valid(rk->rk_eos.txn_state,
204                                                     new_state, &ignore)) {
205                 rd_kafka_log(rk, LOG_CRIT, "TXNSTATE",
206                              "BUG: Invalid transaction state transition "
207                              "attempted: %s -> %s",
208                              rd_kafka_txn_state2str(rk->rk_eos.txn_state),
209                              rd_kafka_txn_state2str(new_state));
210 
211                 rd_assert(!*"BUG: Invalid transaction state transition");
212         }
213 
214         if (ignore) {
215                 /* Ignore this state change */
216                 return;
217         }
218 
219         rd_kafka_dbg(rk, EOS, "TXNSTATE",
220                      "Transaction state change %s -> %s",
221                      rd_kafka_txn_state2str(rk->rk_eos.txn_state),
222                      rd_kafka_txn_state2str(new_state));
223 
224         /* If transitioning from IN_TRANSACTION, the app is no longer
225          * allowed to enqueue (produce) messages. */
226         if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_IN_TRANSACTION)
227                 rd_atomic32_set(&rk->rk_eos.txn_may_enq, 0);
228         else if (new_state == RD_KAFKA_TXN_STATE_IN_TRANSACTION)
229                 rd_atomic32_set(&rk->rk_eos.txn_may_enq, 1);
230 
231         rk->rk_eos.txn_state = new_state;
232 }
233 
234 
235 /**
236  * @brief An unrecoverable transactional error has occurred.
237  *
238  * @param do_lock RD_DO_LOCK: rd_kafka_wrlock(rk) will be acquired and released,
239  *                RD_DONT_LOCK: rd_kafka_wrlock(rk) MUST be held by the caller.
240  * @locality any
241  * @locks rd_kafka_wrlock MUST NOT be held
242  */
rd_kafka_txn_set_fatal_error(rd_kafka_t * rk,rd_dolock_t do_lock,rd_kafka_resp_err_t err,const char * fmt,...)243 void rd_kafka_txn_set_fatal_error (rd_kafka_t *rk, rd_dolock_t do_lock,
244                                    rd_kafka_resp_err_t err,
245                                    const char *fmt, ...) {
246         char errstr[512];
247         va_list ap;
248 
249         va_start(ap, fmt);
250         vsnprintf(errstr, sizeof(errstr), fmt, ap);
251         va_end(ap);
252 
253         rd_kafka_log(rk, LOG_ALERT, "TXNERR",
254                      "Fatal transaction error: %s (%s)",
255                      errstr, rd_kafka_err2name(err));
256 
257         if (do_lock)
258                 rd_kafka_wrlock(rk);
259         rd_kafka_set_fatal_error0(rk, RD_DONT_LOCK, err, "%s", errstr);
260 
261         rk->rk_eos.txn_err = err;
262         if (rk->rk_eos.txn_errstr)
263                 rd_free(rk->rk_eos.txn_errstr);
264         rk->rk_eos.txn_errstr = rd_strdup(errstr);
265 
266         if (rk->rk_eos.txn_init_rkq) {
267                 /* If application has called init_transactions() and
268                  * it has now failed, reply to the app. */
269                 rd_kafka_txn_curr_api_reply_error(
270                         rk->rk_eos.txn_init_rkq,
271                         rd_kafka_error_new_fatal(err, "%s", errstr));
272                 rk->rk_eos.txn_init_rkq = NULL;
273         }
274 
275         rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_FATAL_ERROR);
276 
277         if (do_lock)
278                 rd_kafka_wrunlock(rk);
279 }
280 
281 
282 /**
283  * @brief An abortable/recoverable transactional error has occured.
284  *
285  * @locality rdkafka main thread
286  * @locks rd_kafka_wrlock MUST NOT be held
287  */
rd_kafka_txn_set_abortable_error(rd_kafka_t * rk,rd_kafka_resp_err_t err,const char * fmt,...)288 void rd_kafka_txn_set_abortable_error (rd_kafka_t *rk,
289                                        rd_kafka_resp_err_t err,
290                                        const char *fmt, ...) {
291         char errstr[512];
292         va_list ap;
293 
294         if (rd_kafka_fatal_error(rk, NULL, 0)) {
295                 rd_kafka_dbg(rk, EOS, "FATAL",
296                              "Not propagating abortable transactional "
297                              "error (%s) "
298                              "since previous fatal error already raised",
299                              rd_kafka_err2name(err));
300                 return;
301         }
302 
303         va_start(ap, fmt);
304         vsnprintf(errstr, sizeof(errstr), fmt, ap);
305         va_end(ap);
306 
307         rd_kafka_wrlock(rk);
308         if (rk->rk_eos.txn_err) {
309                 rd_kafka_dbg(rk, EOS, "TXNERR",
310                              "Ignoring sub-sequent abortable transaction "
311                              "error: %s (%s): "
312                              "previous error (%s) already raised",
313                              errstr,
314                              rd_kafka_err2name(err),
315                              rd_kafka_err2name(rk->rk_eos.txn_err));
316                 rd_kafka_wrunlock(rk);
317                 return;
318         }
319 
320         rk->rk_eos.txn_err = err;
321         if (rk->rk_eos.txn_errstr)
322                 rd_free(rk->rk_eos.txn_errstr);
323         rk->rk_eos.txn_errstr = rd_strdup(errstr);
324 
325         rd_kafka_log(rk, LOG_ERR, "TXNERR",
326                      "Current transaction failed: %s (%s)",
327                      errstr, rd_kafka_err2name(err));
328 
329         rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_ABORTABLE_ERROR);
330         rd_kafka_wrunlock(rk);
331 
332         /* Purge all messages in queue/flight */
333         rd_kafka_purge(rk,
334                        RD_KAFKA_PURGE_F_QUEUE |
335                        RD_KAFKA_PURGE_F_ABORT_TXN |
336                        RD_KAFKA_PURGE_F_NON_BLOCKING);
337 
338 }
339 
340 
341 
342 /**
343  * @brief Send op reply to the application which is blocking
344  *        on one of the transaction APIs and reset the current API.
345  *
346  * @param rkq is the queue to send the reply on, which may be NULL or disabled.
347  *            The \p rkq refcount is decreased by this function.
348  * @param err API error code.
349  * @param errstr_fmt If err is set, a human readable error format string.
350  *
351  * @locality rdkafka main thread
352  * @locks any
353  */
354 static void
rd_kafka_txn_curr_api_reply_error(rd_kafka_q_t * rkq,rd_kafka_error_t * error)355 rd_kafka_txn_curr_api_reply_error (rd_kafka_q_t *rkq, rd_kafka_error_t *error) {
356         rd_kafka_op_t *rko;
357 
358         if (!rkq) {
359                 if (error)
360                         rd_kafka_error_destroy(error);
361                 return;
362         }
363 
364         rko = rd_kafka_op_new(RD_KAFKA_OP_TXN|RD_KAFKA_OP_REPLY);
365 
366         if (error) {
367                 rko->rko_u.txn.error = error;
368                 rko->rko_err = rd_kafka_error_code(error);
369         }
370 
371         rd_kafka_q_enq(rkq, rko);
372 
373         rd_kafka_q_destroy(rkq);
374 }
375 
376 /**
377  * @brief Wrapper for rd_kafka_txn_curr_api_reply_error() that takes
378  *        an error code and format string.
379  *
380  * @param rkq is the queue to send the reply on, which may be NULL or disabled.
381  *            The \p rkq refcount is decreased by this function.
382  * @param err API error code.
383  * @param errstr_fmt If err is set, a human readable error format string.
384  *
385  * @locality rdkafka main thread
386  * @locks any
387  */
388 static void
rd_kafka_txn_curr_api_reply(rd_kafka_q_t * rkq,rd_kafka_resp_err_t err,const char * errstr_fmt,...)389 rd_kafka_txn_curr_api_reply (rd_kafka_q_t *rkq,
390                              rd_kafka_resp_err_t err,
391                              const char *errstr_fmt, ...) {
392         rd_kafka_error_t *error = NULL;
393 
394         if (err) {
395                 va_list ap;
396                 va_start(ap, errstr_fmt);
397                 error = rd_kafka_error_new_v(err, errstr_fmt, ap);
398                 va_end(ap);
399         }
400 
401         rd_kafka_txn_curr_api_reply_error(rkq, error);
402 }
403 
404 
405 
406 /**
407  * @brief The underlying idempotent producer state changed,
408  *        see if this affects the transactional operations.
409  *
410  * @locality any thread
411  * @locks rd_kafka_wrlock(rk) MUST be held
412  */
rd_kafka_txn_idemp_state_change(rd_kafka_t * rk,rd_kafka_idemp_state_t idemp_state)413 void rd_kafka_txn_idemp_state_change (rd_kafka_t *rk,
414                                       rd_kafka_idemp_state_t idemp_state) {
415 
416         if (idemp_state == RD_KAFKA_IDEMP_STATE_ASSIGNED &&
417             rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_WAIT_PID) {
418                 RD_UT_COVERAGE(1);
419                 rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_READY_NOT_ACKED);
420 
421                 if (rk->rk_eos.txn_init_rkq) {
422                         /* Application has called init_transactions() and
423                          * it is now complete, reply to the app. */
424                         rd_kafka_txn_curr_api_reply(rk->rk_eos.txn_init_rkq,
425                                                     RD_KAFKA_RESP_ERR_NO_ERROR,
426                                                     NULL);
427                         rk->rk_eos.txn_init_rkq = NULL;
428                 }
429 
430         } else if (idemp_state == RD_KAFKA_IDEMP_STATE_FATAL_ERROR &&
431                    rk->rk_eos.txn_state != RD_KAFKA_TXN_STATE_FATAL_ERROR) {
432                 /* A fatal error has been raised. */
433 
434                 rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_FATAL_ERROR);
435 
436                 if (rk->rk_eos.txn_init_rkq) {
437                         /* Application has called init_transactions() and
438                          * it has now failed, reply to the app. */
439                         rd_kafka_txn_curr_api_reply_error(
440                                 rk->rk_eos.txn_init_rkq,
441                                 rd_kafka_error_new_fatal(
442                                         rk->rk_eos.txn_err ?
443                                         rk->rk_eos.txn_err :
444                                         RD_KAFKA_RESP_ERR__FATAL,
445                                         "Fatal error raised by "
446                                         "idempotent producer while "
447                                         "retrieving PID: %s",
448                                         rk->rk_eos.txn_errstr ?
449                                         rk->rk_eos.txn_errstr :
450                                         "see previous logs"));
451                         rk->rk_eos.txn_init_rkq = NULL;
452                 }
453         }
454 }
455 
456 
457 /**
458  * @brief Moves a partition from the pending list to the proper list.
459  *
460  * @locality rdkafka main thread
461  * @locks none
462  */
rd_kafka_txn_partition_registered(rd_kafka_toppar_t * rktp)463 static void rd_kafka_txn_partition_registered (rd_kafka_toppar_t *rktp) {
464         rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
465 
466         rd_kafka_toppar_lock(rktp);
467 
468         if (unlikely(!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_PEND_TXN))) {
469                 rd_kafka_dbg(rk, EOS|RD_KAFKA_DBG_PROTOCOL,
470                              "ADDPARTS",
471                              "\"%.*s\" [%"PRId32"] is not in pending "
472                              "list but returned in AddPartitionsToTxn "
473                              "response: ignoring",
474                              RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
475                              rktp->rktp_partition);
476                 rd_kafka_toppar_unlock(rktp);
477                 return;
478         }
479 
480         rd_kafka_dbg(rk, EOS|RD_KAFKA_DBG_TOPIC, "ADDPARTS",
481                      "%.*s [%"PRId32"] registered with transaction",
482                      RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
483                      rktp->rktp_partition);
484 
485         rd_assert((rktp->rktp_flags & (RD_KAFKA_TOPPAR_F_PEND_TXN|
486                                        RD_KAFKA_TOPPAR_F_IN_TXN)) ==
487                   RD_KAFKA_TOPPAR_F_PEND_TXN);
488 
489         rktp->rktp_flags = (rktp->rktp_flags & ~RD_KAFKA_TOPPAR_F_PEND_TXN) |
490                 RD_KAFKA_TOPPAR_F_IN_TXN;
491 
492         rd_kafka_toppar_unlock(rktp);
493 
494         mtx_lock(&rk->rk_eos.txn_pending_lock);
495         TAILQ_REMOVE(&rk->rk_eos.txn_waitresp_rktps, rktp, rktp_txnlink);
496         mtx_unlock(&rk->rk_eos.txn_pending_lock);
497 
498         TAILQ_INSERT_TAIL(&rk->rk_eos.txn_rktps, rktp, rktp_txnlink);
499 }
500 
501 
502 
503 /**
504  * @brief Handle AddPartitionsToTxnResponse
505  *
506  * @locality rdkafka main thread
507  * @locks none
508  */
rd_kafka_txn_handle_AddPartitionsToTxn(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)509 static void rd_kafka_txn_handle_AddPartitionsToTxn (rd_kafka_t *rk,
510                                                     rd_kafka_broker_t *rkb,
511                                                     rd_kafka_resp_err_t err,
512                                                     rd_kafka_buf_t *rkbuf,
513                                                     rd_kafka_buf_t *request,
514                                                     void *opaque) {
515         const int log_decode_errors = LOG_ERR;
516         int32_t TopicCnt;
517         int okcnt = 0, errcnt = 0;
518         int actions = 0;
519         int retry_backoff_ms = 500; /* retry backoff */
520         rd_kafka_resp_err_t reset_coord_err = RD_KAFKA_RESP_ERR_NO_ERROR;
521 
522         if (err)
523                 goto done;
524 
525         rd_kafka_rdlock(rk);
526         rd_assert(rk->rk_eos.txn_state !=
527                   RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION);
528 
529         if (rk->rk_eos.txn_state != RD_KAFKA_TXN_STATE_IN_TRANSACTION &&
530             rk->rk_eos.txn_state != RD_KAFKA_TXN_STATE_BEGIN_COMMIT) {
531                 /* Response received after aborting transaction */
532                 rd_rkb_dbg(rkb, EOS, "ADDPARTS",
533                            "Ignoring outdated AddPartitionsToTxn response in "
534                            "state %s",
535                            rd_kafka_txn_state2str(rk->rk_eos.txn_state));
536                 rd_kafka_rdunlock(rk);
537                 err = RD_KAFKA_RESP_ERR__OUTDATED;
538                 goto done;
539         }
540         rd_kafka_rdunlock(rk);
541 
542         rd_kafka_buf_read_throttle_time(rkbuf);
543 
544         rd_kafka_buf_read_i32(rkbuf, &TopicCnt);
545 
546         while (TopicCnt-- > 0) {
547                 rd_kafkap_str_t Topic;
548                 rd_kafka_itopic_t *rkt;
549                 int32_t PartCnt;
550                 int p_actions = 0;
551 
552                 rd_kafka_buf_read_str(rkbuf, &Topic);
553                 rd_kafka_buf_read_i32(rkbuf, &PartCnt);
554 
555                 rkt = rd_kafka_topic_find0(rk, &Topic);
556                 if (rkt)
557                         rd_kafka_topic_rdlock(rkt); /* for toppar_get() */
558 
559                 while (PartCnt-- > 0) {
560                         shptr_rd_kafka_toppar_t *s_rktp = NULL;
561                         rd_kafka_toppar_t *rktp;
562                         int32_t Partition;
563                         int16_t ErrorCode;
564 
565                         rd_kafka_buf_read_i32(rkbuf, &Partition);
566                         rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
567 
568                         if (rkt)
569                                 s_rktp = rd_kafka_toppar_get(rkt,
570                                                              Partition,
571                                                              rd_false);
572 
573                         if (!s_rktp) {
574                                 rd_rkb_dbg(rkb, EOS|RD_KAFKA_DBG_PROTOCOL,
575                                            "ADDPARTS",
576                                            "Unknown partition \"%.*s\" "
577                                            "[%"PRId32"] in AddPartitionsToTxn "
578                                            "response: ignoring",
579                                            RD_KAFKAP_STR_PR(&Topic),
580                                            Partition);
581                                 continue;
582                         }
583 
584                         rktp = rd_kafka_toppar_s2i(s_rktp);
585 
586                         switch (ErrorCode)
587                         {
588                         case RD_KAFKA_RESP_ERR_NO_ERROR:
589                                 /* Move rktp from pending to proper list */
590                                 rd_kafka_txn_partition_registered(rktp);
591                                 break;
592 
593                         case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
594                         case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
595                         case RD_KAFKA_RESP_ERR__TRANSPORT:
596                                 reset_coord_err = ErrorCode;
597                                 p_actions |= RD_KAFKA_ERR_ACTION_RETRY;
598                                 break;
599 
600                         case RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS:
601                                 retry_backoff_ms = 20;
602                                 /* FALLTHRU */
603                         case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS:
604                         case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
605                                 p_actions |= RD_KAFKA_ERR_ACTION_RETRY;
606                                 break;
607 
608                         case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
609                         case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING:
610                         case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH:
611                         case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE:
612                                 p_actions |= RD_KAFKA_ERR_ACTION_FATAL;
613                                 err = ErrorCode;
614                                 break;
615 
616                         case RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED:
617                                 p_actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
618                                 err = ErrorCode;
619                                 break;
620 
621                         case RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED:
622                                 /* Partition skipped due to other partition's
623                                  * errors */
624                                 break;
625 
626                         default:
627                                 /* Unhandled error, fail transaction */
628                                 p_actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
629                                 break;
630                         }
631 
632                         if (ErrorCode) {
633                                 errcnt++;
634                                 actions |= p_actions;
635 
636                                 if (!(p_actions &
637                                       (RD_KAFKA_ERR_ACTION_FATAL |
638                                        RD_KAFKA_ERR_ACTION_PERMANENT)))
639                                         rd_rkb_dbg(
640                                                 rkb, EOS,
641                                                 "ADDPARTS",
642                                                 "AddPartitionsToTxn response: "
643                                                 "partition \"%.*s\": "
644                                                 "[%"PRId32"]: %s",
645                                                 RD_KAFKAP_STR_PR(&Topic),
646                                                 Partition,
647                                                 rd_kafka_err2str(
648                                                         ErrorCode));
649                                 else
650                                         rd_rkb_log(rkb, LOG_ERR,
651                                                    "ADDPARTS",
652                                                    "Failed to add partition "
653                                                    "\"%.*s\" [%"PRId32"] to "
654                                                    "transaction: %s",
655                                                    RD_KAFKAP_STR_PR(&Topic),
656                                                    Partition,
657                                                    rd_kafka_err2str(
658                                                            ErrorCode));
659                         } else {
660                                 okcnt++;
661                         }
662 
663                         rd_kafka_toppar_destroy(s_rktp);
664                 }
665 
666                 if (rkt) {
667                         rd_kafka_topic_rdunlock(rkt);
668                         rd_kafka_topic_destroy0(rkt);
669                 }
670         }
671 
672         if (actions) /* Actions set from encountered errors '*/
673                 goto done;
674 
675         /* Since these partitions are now allowed to produce
676          * we wake up all broker threads. */
677         rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT);
678 
679         goto done;
680 
681  err_parse:
682         err = rkbuf->rkbuf_err;
683 
684  done:
685         if (err)
686                 rk->rk_eos.txn_req_cnt--;
687 
688         if (err == RD_KAFKA_RESP_ERR__DESTROY ||
689             err == RD_KAFKA_RESP_ERR__OUTDATED)
690                 return;
691 
692         if (reset_coord_err) {
693                 rd_kafka_wrlock(rk);
694                 rd_kafka_txn_coord_set(rk, NULL,
695                                        "AddPartitionsToTxn failed: %s",
696                                        rd_kafka_err2str(reset_coord_err));
697                 rd_kafka_wrunlock(rk);
698         }
699 
700 
701         mtx_lock(&rk->rk_eos.txn_pending_lock);
702         TAILQ_CONCAT(&rk->rk_eos.txn_pending_rktps,
703                      &rk->rk_eos.txn_waitresp_rktps,
704                      rktp_txnlink);
705         mtx_unlock(&rk->rk_eos.txn_pending_lock);
706 
707         if (okcnt + errcnt == 0) {
708                 /* Shouldn't happen */
709                 rd_kafka_dbg(rk, EOS, "ADDPARTS",
710                              "No known partitions in "
711                              "AddPartitionsToTxn response");
712         }
713 
714         if (actions & RD_KAFKA_ERR_ACTION_FATAL) {
715                 rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, err,
716                                              "Failed to add partitions to "
717                                              "transaction: %s",
718                                              rd_kafka_err2str(err));
719 
720         } else if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
721                 rd_kafka_txn_schedule_register_partitions(rk, retry_backoff_ms);
722 
723         } else if (errcnt > 0) {
724                 /* Treat all other errors as abortable errors */
725                 rd_kafka_txn_set_abortable_error(
726                         rk, err,
727                         "Failed to add %d/%d partition(s) to transaction "
728                         "on broker %s: %s (after %d ms)",
729                         errcnt, errcnt + okcnt,
730                         rd_kafka_broker_name(rkb),
731                         rd_kafka_err2str(err),
732                         (int)(rkbuf->rkbuf_ts_sent/1000));
733         }
734 }
735 
736 
737 /**
738  * @brief Send AddPartitionsToTxnRequest to the transaction coordinator.
739  *
740  * @returns an error code if the transaction coordinator is not known
741  *          or not available.
742  *
743  * @locality rdkafka main thread
744  * @locks none
745  */
rd_kafka_txn_register_partitions(rd_kafka_t * rk)746 static rd_kafka_resp_err_t rd_kafka_txn_register_partitions (rd_kafka_t *rk) {
747         char errstr[512];
748         rd_kafka_resp_err_t err;
749         rd_kafka_error_t *error;
750         rd_kafka_pid_t pid;
751 
752         mtx_lock(&rk->rk_eos.txn_pending_lock);
753         if (TAILQ_EMPTY(&rk->rk_eos.txn_pending_rktps)) {
754                 mtx_unlock(&rk->rk_eos.txn_pending_lock);
755                 return RD_KAFKA_RESP_ERR_NO_ERROR;
756         }
757 
758         error = rd_kafka_txn_require_state(rk,
759                                            RD_KAFKA_TXN_STATE_IN_TRANSACTION,
760                                            RD_KAFKA_TXN_STATE_BEGIN_COMMIT);
761         if (error) {
762                 err = rd_kafka_error_to_legacy(error, errstr, sizeof(errstr));
763                 goto err;
764         }
765 
766         pid = rd_kafka_idemp_get_pid0(rk, rd_false/*dont-lock*/);
767         if (!rd_kafka_pid_valid(pid)) {
768                 rd_dassert(!*"BUG: No PID despite proper transaction state");
769                 err = RD_KAFKA_RESP_ERR__STATE;
770                 rd_snprintf(errstr, sizeof(errstr),
771                             "No PID available (idempotence state %s)",
772                             rd_kafka_idemp_state2str(rk->rk_eos.idemp_state));
773                 goto err;
774         }
775 
776         if (!rd_kafka_broker_is_up(rk->rk_eos.txn_coord)) {
777                 err = RD_KAFKA_RESP_ERR__TRANSPORT;
778                 rd_snprintf(errstr, sizeof(errstr), "Broker is not up");
779                 goto err;
780         }
781 
782 
783         /* Send request to coordinator */
784         err = rd_kafka_AddPartitionsToTxnRequest(
785                 rk->rk_eos.txn_coord,
786                 rk->rk_conf.eos.transactional_id,
787                 pid,
788                 &rk->rk_eos.txn_pending_rktps,
789                 errstr, sizeof(errstr),
790                 RD_KAFKA_REPLYQ(rk->rk_ops, 0),
791                 rd_kafka_txn_handle_AddPartitionsToTxn, NULL);
792         if (err)
793                 goto err;
794 
795         TAILQ_CONCAT(&rk->rk_eos.txn_waitresp_rktps,
796                      &rk->rk_eos.txn_pending_rktps,
797                      rktp_txnlink);
798 
799         mtx_unlock(&rk->rk_eos.txn_pending_lock);
800 
801         rk->rk_eos.txn_req_cnt++;
802 
803         rd_rkb_dbg(rk->rk_eos.txn_coord, EOS, "ADDPARTS",
804                    "Adding partitions to transaction");
805 
806         return RD_KAFKA_RESP_ERR_NO_ERROR;
807 
808  err:
809         mtx_unlock(&rk->rk_eos.txn_pending_lock);
810 
811         rd_kafka_dbg(rk, EOS, "ADDPARTS",
812                      "Unable to register partitions with transaction: "
813                      "%s", errstr);
814         return err;
815 }
816 
rd_kafka_txn_register_partitions_tmr_cb(rd_kafka_timers_t * rkts,void * arg)817 static void rd_kafka_txn_register_partitions_tmr_cb (rd_kafka_timers_t *rkts,
818                                                      void *arg) {
819         rd_kafka_t *rk = arg;
820 
821         rd_kafka_txn_register_partitions(rk);
822 }
823 
824 
825 /**
826  * @brief Schedule register_partitions() as soon as possible.
827  *
828  * @locality any
829  * @locks any
830  */
rd_kafka_txn_schedule_register_partitions(rd_kafka_t * rk,int backoff_ms)831 void rd_kafka_txn_schedule_register_partitions (rd_kafka_t *rk,
832                                                 int backoff_ms) {
833         rd_kafka_timer_start_oneshot(
834                 &rk->rk_timers,
835                 &rk->rk_eos.txn_register_parts_tmr, rd_false/*dont-restart*/,
836                 backoff_ms ? backoff_ms * 1000 : 1 /* immediate */,
837                 rd_kafka_txn_register_partitions_tmr_cb,
838                 rk);
839 }
840 
841 
842 
843 /**
844  * @brief Clears \p flag from all rktps in \p tqh
845  */
rd_kafka_txn_clear_partitions_flag(rd_kafka_toppar_tqhead_t * tqh,int flag)846 static void rd_kafka_txn_clear_partitions_flag (rd_kafka_toppar_tqhead_t *tqh,
847                                                 int flag) {
848         rd_kafka_toppar_t *rktp;
849 
850         TAILQ_FOREACH(rktp, tqh, rktp_txnlink) {
851                 rd_kafka_toppar_lock(rktp);
852                 rd_dassert(rktp->rktp_flags & flag);
853                 rktp->rktp_flags &= ~flag;
854                 rd_kafka_toppar_unlock(rktp);
855         }
856 }
857 
858 
859 /**
860  * @brief Clear all pending partitions.
861  *
862  * @locks txn_pending_lock MUST be held
863  */
rd_kafka_txn_clear_pending_partitions(rd_kafka_t * rk)864 static void rd_kafka_txn_clear_pending_partitions (rd_kafka_t *rk) {
865         rd_kafka_txn_clear_partitions_flag(&rk->rk_eos.txn_pending_rktps,
866                                            RD_KAFKA_TOPPAR_F_PEND_TXN);
867         rd_kafka_txn_clear_partitions_flag(&rk->rk_eos.txn_waitresp_rktps,
868                                            RD_KAFKA_TOPPAR_F_PEND_TXN);
869         TAILQ_INIT(&rk->rk_eos.txn_pending_rktps);
870         TAILQ_INIT(&rk->rk_eos.txn_waitresp_rktps);
871 }
872 
873 /**
874  * @brief Clear all added partitions.
875  *
876  * @locks rd_kafka_wrlock(rk) MUST be held
877  */
rd_kafka_txn_clear_partitions(rd_kafka_t * rk)878 static void rd_kafka_txn_clear_partitions (rd_kafka_t *rk) {
879         rd_kafka_txn_clear_partitions_flag(&rk->rk_eos.txn_rktps,
880                                            RD_KAFKA_TOPPAR_F_IN_TXN);
881         TAILQ_INIT(&rk->rk_eos.txn_rktps);
882 }
883 
884 
885 
886 
887 /**
888  * @brief Op timeout callback which fails the current transaction.
889  *
890  * @locality rdkafka main thread
891  * @locks none
892  */
893 static void
rd_kafka_txn_curr_api_abort_timeout_cb(rd_kafka_timers_t * rkts,void * arg)894 rd_kafka_txn_curr_api_abort_timeout_cb (rd_kafka_timers_t *rkts, void *arg) {
895         rd_kafka_q_t *rkq = arg;
896 
897         rd_kafka_txn_set_abortable_error(
898                 rkts->rkts_rk,
899                 RD_KAFKA_RESP_ERR__TIMED_OUT,
900                 "Transactional operation timed out");
901 
902         rd_kafka_txn_curr_api_reply_error(
903                 rkq,
904                 rd_kafka_error_new_txn_requires_abort(
905                         RD_KAFKA_RESP_ERR__TIMED_OUT,
906                         "Transactional operation timed out"));
907 }
908 
909 /**
910  * @brief Op timeout callback which does not fail the current transaction,
911  *        and sets the retriable flag on the error.
912  *
913  * @locality rdkafka main thread
914  * @locks none
915  */
916 static void
rd_kafka_txn_curr_api_retriable_timeout_cb(rd_kafka_timers_t * rkts,void * arg)917 rd_kafka_txn_curr_api_retriable_timeout_cb (rd_kafka_timers_t *rkts, void *arg) {
918         rd_kafka_q_t *rkq = arg;
919 
920         rd_kafka_txn_curr_api_reply_error(
921                 rkq,
922                 rd_kafka_error_new_retriable(
923                         RD_KAFKA_RESP_ERR__TIMED_OUT,
924                         "Transactional operation timed out"));
925 }
926 
927 
928 /**
929  * @brief Op timeout callback which does not fail the current transaction.
930  *
931  * @locality rdkafka main thread
932  * @locks none
933  */
934 static void
rd_kafka_txn_curr_api_timeout_cb(rd_kafka_timers_t * rkts,void * arg)935 rd_kafka_txn_curr_api_timeout_cb (rd_kafka_timers_t *rkts, void *arg) {
936         rd_kafka_q_t *rkq = arg;
937 
938         rd_kafka_txn_curr_api_reply(rkq, RD_KAFKA_RESP_ERR__TIMED_OUT,
939                                     "Transactional operation timed out");
940 }
941 
942 /**
943  * @brief Op timeout callback for init_transactions() that uses the
944  *        the last txn_init_err as error code.
945  *
946  * @locality rdkafka main thread
947  * @locks none
948  */
949 static void
rd_kafka_txn_curr_api_init_timeout_cb(rd_kafka_timers_t * rkts,void * arg)950 rd_kafka_txn_curr_api_init_timeout_cb (rd_kafka_timers_t *rkts, void *arg) {
951         rd_kafka_q_t *rkq = arg;
952         rd_kafka_error_t *error;
953         rd_kafka_resp_err_t err = rkts->rkts_rk->rk_eos.txn_init_err;
954 
955         if (!err)
956                 err = RD_KAFKA_RESP_ERR__TIMED_OUT;
957 
958         error = rd_kafka_error_new(err,
959                                    "Failed to initialize Producer ID: %s",
960                                    rd_kafka_err2str(err));
961 
962         /* init_transactions() timeouts are retriable */
963         if (err == RD_KAFKA_RESP_ERR__TIMED_OUT)
964                 rd_kafka_error_set_retriable(error);
965 
966         rd_kafka_txn_curr_api_reply_error(rkq, error);
967 }
968 
969 
970 
971 /**
972  * @brief Reset the current API, typically because it was completed
973  *        without timeout.
974  *
975  * @locality rdkafka main thread
976  * @locks rd_kafka_wrlock(rk) MUST be held
977  */
rd_kafka_txn_curr_api_reset(rd_kafka_t * rk)978 static void rd_kafka_txn_curr_api_reset (rd_kafka_t *rk) {
979         rd_bool_t timer_was_stopped;
980         rd_kafka_q_t *rkq;
981 
982         rkq = rk->rk_eos.txn_curr_api.tmr.rtmr_arg;
983         timer_was_stopped = rd_kafka_timer_stop(
984                 &rk->rk_timers,
985                 &rk->rk_eos.txn_curr_api.tmr,
986                 RD_DO_LOCK);
987 
988         if (rkq && timer_was_stopped) {
989                 /* Remove the stopped timer's reply queue reference
990                  * since the timer callback will not have fired if
991                  * we stopped the timer. */
992                 rd_kafka_q_destroy(rkq);
993         }
994 
995         *rk->rk_eos.txn_curr_api.name = '\0';
996         rk->rk_eos.txn_curr_api.flags = 0;
997 }
998 
999 
1000 /**
1001  * @brief Sets the current API op (representing a blocking application API call)
1002  *        and a timeout for the same, and sends the op to the transaction
1003  *        manager thread (rdkafka main thread) for processing.
1004  *
1005  * If the timeout expires the rko will fail with ERR__TIMED_OUT
1006  * and the txnmgr state will be adjusted according to \p abort_on_timeout:
1007  * if true, the txn will transition to ABORTABLE_ERROR, else remain in
1008  * the current state.
1009  *
1010  * This call will block until a response is received from the rdkafka
1011  * main thread.
1012  *
1013  * Use rd_kafka_txn_curr_api_reset() when operation finishes prior
1014  * to the timeout.
1015  *
1016  * @param rko Op to send to txnmgr, or NULL if no op to send (yet).
1017  * @param flags See RD_KAFKA_TXN_CURR_API_F_.. flags in rdkafka_int.h.
1018  *
1019  * @returns an error, or NULL on success.
1020  *
1021  * @locality application thread
1022  * @locks none
1023  */
1024 static rd_kafka_error_t *
rd_kafka_txn_curr_api_req(rd_kafka_t * rk,const char * name,rd_kafka_op_t * rko,int timeout_ms,int flags)1025 rd_kafka_txn_curr_api_req (rd_kafka_t *rk, const char *name,
1026                            rd_kafka_op_t *rko,
1027                            int timeout_ms, int flags) {
1028         rd_kafka_op_t *reply;
1029         rd_bool_t reuse = rd_false;
1030         rd_bool_t for_reuse;
1031         rd_kafka_q_t *tmpq = NULL;
1032         rd_kafka_error_t *error = NULL;
1033 
1034         /* Strip __FUNCTION__ name's rd_kafka_ prefix since it will
1035          * not make sense in high-level language bindings. */
1036         if (!strncmp(name, "rd_kafka_", strlen("rd_kafka_")))
1037                 name += strlen("rd_kafka_");
1038 
1039         rd_kafka_dbg(rk, EOS, "TXNAPI", "Transactional API called: %s", name);
1040 
1041         if (flags & RD_KAFKA_TXN_CURR_API_F_REUSE) {
1042                 /* Reuse the current API call state. */
1043                 flags &= ~RD_KAFKA_TXN_CURR_API_F_REUSE;
1044                 reuse = rd_true;
1045         }
1046 
1047         rd_kafka_wrlock(rk);
1048 
1049         /* First set for_reuse to the current flags to match with
1050          * the passed flags. */
1051         for_reuse = !!(rk->rk_eos.txn_curr_api.flags &
1052                        RD_KAFKA_TXN_CURR_API_F_FOR_REUSE);
1053 
1054         if ((for_reuse && !reuse) ||
1055             (!for_reuse && *rk->rk_eos.txn_curr_api.name)) {
1056                 error = rd_kafka_error_new(
1057                         RD_KAFKA_RESP_ERR__STATE,
1058                         "Conflicting %s call already in progress",
1059                         rk->rk_eos.txn_curr_api.name);
1060                 rd_kafka_wrunlock(rk);
1061                 if (rko)
1062                         rd_kafka_op_destroy(rko);
1063                 return error;
1064         }
1065 
1066         rd_assert(for_reuse == reuse);
1067 
1068         rd_snprintf(rk->rk_eos.txn_curr_api.name,
1069                     sizeof(rk->rk_eos.txn_curr_api.name),
1070                     "%s", name);
1071 
1072         if (rko)
1073                 tmpq = rd_kafka_q_new(rk);
1074 
1075         rk->rk_eos.txn_curr_api.flags |= flags;
1076 
1077         /* Then update for_reuse to the passed flags so that
1078          * api_reset() will not reset curr APIs that are to be reused,
1079          * but a sub-sequent _F_REUSE call will reset it. */
1080         for_reuse = !!(flags & RD_KAFKA_TXN_CURR_API_F_FOR_REUSE);
1081 
1082         /* If no timeout has been specified, use the transaction.timeout.ms */
1083         if (timeout_ms < 0)
1084                 timeout_ms = rk->rk_conf.eos.transaction_timeout_ms;
1085 
1086         if (!reuse && timeout_ms >= 0) {
1087                 rd_kafka_q_keep(tmpq);
1088                 rd_kafka_timer_start_oneshot(
1089                         &rk->rk_timers,
1090                         &rk->rk_eos.txn_curr_api.tmr,
1091                         rd_false,
1092                         timeout_ms * 1000,
1093                         !strcmp(name, "init_transactions") ?
1094                         rd_kafka_txn_curr_api_init_timeout_cb :
1095                         (flags & RD_KAFKA_TXN_CURR_API_F_ABORT_ON_TIMEOUT ?
1096                          rd_kafka_txn_curr_api_abort_timeout_cb :
1097                          (flags & RD_KAFKA_TXN_CURR_API_F_RETRIABLE_ON_TIMEOUT ?
1098                           rd_kafka_txn_curr_api_retriable_timeout_cb :
1099                           rd_kafka_txn_curr_api_timeout_cb)),
1100                         tmpq);
1101         }
1102         rd_kafka_wrunlock(rk);
1103 
1104         if (!rko)
1105                 return NULL;
1106 
1107         /* Send op to rdkafka main thread and wait for reply */
1108         reply = rd_kafka_op_req0(rk->rk_ops, tmpq, rko, RD_POLL_INFINITE);
1109 
1110         rd_kafka_q_destroy_owner(tmpq);
1111 
1112         if ((error = reply->rko_u.txn.error)) {
1113                 reply->rko_u.txn.error = NULL;
1114                 for_reuse = rd_false;
1115         }
1116 
1117         rd_kafka_op_destroy(reply);
1118 
1119         if (!for_reuse)
1120                 rd_kafka_txn_curr_api_reset(rk);
1121 
1122         return error;
1123 }
1124 
1125 
1126 /**
1127  * @brief Async handler for init_transactions()
1128  *
1129  * @locks none
1130  * @locality rdkafka main thread
1131  */
1132 static rd_kafka_op_res_t
rd_kafka_txn_op_init_transactions(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)1133 rd_kafka_txn_op_init_transactions (rd_kafka_t *rk,
1134                                    rd_kafka_q_t *rkq,
1135                                    rd_kafka_op_t *rko) {
1136         rd_kafka_error_t *error;
1137 
1138         if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
1139                 return RD_KAFKA_OP_RES_HANDLED;
1140 
1141         rd_kafka_wrlock(rk);
1142         if ((error = rd_kafka_txn_require_state(
1143                      rk,
1144                      RD_KAFKA_TXN_STATE_INIT,
1145                      RD_KAFKA_TXN_STATE_WAIT_PID,
1146                      RD_KAFKA_TXN_STATE_READY_NOT_ACKED))) {
1147                 rd_kafka_wrunlock(rk);
1148                 goto done;
1149         }
1150 
1151         if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_READY_NOT_ACKED) {
1152                 /* A previous init_transactions() called finished successfully
1153                  * after timeout, the application has called init_transactions()
1154                  * again, we do nothin here, ack_init_transactions() will
1155                  * transition the state from READY_NOT_ACKED to READY. */
1156                 rd_kafka_wrunlock(rk);
1157                 goto done;
1158         }
1159 
1160         /* Possibly a no-op if already in WAIT_PID state */
1161         rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_WAIT_PID);
1162 
1163         /* Destroy previous reply queue for a previously timed out
1164          * init_transactions() call. */
1165         if (rk->rk_eos.txn_init_rkq)
1166                 rd_kafka_q_destroy(rk->rk_eos.txn_init_rkq);
1167 
1168         /* Grab a separate reference to use in state_change(),
1169          * outside the curr_api to allow the curr_api to timeout while
1170          * the background init continues. */
1171         rk->rk_eos.txn_init_rkq = rd_kafka_q_keep(rko->rko_replyq.q);
1172 
1173         rd_kafka_wrunlock(rk);
1174 
1175         rk->rk_eos.txn_init_err = RD_KAFKA_RESP_ERR_NO_ERROR;
1176 
1177         /* Start idempotent producer to acquire PID */
1178         rd_kafka_idemp_start(rk, rd_true/*immediately*/);
1179 
1180         return RD_KAFKA_OP_RES_HANDLED;
1181 
1182  done:
1183         rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
1184                                           error);
1185 
1186         return RD_KAFKA_OP_RES_HANDLED;
1187 }
1188 
1189 
1190 /**
1191  * @brief Async handler for the application to acknowledge
1192  *        successful background completion of init_transactions().
1193  *
1194  * @locks none
1195  * @locality rdkafka main thread
1196  */
1197 static rd_kafka_op_res_t
rd_kafka_txn_op_ack_init_transactions(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)1198 rd_kafka_txn_op_ack_init_transactions (rd_kafka_t *rk,
1199                                        rd_kafka_q_t *rkq,
1200                                        rd_kafka_op_t *rko) {
1201         rd_kafka_error_t *error;
1202 
1203         if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
1204                 return RD_KAFKA_OP_RES_HANDLED;
1205 
1206         rd_kafka_wrlock(rk);
1207         if ((error = rd_kafka_txn_require_state(
1208                      rk,
1209                      RD_KAFKA_TXN_STATE_READY_NOT_ACKED))) {
1210                 rd_kafka_wrunlock(rk);
1211                 goto done;
1212         }
1213 
1214         rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_READY);
1215 
1216         rd_kafka_wrunlock(rk);
1217         /* FALLTHRU */
1218 
1219  done:
1220         rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
1221                                           error);
1222 
1223         return RD_KAFKA_OP_RES_HANDLED;
1224 }
1225 
1226 
1227 
1228 rd_kafka_error_t *
rd_kafka_init_transactions(rd_kafka_t * rk,int timeout_ms)1229 rd_kafka_init_transactions (rd_kafka_t *rk, int timeout_ms) {
1230         rd_kafka_error_t *error;
1231 
1232         if ((error = rd_kafka_ensure_transactional(rk)))
1233                 return error;
1234 
1235         /* init_transactions() will continue to operate in the background
1236          * if the timeout expires, and the application may call
1237          * init_transactions() again to "continue" with the initialization
1238          * process.
1239          * For this reason we need two states:
1240          *  - TXN_STATE_READY_NOT_ACKED for when initialization is done
1241          *    but the API call timed out prior to success, meaning the
1242          *    application does not know initialization finished and
1243          *    is thus not allowed to call sub-sequent txn APIs, e.g. begin..()
1244          *  - TXN_STATE_READY for when initialization is done and this
1245          *    function has returned successfully to the application.
1246          *
1247          * And due to the two states we need two calls to the rdkafka main
1248          * thread (to keep txn_state synchronization in one place). */
1249 
1250         /* First call is to trigger initialization */
1251         error = rd_kafka_txn_curr_api_req(
1252                 rk, __FUNCTION__,
1253                 rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN,
1254                                    rd_kafka_txn_op_init_transactions),
1255                 timeout_ms,
1256                 RD_KAFKA_TXN_CURR_API_F_RETRIABLE_ON_TIMEOUT|
1257                 RD_KAFKA_TXN_CURR_API_F_FOR_REUSE);
1258         if (error)
1259                 return error;
1260 
1261 
1262         /* Second call is to transition from READY_NOT_ACKED -> READY,
1263          * if necessary. */
1264         return rd_kafka_txn_curr_api_req(
1265                 rk, __FUNCTION__,
1266                 rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN,
1267                                    rd_kafka_txn_op_ack_init_transactions),
1268                 RD_POLL_INFINITE, /* immediate, no timeout needed */
1269                 RD_KAFKA_TXN_CURR_API_F_REUSE);
1270 }
1271 
1272 
1273 
1274 /**
1275  * @brief Handler for begin_transaction()
1276  *
1277  * @locks none
1278  * @locality rdkafka main thread
1279  */
1280 static rd_kafka_op_res_t
rd_kafka_txn_op_begin_transaction(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)1281 rd_kafka_txn_op_begin_transaction (rd_kafka_t *rk,
1282                                    rd_kafka_q_t *rkq,
1283                                    rd_kafka_op_t *rko) {
1284         rd_kafka_error_t *error;
1285         rd_bool_t wakeup_brokers = rd_false;
1286 
1287         if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
1288                 return RD_KAFKA_OP_RES_HANDLED;
1289 
1290         rd_kafka_wrlock(rk);
1291         if (!(error = rd_kafka_txn_require_state(rk,
1292                                                  RD_KAFKA_TXN_STATE_READY))) {
1293                 rd_assert(TAILQ_EMPTY(&rk->rk_eos.txn_rktps));
1294 
1295                 rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_IN_TRANSACTION);
1296 
1297                 rk->rk_eos.txn_req_cnt = 0;
1298                 rk->rk_eos.txn_err = RD_KAFKA_RESP_ERR_NO_ERROR;
1299                 RD_IF_FREE(rk->rk_eos.txn_errstr, rd_free);
1300                 rk->rk_eos.txn_errstr = NULL;
1301 
1302                 /* Wake up all broker threads (that may have messages to send
1303                  * that were waiting for this transaction state.
1304                  * But needs to be done below with no lock held. */
1305                 wakeup_brokers = rd_true;
1306 
1307         }
1308         rd_kafka_wrunlock(rk);
1309 
1310         if (wakeup_brokers)
1311                 rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT);
1312 
1313         rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
1314                                           error);
1315 
1316         return RD_KAFKA_OP_RES_HANDLED;
1317 }
1318 
1319 
rd_kafka_begin_transaction(rd_kafka_t * rk)1320 rd_kafka_error_t *rd_kafka_begin_transaction (rd_kafka_t *rk) {
1321         rd_kafka_op_t *reply;
1322         rd_kafka_error_t *error;
1323 
1324         if ((error = rd_kafka_ensure_transactional(rk)))
1325                 return error;
1326 
1327         reply = rd_kafka_op_req(
1328                 rk->rk_ops,
1329                 rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN,
1330                                    rd_kafka_txn_op_begin_transaction),
1331                 RD_POLL_INFINITE);
1332 
1333         if ((error = reply->rko_u.txn.error))
1334                 reply->rko_u.txn.error = NULL;
1335 
1336         rd_kafka_op_destroy(reply);
1337 
1338         return error;
1339 }
1340 
1341 
/* Forward declaration: the TxnOffsetCommit response handler below passes
 * this function to rd_kafka_coord_req() when retrying, and it is defined
 * after the handler. */
static rd_kafka_resp_err_t
rd_kafka_txn_send_TxnOffsetCommitRequest (rd_kafka_broker_t *rkb,
                                          rd_kafka_op_t *rko,
                                          rd_kafka_replyq_t replyq,
                                          rd_kafka_resp_cb_t *resp_cb,
                                          void *reply_opaque);
1348 
1349 /**
1350  * @brief Handle TxnOffsetCommitResponse
1351  *
1352  * @locality rdkafka main thread
1353  * @locks none
1354  */
rd_kafka_txn_handle_TxnOffsetCommit(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)1355 static void rd_kafka_txn_handle_TxnOffsetCommit (rd_kafka_t *rk,
1356                                                  rd_kafka_broker_t *rkb,
1357                                                  rd_kafka_resp_err_t err,
1358                                                  rd_kafka_buf_t *rkbuf,
1359                                                  rd_kafka_buf_t *request,
1360                                                  void *opaque) {
1361         const int log_decode_errors = LOG_ERR;
1362         rd_kafka_op_t *rko = opaque;
1363         int actions = 0;
1364         rd_kafka_topic_partition_list_t *partitions = NULL;
1365         char errstr[512];
1366 
1367         *errstr = '\0';
1368 
1369         if (err != RD_KAFKA_RESP_ERR__DESTROY &&
1370             !rd_kafka_q_ready(rko->rko_replyq.q))
1371                 err = RD_KAFKA_RESP_ERR__OUTDATED;
1372 
1373         if (err)
1374                 goto done;
1375 
1376         rd_kafka_buf_read_throttle_time(rkbuf);
1377 
1378         partitions = rd_kafka_buf_read_topic_partitions(rkbuf, 0);
1379         if (!partitions)
1380                 goto err_parse;
1381 
1382         err = rd_kafka_topic_partition_list_get_err(partitions);
1383         if (err) {
1384                 char errparts[256];
1385                 rd_kafka_topic_partition_list_str(partitions,
1386                                                   errparts, sizeof(errparts),
1387                                                   RD_KAFKA_FMT_F_ONLY_ERR);
1388                 rd_snprintf(errstr, sizeof(errstr),
1389                             "Failed to commit offsets to transaction on "
1390                             "broker %s: %s "
1391                             "(after %dms)",
1392                             rd_kafka_broker_name(rkb),
1393                             errparts, (int)(rkbuf->rkbuf_ts_sent/1000));
1394         }
1395 
1396         goto done;
1397 
1398  err_parse:
1399         err = rkbuf->rkbuf_err;
1400 
1401  done:
1402         if (err) {
1403                 rk->rk_eos.txn_req_cnt--;
1404 
1405                 if (!*errstr) {
1406                         rd_snprintf(errstr, sizeof(errstr),
1407                                     "Failed to commit offsets to "
1408                                     "transaction on broker %s: %s "
1409                                     "(after %d ms)",
1410                                     rd_kafka_broker_name(rkb),
1411                                     rd_kafka_err2str(err),
1412                                     (int)(rkbuf->rkbuf_ts_sent/1000));
1413                 }
1414         }
1415 
1416 
1417         if (partitions)
1418                 rd_kafka_topic_partition_list_destroy(partitions);
1419 
1420         switch (err)
1421         {
1422         case RD_KAFKA_RESP_ERR_NO_ERROR:
1423                 break;
1424 
1425         case RD_KAFKA_RESP_ERR__DESTROY:
1426         case RD_KAFKA_RESP_ERR__OUTDATED:
1427                 rd_kafka_op_destroy(rko);
1428                 return;
1429 
1430         case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
1431         case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
1432         case RD_KAFKA_RESP_ERR__TRANSPORT:
1433                 /* Note: this is the group coordinator, not the
1434                  *       transaction coordinator. */
1435                 rd_kafka_coord_cache_evict(&rk->rk_coord_cache, rkb);
1436                 actions |= RD_KAFKA_ERR_ACTION_RETRY;
1437                 break;
1438 
1439         case RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS:
1440         case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS:
1441         case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
1442                 actions |= RD_KAFKA_ERR_ACTION_RETRY;
1443                 break;
1444 
1445         case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
1446         case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING:
1447         case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH:
1448         case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE:
1449         case RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT:
1450                 actions |= RD_KAFKA_ERR_ACTION_FATAL;
1451                 break;
1452 
1453         case RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED:
1454         case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED:
1455                 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1456                 break;
1457 
1458         default:
1459                 /* Unhandled error, fail transaction */
1460                 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1461                 break;
1462         }
1463 
1464         if (actions & RD_KAFKA_ERR_ACTION_FATAL) {
1465                 rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, err,
1466                                              "%s", errstr);
1467 
1468         } else if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
1469                 int remains_ms = rd_timeout_remains(rko->rko_u.txn.abs_timeout);
1470 
1471                 if (!rd_timeout_expired(remains_ms)) {
1472                         rd_kafka_coord_req(
1473                                 rk,
1474                                 RD_KAFKA_COORD_GROUP,
1475                                 rko->rko_u.txn.group_id,
1476                                 rd_kafka_txn_send_TxnOffsetCommitRequest,
1477                                 rko,
1478                                 rd_timeout_remains_limit0(
1479                                         remains_ms,
1480                                         rk->rk_conf.socket_timeout_ms),
1481                                 RD_KAFKA_REPLYQ(rk->rk_ops, 0),
1482                                 rd_kafka_txn_handle_TxnOffsetCommit,
1483                                 rko);
1484                         return;
1485                 } else if (!err)
1486                         err = RD_KAFKA_RESP_ERR__TIMED_OUT;
1487                 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1488         }
1489 
1490         if (actions & RD_KAFKA_ERR_ACTION_PERMANENT)
1491                 rd_kafka_txn_set_abortable_error(rk, err, "%s", errstr);
1492 
1493         if (err)
1494                 rd_kafka_txn_curr_api_reply(rd_kafka_q_keep(rko->rko_replyq.q),
1495                                             err, "%s", errstr);
1496         else
1497                 rd_kafka_txn_curr_api_reply(rd_kafka_q_keep(rko->rko_replyq.q),
1498                                             RD_KAFKA_RESP_ERR_NO_ERROR, NULL);
1499 
1500         rd_kafka_op_destroy(rko);
1501 }
1502 
1503 
1504 
1505 /**
1506  * @brief Construct and send TxnOffsetCommitRequest.
1507  *
1508  * @locality rdkafka main thread
1509  * @locks none
1510  */
1511 static rd_kafka_resp_err_t
rd_kafka_txn_send_TxnOffsetCommitRequest(rd_kafka_broker_t * rkb,rd_kafka_op_t * rko,rd_kafka_replyq_t replyq,rd_kafka_resp_cb_t * resp_cb,void * reply_opaque)1512 rd_kafka_txn_send_TxnOffsetCommitRequest (rd_kafka_broker_t *rkb,
1513                                           rd_kafka_op_t *rko,
1514                                           rd_kafka_replyq_t replyq,
1515                                           rd_kafka_resp_cb_t *resp_cb,
1516                                           void *reply_opaque) {
1517         rd_kafka_t *rk = rkb->rkb_rk;
1518         rd_kafka_buf_t *rkbuf;
1519         int16_t ApiVersion;
1520         rd_kafka_pid_t pid;
1521         int cnt;
1522 
1523         rd_kafka_rdlock(rk);
1524         if (rk->rk_eos.txn_state != RD_KAFKA_TXN_STATE_IN_TRANSACTION) {
1525                 rd_kafka_rdunlock(rk);
1526                 rd_kafka_op_destroy(rko);
1527                 return RD_KAFKA_RESP_ERR__OUTDATED;
1528         }
1529 
1530         pid = rd_kafka_idemp_get_pid0(rk, RD_DONT_LOCK);
1531         rd_kafka_rdunlock(rk);
1532         if (!rd_kafka_pid_valid(pid)) {
1533                 rd_kafka_op_destroy(rko);
1534                 return RD_KAFKA_RESP_ERR__STATE;
1535         }
1536 
1537         ApiVersion = rd_kafka_broker_ApiVersion_supported(
1538                 rkb, RD_KAFKAP_TxnOffsetCommit, 0, 0, NULL);
1539         if (ApiVersion == -1) {
1540                 rd_kafka_op_destroy(rko);
1541                 return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
1542         }
1543 
1544         rkbuf = rd_kafka_buf_new_request(rkb,
1545                                          RD_KAFKAP_TxnOffsetCommit, 1,
1546                                          rko->rko_u.txn.offsets->cnt * 50);
1547 
1548         /* transactional_id */
1549         rd_kafka_buf_write_str(rkbuf, rk->rk_conf.eos.transactional_id, -1);
1550 
1551         /* group_id */
1552         rd_kafka_buf_write_str(rkbuf, rko->rko_u.txn.group_id, -1);
1553 
1554         /* PID */
1555         rd_kafka_buf_write_i64(rkbuf, pid.id);
1556         rd_kafka_buf_write_i16(rkbuf, pid.epoch);
1557 
1558         /* Write per-partition offsets list */
1559         cnt = rd_kafka_buf_write_topic_partitions(
1560                 rkbuf,
1561                 rko->rko_u.txn.offsets,
1562                 rd_true /*skip invalid offsets*/,
1563                 rd_false/*dont write Epoch*/,
1564                 rd_true /*write Metadata*/);
1565 
1566         if (!cnt) {
1567                 /* No valid partition offsets, don't commit. */
1568                 rd_kafka_buf_destroy(rkbuf);
1569                 rd_kafka_op_destroy(rko);
1570                 return RD_KAFKA_RESP_ERR__NO_OFFSET;
1571         }
1572 
1573         rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
1574 
1575         rkbuf->rkbuf_max_retries = 3;
1576 
1577         rd_kafka_broker_buf_enq_replyq(rkb, rkbuf,
1578                                        replyq, resp_cb, reply_opaque);
1579 
1580         return RD_KAFKA_RESP_ERR_NO_ERROR;
1581 }
1582 
1583 
1584 /**
1585  * @brief Handle AddOffsetsToTxnResponse
1586  *
1587  * @locality rdkafka main thread
1588  * @locks none
1589  */
rd_kafka_txn_handle_AddOffsetsToTxn(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)1590 static void rd_kafka_txn_handle_AddOffsetsToTxn (rd_kafka_t *rk,
1591                                                  rd_kafka_broker_t *rkb,
1592                                                  rd_kafka_resp_err_t err,
1593                                                  rd_kafka_buf_t *rkbuf,
1594                                                  rd_kafka_buf_t *request,
1595                                                  void *opaque) {
1596         const int log_decode_errors = LOG_ERR;
1597         rd_kafka_op_t *rko = opaque;
1598         int16_t ErrorCode;
1599         int actions = 0;
1600         int remains_ms;
1601 
1602         if (err == RD_KAFKA_RESP_ERR__DESTROY) {
1603                 rd_kafka_op_destroy(rko);
1604                 return;
1605         }
1606 
1607         if (!rd_kafka_q_ready(rko->rko_replyq.q))
1608                 err = RD_KAFKA_RESP_ERR__OUTDATED;
1609 
1610         if (err)
1611                 goto done;
1612 
1613         rd_kafka_buf_read_throttle_time(rkbuf);
1614         rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
1615 
1616         err = ErrorCode;
1617         goto done;
1618 
1619  err_parse:
1620         err = rkbuf->rkbuf_err;
1621 
1622  done:
1623         if (err)
1624                 rk->rk_eos.txn_req_cnt--;
1625 
1626         remains_ms = rd_timeout_remains(rko->rko_u.txn.abs_timeout);
1627 
1628         if (rd_timeout_expired(remains_ms) && !err)
1629                 err = RD_KAFKA_RESP_ERR__TIMED_OUT;
1630 
1631         switch (err)
1632         {
1633         case RD_KAFKA_RESP_ERR_NO_ERROR:
1634                 break;
1635 
1636         case RD_KAFKA_RESP_ERR__OUTDATED:
1637         case RD_KAFKA_RESP_ERR__DESTROY:
1638                 /* Producer is being terminated, ignore the response. */
1639                 break;
1640 
1641         case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
1642         case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
1643         case RD_KAFKA_RESP_ERR__TRANSPORT:
1644         case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT:
1645                 actions |= RD_KAFKA_ERR_ACTION_RETRY|
1646                         RD_KAFKA_ERR_ACTION_REFRESH;
1647                 break;
1648 
1649         case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
1650         case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH:
1651         case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE:
1652         case RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT:
1653                 actions |= RD_KAFKA_ERR_ACTION_FATAL;
1654                 break;
1655 
1656         case RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED:
1657         case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED:
1658                 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1659                 break;
1660 
1661         case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
1662         case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS:
1663         case RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS:
1664                 actions |= RD_KAFKA_ERR_ACTION_RETRY;
1665                 break;
1666 
1667         default:
1668                 /* All unhandled errors are permanent */
1669                 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1670                 break;
1671         }
1672 
1673 
1674         /* All unhandled errors are considered permanent */
1675         if (err && !actions)
1676                 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1677 
1678         if (actions & RD_KAFKA_ERR_ACTION_FATAL) {
1679                 rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, err,
1680                                              "Failed to add offsets to "
1681                                              "transaction: %s",
1682                                              rd_kafka_err2str(err));
1683 
1684         } else if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
1685                 if (!rd_timeout_expired(remains_ms) &&
1686                     rd_kafka_buf_retry(rk->rk_eos.txn_coord, request))
1687                         return;
1688                 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1689 
1690         } else if (err) {
1691                 rd_rkb_log(rkb, LOG_ERR, "ADDOFFSETS",
1692                            "Failed to add offsets to transaction: %s",
1693                            rd_kafka_err2str(err));
1694         }
1695 
1696         if (actions & RD_KAFKA_ERR_ACTION_PERMANENT)
1697                 rd_kafka_txn_set_abortable_error(
1698                         rk, err,
1699                         "Failed to add offsets to "
1700                         "transaction on broker %s: "
1701                         "%s (after %dms)",
1702                         rd_kafka_broker_name(rkb),
1703                         rd_kafka_err2str(err),
1704                         (int)(rkbuf->rkbuf_ts_sent/1000));
1705 
1706         if (!err) {
1707                 /* Step 2: Commit offsets to transaction on the
1708                  * group coordinator. */
1709 
1710                 rd_kafka_coord_req(rk,
1711                                    RD_KAFKA_COORD_GROUP,
1712                                    rko->rko_u.txn.group_id,
1713                                    rd_kafka_txn_send_TxnOffsetCommitRequest,
1714                                    rko,
1715                                    rd_timeout_remains_limit0(
1716                                            remains_ms,
1717                                            rk->rk_conf.socket_timeout_ms),
1718                                    RD_KAFKA_REPLYQ(rk->rk_ops, 0),
1719                                    rd_kafka_txn_handle_TxnOffsetCommit,
1720                                    rko);
1721 
1722         } else {
1723 
1724                 rd_kafka_txn_curr_api_reply(
1725                         rd_kafka_q_keep(rko->rko_replyq.q), err,
1726                         "Failed to add offsets to transaction on broker %s: "
1727                         "%s (after %dms)",
1728                         rd_kafka_broker_name(rkb),
1729                         rd_kafka_err2str(err),
1730                         (int)(rkbuf->rkbuf_ts_sent/1000));
1731 
1732                 rd_kafka_op_destroy(rko);
1733         }
1734 }
1735 
1736 
1737 /**
1738  * @brief Async handler for send_offsets_to_transaction()
1739  *
1740  * @locks none
1741  * @locality rdkafka main thread
1742  */
1743 static rd_kafka_op_res_t
rd_kafka_txn_op_send_offsets_to_transaction(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)1744 rd_kafka_txn_op_send_offsets_to_transaction (rd_kafka_t *rk,
1745                                              rd_kafka_q_t *rkq,
1746                                              rd_kafka_op_t *rko) {
1747         rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
1748         char errstr[512];
1749         rd_kafka_error_t *error;
1750         rd_kafka_pid_t pid;
1751 
1752         if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
1753                 return RD_KAFKA_OP_RES_HANDLED;
1754 
1755         *errstr = '\0';
1756 
1757         rd_kafka_wrlock(rk);
1758 
1759         if ((error = rd_kafka_txn_require_state(
1760                      rk, RD_KAFKA_TXN_STATE_IN_TRANSACTION))) {
1761                 rd_kafka_wrunlock(rk);
1762                 goto err;
1763         }
1764 
1765         rd_kafka_wrunlock(rk);
1766 
1767         pid = rd_kafka_idemp_get_pid0(rk, rd_false/*dont-lock*/);
1768         if (!rd_kafka_pid_valid(pid)) {
1769                 rd_dassert(!*"BUG: No PID despite proper transaction state");
1770                 error = rd_kafka_error_new_retriable(
1771                         RD_KAFKA_RESP_ERR__STATE,
1772                         "No PID available (idempotence state %s)",
1773                         rd_kafka_idemp_state2str(rk->rk_eos.idemp_state));
1774                 goto err;
1775         }
1776 
1777         /* This is a multi-stage operation, consisting of:
1778          *  1) send AddOffsetsToTxnRequest to transaction coordinator.
1779          *  2) send TxnOffsetCommitRequest to group coordinator. */
1780 
1781         err = rd_kafka_AddOffsetsToTxnRequest(
1782                 rk->rk_eos.txn_coord,
1783                 rk->rk_conf.eos.transactional_id,
1784                 pid,
1785                 rko->rko_u.txn.group_id,
1786                 errstr, sizeof(errstr),
1787                 RD_KAFKA_REPLYQ(rk->rk_ops, 0),
1788                 rd_kafka_txn_handle_AddOffsetsToTxn,
1789                 rko);
1790 
1791         if (err) {
1792                 error = rd_kafka_error_new_retriable(err, "%s", errstr);
1793                 goto err;
1794         }
1795 
1796         return RD_KAFKA_OP_RES_KEEP; /* the rko is passed to AddOffsetsToTxn */
1797 
1798  err:
1799         rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
1800                                           error);
1801 
1802         return RD_KAFKA_OP_RES_HANDLED;
1803 }
1804 
1805 /**
1806  * error returns:
1807  *   ERR__TRANSPORT - retryable
1808  */
1809 rd_kafka_error_t *
rd_kafka_send_offsets_to_transaction(rd_kafka_t * rk,const rd_kafka_topic_partition_list_t * offsets,const rd_kafka_consumer_group_metadata_t * cgmetadata,int timeout_ms)1810 rd_kafka_send_offsets_to_transaction (
1811         rd_kafka_t *rk,
1812         const rd_kafka_topic_partition_list_t *offsets,
1813         const rd_kafka_consumer_group_metadata_t *cgmetadata,
1814         int timeout_ms) {
1815         rd_kafka_error_t *error;
1816         rd_kafka_op_t *rko;
1817         rd_kafka_topic_partition_list_t *valid_offsets;
1818 
1819         if ((error = rd_kafka_ensure_transactional(rk)))
1820                 return error;
1821 
1822         if (!cgmetadata || !offsets)
1823                 return rd_kafka_error_new(
1824                         RD_KAFKA_RESP_ERR__INVALID_ARG,
1825                         "cgmetadata and offsets are required parameters");
1826 
1827         valid_offsets = rd_kafka_topic_partition_list_match(
1828                 offsets, rd_kafka_topic_partition_match_valid_offset, NULL);
1829 
1830         if (valid_offsets->cnt == 0) {
1831                 /* No valid offsets, e.g., nothing was consumed,
1832                  * this is not an error, do nothing. */
1833                 rd_kafka_topic_partition_list_destroy(valid_offsets);
1834                 return NULL;
1835         }
1836 
1837         rd_kafka_topic_partition_list_sort_by_topic(valid_offsets);
1838 
1839         rko = rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN,
1840                                  rd_kafka_txn_op_send_offsets_to_transaction);
1841         rko->rko_u.txn.offsets = valid_offsets;
1842         rko->rko_u.txn.group_id = rd_strdup(cgmetadata->group_id);
1843         if (timeout_ms > rk->rk_conf.eos.transaction_timeout_ms)
1844                 timeout_ms = rk->rk_conf.eos.transaction_timeout_ms;
1845         rko->rko_u.txn.abs_timeout = rd_timeout_init(timeout_ms);
1846 
1847         return rd_kafka_txn_curr_api_req(
1848                 rk, __FUNCTION__, rko,
1849                 RD_POLL_INFINITE, /* rely on background code to time out */
1850                 0 /* no flags */);
1851 }
1852 
1853 
1854 
1855 
1856 
1857 /**
1858  * @brief Successfully complete the transaction.
1859  *
1860  * @locality rdkafka main thread
1861  * @locks rd_kafka_wrlock(rk) MUST be held
1862  */
rd_kafka_txn_complete(rd_kafka_t * rk)1863 static void rd_kafka_txn_complete (rd_kafka_t *rk) {
1864 
1865         rd_kafka_dbg(rk, EOS, "TXNCOMPLETE",
1866                      "Transaction successfully %s",
1867                      rk->rk_eos.txn_state ==
1868                      RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION ?
1869                      "committed" : "aborted");
1870 
1871         /* Clear all transaction partition state */
1872         rd_kafka_txn_clear_pending_partitions(rk);
1873         rd_kafka_txn_clear_partitions(rk);
1874 
1875         rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_READY);
1876 }
1877 
1878 
1879 
1880 /**
1881  * @brief Handle EndTxnResponse (commit or abort)
1882  *
1883  * @locality rdkafka main thread
1884  * @locks none
1885  */
rd_kafka_txn_handle_EndTxn(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)1886 static void rd_kafka_txn_handle_EndTxn (rd_kafka_t *rk,
1887                                         rd_kafka_broker_t *rkb,
1888                                         rd_kafka_resp_err_t err,
1889                                         rd_kafka_buf_t *rkbuf,
1890                                         rd_kafka_buf_t *request,
1891                                         void *opaque) {
1892         const int log_decode_errors = LOG_ERR;
1893         rd_kafka_q_t *rkq = opaque;
1894         int16_t ErrorCode;
1895         int actions = 0;
1896         rd_bool_t is_commit = rd_false;
1897 
1898         if (err == RD_KAFKA_RESP_ERR__DESTROY) {
1899                 rd_kafka_q_destroy(rkq);
1900                 return;
1901         }
1902 
1903         if (err)
1904                 goto err;
1905 
1906         rd_kafka_buf_read_throttle_time(rkbuf);
1907         rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
1908 
1909         err = ErrorCode;
1910         /* FALLTHRU */
1911 
1912  err_parse:
1913         err = rkbuf->rkbuf_err;
1914  err:
1915         rd_kafka_wrlock(rk);
1916         if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION)
1917                 is_commit = rd_true;
1918         else if (rk->rk_eos.txn_state ==
1919                  RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION)
1920                 is_commit = rd_false;
1921         else
1922                 err = RD_KAFKA_RESP_ERR__OUTDATED;
1923         rd_kafka_wrunlock(rk);
1924 
1925         switch (err)
1926         {
1927         case RD_KAFKA_RESP_ERR_NO_ERROR:
1928                 /* EndTxn successful: complete the transaction */
1929                 rd_kafka_txn_complete(rk);
1930                 break;
1931 
1932         case RD_KAFKA_RESP_ERR__OUTDATED:
1933         case RD_KAFKA_RESP_ERR__DESTROY:
1934                 /* Producer is being terminated, ignore the response. */
1935                 break;
1936 
1937         case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
1938         case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
1939         case RD_KAFKA_RESP_ERR__TRANSPORT:
1940                 rd_kafka_wrlock(rk);
1941                 rd_kafka_txn_coord_set(rk, NULL,
1942                                        "EndTxn failed: %s",
1943                                        rd_kafka_err2str(err));
1944                 rd_kafka_wrunlock(rk);
1945                 actions |= RD_KAFKA_ERR_ACTION_RETRY;
1946                 break;
1947 
1948         case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH:
1949         case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
1950         case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE:
1951                 actions |= RD_KAFKA_ERR_ACTION_FATAL;
1952                 break;
1953 
1954         default:
1955                 /* All unhandled errors are permanent */
1956                 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1957         }
1958 
1959 
1960         if (actions & RD_KAFKA_ERR_ACTION_FATAL) {
1961                 rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, err,
1962                                              "Failed to end transaction: %s",
1963                                              rd_kafka_err2str(err));
1964 
1965         } else if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
1966                 if (rd_kafka_buf_retry(rkb, request))
1967                         return;
1968                 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1969         }
1970 
1971         if (actions & RD_KAFKA_ERR_ACTION_PERMANENT)
1972                 rd_kafka_txn_set_abortable_error(rk, err,
1973                                                  "Failed to end transaction: "
1974                                                  "%s",
1975                                                  rd_kafka_err2str(err));
1976 
1977         if (err)
1978                 rd_kafka_txn_curr_api_reply(
1979                         rkq, err,
1980                         "EndTxn %s failed: %s", is_commit ? "commit" : "abort",
1981                         rd_kafka_err2str(err));
1982         else
1983                 rd_kafka_txn_curr_api_reply(rkq, RD_KAFKA_RESP_ERR_NO_ERROR,
1984                                             NULL);
1985 }
1986 
1987 
1988 
1989 /**
1990  * @brief Handler for commit_transaction()
1991  *
1992  * @locks none
1993  * @locality rdkafka main thread
1994  */
1995 static rd_kafka_op_res_t
rd_kafka_txn_op_commit_transaction(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)1996 rd_kafka_txn_op_commit_transaction (rd_kafka_t *rk,
1997                                     rd_kafka_q_t *rkq,
1998                                     rd_kafka_op_t *rko) {
1999         rd_kafka_error_t *error;
2000         rd_kafka_resp_err_t err;
2001         char errstr[512];
2002         rd_kafka_pid_t pid;
2003 
2004         if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
2005                 return RD_KAFKA_OP_RES_HANDLED;
2006 
2007         rd_kafka_wrlock(rk);
2008 
2009         if ((error = rd_kafka_txn_require_state(
2010                      rk, RD_KAFKA_TXN_STATE_BEGIN_COMMIT)))
2011                 goto err;
2012 
2013         pid = rd_kafka_idemp_get_pid0(rk, rd_false/*dont-lock*/);
2014         if (!rd_kafka_pid_valid(pid)) {
2015                 rd_dassert(!*"BUG: No PID despite proper transaction state");
2016                 error = rd_kafka_error_new_retriable(
2017                         RD_KAFKA_RESP_ERR__STATE,
2018                         "No PID available (idempotence state %s)",
2019                         rd_kafka_idemp_state2str(rk->rk_eos.idemp_state));
2020                 goto err;
2021         }
2022 
2023         err = rd_kafka_EndTxnRequest(rk->rk_eos.txn_coord,
2024                                      rk->rk_conf.eos.transactional_id,
2025                                      pid,
2026                                      rd_true /* commit */,
2027                                      errstr, sizeof(errstr),
2028                                      RD_KAFKA_REPLYQ(rk->rk_ops, 0),
2029                                      rd_kafka_txn_handle_EndTxn,
2030                                      rd_kafka_q_keep(rko->rko_replyq.q));
2031         if (err) {
2032                 error = rd_kafka_error_new_retriable(err, "%s", errstr);
2033                 goto err;
2034         }
2035 
2036         rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION);
2037 
2038         rd_kafka_wrunlock(rk);
2039 
2040         return RD_KAFKA_OP_RES_HANDLED;
2041 
2042  err:
2043         rd_kafka_wrunlock(rk);
2044 
2045         rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
2046                                           error);
2047 
2048         return RD_KAFKA_OP_RES_HANDLED;
2049 }
2050 
2051 
2052 /**
2053  * @brief Handler for commit_transaction()'s first phase: begin commit
2054  *
2055  * @locks none
2056  * @locality rdkafka main thread
2057  */
2058 static rd_kafka_op_res_t
rd_kafka_txn_op_begin_commit(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)2059 rd_kafka_txn_op_begin_commit (rd_kafka_t *rk,
2060                               rd_kafka_q_t *rkq,
2061                               rd_kafka_op_t *rko) {
2062         rd_kafka_error_t *error;
2063 
2064         if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
2065                 return RD_KAFKA_OP_RES_HANDLED;
2066 
2067 
2068         if ((error = rd_kafka_txn_require_state(
2069                      rk,
2070                      RD_KAFKA_TXN_STATE_IN_TRANSACTION,
2071                      RD_KAFKA_TXN_STATE_BEGIN_COMMIT)))
2072                 goto done;
2073 
2074         rd_kafka_wrlock(rk);
2075         rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_BEGIN_COMMIT);
2076         rd_kafka_wrunlock(rk);
2077 
2078         /* FALLTHRU */
2079  done:
2080         rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
2081                                           error);
2082 
2083         return RD_KAFKA_OP_RES_HANDLED;
2084 }
2085 
2086 
2087 rd_kafka_error_t *
rd_kafka_commit_transaction(rd_kafka_t * rk,int timeout_ms)2088 rd_kafka_commit_transaction (rd_kafka_t *rk, int timeout_ms) {
2089         rd_kafka_error_t *error;
2090         rd_kafka_resp_err_t err;
2091         rd_ts_t abs_timeout;
2092 
2093         if ((error = rd_kafka_ensure_transactional(rk)))
2094                 return error;
2095 
2096         /* The commit is in two phases:
2097          *   - begin commit: wait for outstanding messages to be produced,
2098          *                   disallow new messages from being produced
2099          *                   by application.
2100          *   - commit: commit transaction.
2101          */
2102 
2103         abs_timeout = rd_timeout_init(timeout_ms);
2104 
2105         /* Begin commit */
2106         error = rd_kafka_txn_curr_api_req(
2107                 rk, "commit_transaction (begin)",
2108                 rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN,
2109                                    rd_kafka_txn_op_begin_commit),
2110                 rd_timeout_remains(abs_timeout),
2111                 RD_KAFKA_TXN_CURR_API_F_FOR_REUSE|
2112                 RD_KAFKA_TXN_CURR_API_F_ABORT_ON_TIMEOUT);
2113         if (error)
2114                 return error;
2115 
2116         rd_kafka_dbg(rk, EOS, "TXNCOMMIT",
2117                      "Flushing %d outstanding message(s) prior to commit",
2118                      rd_kafka_outq_len(rk));
2119 
2120         /* Wait for queued messages to be delivered, limited by
2121          * the remaining transaction lifetime. */
2122         if ((err = rd_kafka_flush(rk, rd_timeout_remains(abs_timeout)))) {
2123                 if (err == RD_KAFKA_RESP_ERR__TIMED_OUT)
2124                         error = rd_kafka_error_new_retriable(
2125                                 err,
2126                                 "Failed to flush all outstanding messages "
2127                                 "within the transaction timeout: "
2128                                 "%d message(s) remaining%s",
2129                                 rd_kafka_outq_len(rk),
2130                                 (rk->rk_conf.enabled_events &
2131                                  RD_KAFKA_EVENT_DR) ?
2132                                 ": the event queue must be polled "
2133                                 "for delivery report events in a separate "
2134                                 "thread or prior to calling commit" : "");
2135                 else
2136                         error = rd_kafka_error_new_retriable(
2137                                 err,
2138                                 "Failed to flush outstanding messages: %s",
2139                                 rd_kafka_err2str(err));
2140 
2141                 rd_kafka_txn_curr_api_reset(rk);
2142 
2143                 /* FIXME: What to do here? Add test case */
2144 
2145                 return error;
2146         }
2147 
2148 
2149         /* Commit transaction */
2150         return rd_kafka_txn_curr_api_req(
2151                 rk, "commit_transaction",
2152                 rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN,
2153                                    rd_kafka_txn_op_commit_transaction),
2154                 rd_timeout_remains(abs_timeout),
2155                 RD_KAFKA_TXN_CURR_API_F_REUSE|
2156                 RD_KAFKA_TXN_CURR_API_F_ABORT_ON_TIMEOUT);
2157 }
2158 
2159 
2160 
2161 /**
2162  * @brief Handler for abort_transaction()'s first phase: begin abort
2163  *
2164  * @locks none
2165  * @locality rdkafka main thread
2166  */
2167 static rd_kafka_op_res_t
rd_kafka_txn_op_begin_abort(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)2168 rd_kafka_txn_op_begin_abort (rd_kafka_t *rk,
2169                               rd_kafka_q_t *rkq,
2170                               rd_kafka_op_t *rko) {
2171         rd_kafka_error_t *error;
2172 
2173         if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
2174                 return RD_KAFKA_OP_RES_HANDLED;
2175 
2176         if ((error = rd_kafka_txn_require_state(
2177                      rk,
2178                      RD_KAFKA_TXN_STATE_IN_TRANSACTION,
2179                      RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION,
2180                      RD_KAFKA_TXN_STATE_ABORTABLE_ERROR)))
2181                 goto done;
2182 
2183         rd_kafka_wrlock(rk);
2184         rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION);
2185         rd_kafka_wrunlock(rk);
2186 
2187         mtx_lock(&rk->rk_eos.txn_pending_lock);
2188         rd_kafka_txn_clear_pending_partitions(rk);
2189         mtx_unlock(&rk->rk_eos.txn_pending_lock);
2190 
2191 
2192         /* FALLTHRU */
2193  done:
2194         rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
2195                                           error);
2196 
2197         return RD_KAFKA_OP_RES_HANDLED;
2198 }
2199 
2200 
2201 /**
2202  * @brief Handler for abort_transaction()
2203  *
2204  * @locks none
2205  * @locality rdkafka main thread
2206  */
2207 static rd_kafka_op_res_t
rd_kafka_txn_op_abort_transaction(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)2208 rd_kafka_txn_op_abort_transaction (rd_kafka_t *rk,
2209                                    rd_kafka_q_t *rkq,
2210                                    rd_kafka_op_t *rko) {
2211         rd_kafka_error_t *error;
2212         rd_kafka_resp_err_t err;
2213         char errstr[512];
2214         rd_kafka_pid_t pid;
2215 
2216         if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
2217                 return RD_KAFKA_OP_RES_HANDLED;
2218 
2219         rd_kafka_wrlock(rk);
2220 
2221         if ((error = rd_kafka_txn_require_state(
2222                      rk, RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION)))
2223                 goto err;
2224 
2225         pid = rd_kafka_idemp_get_pid0(rk, rd_false/*dont-lock*/);
2226         if (!rd_kafka_pid_valid(pid)) {
2227                 rd_dassert(!*"BUG: No PID despite proper transaction state");
2228                 error = rd_kafka_error_new_retriable(
2229                         RD_KAFKA_RESP_ERR__STATE,
2230                         "No PID available (idempotence state %s)",
2231                         rd_kafka_idemp_state2str(rk->rk_eos.idemp_state));
2232                 goto err;
2233         }
2234 
2235         if (!rk->rk_eos.txn_req_cnt) {
2236                 rd_kafka_dbg(rk, EOS, "TXNABORT",
2237                              "No partitions registered: not sending EndTxn");
2238                 rd_kafka_txn_complete(rk);
2239                 goto err;
2240         }
2241 
2242         err = rd_kafka_EndTxnRequest(rk->rk_eos.txn_coord,
2243                                      rk->rk_conf.eos.transactional_id,
2244                                      pid,
2245                                      rd_false /* abort */,
2246                                      errstr, sizeof(errstr),
2247                                      RD_KAFKA_REPLYQ(rk->rk_ops, 0),
2248                                      rd_kafka_txn_handle_EndTxn,
2249                                      rd_kafka_q_keep(rko->rko_replyq.q));
2250         if (err) {
2251                 error = rd_kafka_error_new_retriable(err, "%s", errstr);
2252                 goto err;
2253         }
2254 
2255         rd_kafka_wrunlock(rk);
2256 
2257         return RD_KAFKA_OP_RES_HANDLED;
2258 
2259  err:
2260         rd_kafka_wrunlock(rk);
2261 
2262         rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
2263                                           error);
2264 
2265         // FIXME: What state do we transition to? READY? FATAL?
2266 
2267         return RD_KAFKA_OP_RES_HANDLED;
2268 }
2269 
2270 
2271 rd_kafka_error_t *
rd_kafka_abort_transaction(rd_kafka_t * rk,int timeout_ms)2272 rd_kafka_abort_transaction (rd_kafka_t *rk, int timeout_ms) {
2273         rd_kafka_error_t *error;
2274         rd_kafka_resp_err_t err;
2275         rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
2276 
2277         if ((error = rd_kafka_ensure_transactional(rk)))
2278                 return error;
2279 
2280         /* The abort is multi-phase:
2281          * - set state to ABORTING_TRANSACTION
2282          * - flush() outstanding messages
2283          * - send EndTxn
2284          *
2285          * The curr_api must be reused during all these steps to avoid
2286          * a race condition where another application thread calls a
2287          * txn API inbetween the steps.
2288          */
2289 
2290         error = rd_kafka_txn_curr_api_req(
2291                 rk, "abort_transaction (begin)",
2292                 rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN,
2293                                    rd_kafka_txn_op_begin_abort),
2294                 RD_POLL_INFINITE, /* begin_abort is immediate, no timeout */
2295                 RD_KAFKA_TXN_CURR_API_F_FOR_REUSE|
2296                 RD_KAFKA_TXN_CURR_API_F_RETRIABLE_ON_TIMEOUT);
2297         if (error)
2298                 return error;
2299 
2300         rd_kafka_dbg(rk, EOS, "TXNABORT",
2301                      "Purging and flushing %d outstanding message(s) prior "
2302                      "to abort",
2303                      rd_kafka_outq_len(rk));
2304 
2305         /* Purge all queued messages.
2306          * Will need to wait for messages in-flight since purging these
2307          * messages may lead to gaps in the idempotent producer sequences. */
2308         err = rd_kafka_purge(rk,
2309                              RD_KAFKA_PURGE_F_QUEUE|
2310                              RD_KAFKA_PURGE_F_ABORT_TXN);
2311 
2312         /* Serve delivery reports for the purged messages. */
2313         if ((err = rd_kafka_flush(rk, rd_timeout_remains(abs_timeout)))) {
2314                 /* FIXME: Not sure these errors matter that much */
2315                 if (err == RD_KAFKA_RESP_ERR__TIMED_OUT)
2316                         error = rd_kafka_error_new_retriable(
2317                                 err,
2318                                 "Failed to flush all outstanding messages "
2319                                 "within the transaction timeout: "
2320                                 "%d message(s) remaining%s",
2321                                 rd_kafka_outq_len(rk),
2322                                 (rk->rk_conf.enabled_events &
2323                                  RD_KAFKA_EVENT_DR) ?
2324                                 ": the event queue must be polled "
2325                                 "for delivery report events in a separate "
2326                                 "thread or prior to calling abort" : "");
2327 
2328                 else
2329                         error = rd_kafka_error_new_retriable(
2330                                 err,
2331                                 "Failed to flush outstanding messages: %s",
2332                                 rd_kafka_err2str(err));
2333 
2334                 rd_kafka_txn_curr_api_reset(rk);
2335 
2336                 /* FIXME: What to do here? */
2337 
2338                 return error;
2339         }
2340 
2341 
2342         return rd_kafka_txn_curr_api_req(
2343                 rk, "abort_transaction",
2344                 rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN,
2345                                    rd_kafka_txn_op_abort_transaction),
2346                 0,
2347                 RD_KAFKA_TXN_CURR_API_F_REUSE);
2348 }
2349 
2350 
2351 
2352 /**
2353  * @brief Coordinator query timer
2354  *
2355  * @locality rdkafka main thread
2356  * @locks none
2357  */
2358 
rd_kafka_txn_coord_timer_cb(rd_kafka_timers_t * rkts,void * arg)2359 static void rd_kafka_txn_coord_timer_cb (rd_kafka_timers_t *rkts, void *arg) {
2360         rd_kafka_t *rk = arg;
2361 
2362         rd_kafka_wrlock(rk);
2363         rd_kafka_txn_coord_query(rk, "Coordinator query timer");
2364         rd_kafka_wrunlock(rk);
2365 }
2366 
2367 /**
2368  * @brief (Re-)Start coord query timer
2369  *
2370  * @locality rdkafka main thread
2371  * @locks none
2372  */
rd_kafka_txn_coord_timer_restart(rd_kafka_t * rk,int timeout_ms)2373 static void rd_kafka_txn_coord_timer_restart (rd_kafka_t *rk, int timeout_ms) {
2374         rd_assert(rd_kafka_is_transactional(rk));
2375         rd_kafka_timer_start_oneshot(&rk->rk_timers,
2376                                      &rk->rk_eos.txn_coord_tmr, rd_true,
2377                                      1000 * timeout_ms,
2378                                      rd_kafka_txn_coord_timer_cb, rk);
2379 }
2380 
2381 
2382 /**
2383  * @brief Parses and handles a FindCoordinator response.
2384  *
2385  * @locality rdkafka main thread
2386  * @locks none
2387  */
2388 static void
rd_kafka_txn_handle_FindCoordinator(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)2389 rd_kafka_txn_handle_FindCoordinator (rd_kafka_t *rk,
2390                                      rd_kafka_broker_t *rkb,
2391                                      rd_kafka_resp_err_t err,
2392                                      rd_kafka_buf_t *rkbuf,
2393                                      rd_kafka_buf_t *request,
2394                                      void *opaque) {
2395         const int log_decode_errors = LOG_ERR;
2396         int16_t ErrorCode;
2397         rd_kafkap_str_t Host;
2398         int32_t NodeId, Port;
2399         char errstr[512];
2400 
2401         *errstr = '\0';
2402 
2403         rk->rk_eos.txn_wait_coord = rd_false;
2404 
2405         if (err)
2406                 goto err;
2407 
2408         if (request->rkbuf_reqhdr.ApiVersion >= 1)
2409                 rd_kafka_buf_read_throttle_time(rkbuf);
2410 
2411         rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
2412 
2413         if (request->rkbuf_reqhdr.ApiVersion >= 1) {
2414                 rd_kafkap_str_t ErrorMsg;
2415                 rd_kafka_buf_read_str(rkbuf, &ErrorMsg);
2416                 if (ErrorCode)
2417                         rd_snprintf(errstr, sizeof(errstr),
2418                                     "%.*s", RD_KAFKAP_STR_PR(&ErrorMsg));
2419         }
2420 
2421         if ((err = ErrorCode))
2422                 goto err;
2423 
2424         rd_kafka_buf_read_i32(rkbuf, &NodeId);
2425         rd_kafka_buf_read_str(rkbuf, &Host);
2426         rd_kafka_buf_read_i32(rkbuf, &Port);
2427 
2428         rd_rkb_dbg(rkb, EOS, "TXNCOORD",
2429                    "FindCoordinator response: "
2430                    "Transaction coordinator is broker %"PRId32" (%.*s:%d)",
2431                    NodeId, RD_KAFKAP_STR_PR(&Host), (int)Port);
2432 
2433         rd_kafka_rdlock(rk);
2434         if (NodeId == -1)
2435                 err = RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE;
2436         else if (!(rkb = rd_kafka_broker_find_by_nodeid(rk, NodeId))) {
2437                 rd_snprintf(errstr, sizeof(errstr),
2438                             "Transaction coordinator %"PRId32" is unknown",
2439                             NodeId);
2440                 err = RD_KAFKA_RESP_ERR__UNKNOWN_BROKER;
2441         }
2442         rd_kafka_rdunlock(rk);
2443 
2444         if (err)
2445                 goto err;
2446 
2447         rd_kafka_wrlock(rk);
2448         rd_kafka_txn_coord_set(rk, rkb, "FindCoordinator response");
2449         rd_kafka_wrunlock(rk);
2450 
2451         rd_kafka_broker_destroy(rkb);
2452 
2453         return;
2454 
2455  err_parse:
2456         err = rkbuf->rkbuf_err;
2457  err:
2458 
2459         switch (err)
2460         {
2461         case RD_KAFKA_RESP_ERR__DESTROY:
2462                 return;
2463 
2464         case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
2465                 rd_kafka_wrlock(rk);
2466                 rd_kafka_txn_set_fatal_error(
2467                         rkb->rkb_rk, RD_DONT_LOCK, err,
2468                         "Failed to find transaction coordinator: %s: %s%s%s",
2469                         rd_kafka_broker_name(rkb),
2470                         rd_kafka_err2str(err),
2471                         *errstr ? ": " : "", errstr);
2472 
2473                 rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_FATAL_ERROR);
2474                 rd_kafka_wrunlock(rk);
2475                 return;
2476 
2477         case RD_KAFKA_RESP_ERR__UNKNOWN_BROKER:
2478                 rd_kafka_metadata_refresh_brokers(rk, NULL, errstr);
2479                 break;
2480 
2481         default:
2482                 break;
2483         }
2484 
2485         rd_kafka_wrlock(rk);
2486         rd_kafka_txn_coord_set(rk, NULL,
2487                                "Failed to find transaction coordinator: %s: %s",
2488                                rd_kafka_err2name(err),
2489                                *errstr ? errstr : rd_kafka_err2str(err));
2490         rd_kafka_wrunlock(rk);
2491 }
2492 
2493 
2494 
2495 
2496 /**
2497  * @brief Query for the transaction coordinator.
2498  *
2499  * @returns true if a fatal error was raised, else false.
2500  *
2501  * @locality rdkafka main thread
2502  * @locks rd_kafka_wrlock(rk) MUST be held.
2503  */
rd_kafka_txn_coord_query(rd_kafka_t * rk,const char * reason)2504 rd_bool_t rd_kafka_txn_coord_query (rd_kafka_t *rk, const char *reason) {
2505         rd_kafka_resp_err_t err;
2506         char errstr[512];
2507         rd_kafka_broker_t *rkb;
2508 
2509         rd_assert(rd_kafka_is_transactional(rk));
2510 
2511         if (rk->rk_eos.txn_wait_coord) {
2512                 rd_kafka_dbg(rk, EOS, "TXNCOORD",
2513                              "Not sending coordinator query (%s): "
2514                              "waiting for previous query to finish",
2515                              reason);
2516                 return rd_false;
2517         }
2518 
2519         /* Find usable broker to query for the txn coordinator */
2520         rkb = rd_kafka_idemp_broker_any(rk, &err,
2521                                         errstr, sizeof(errstr));
2522         if (!rkb) {
2523                 rd_kafka_dbg(rk, EOS, "TXNCOORD",
2524                              "Unable to query for transaction coordinator: %s",
2525                              errstr);
2526 
2527                 if (rd_kafka_idemp_check_error(rk, err, errstr))
2528                         return rd_true;
2529 
2530                 rd_kafka_txn_coord_timer_restart(rk, 500);
2531 
2532                 return rd_false;
2533         }
2534 
2535         /* Send FindCoordinator request */
2536         err = rd_kafka_FindCoordinatorRequest(
2537                 rkb, RD_KAFKA_COORD_TXN,
2538                 rk->rk_conf.eos.transactional_id,
2539                 RD_KAFKA_REPLYQ(rk->rk_ops, 0),
2540                 rd_kafka_txn_handle_FindCoordinator, NULL);
2541 
2542         if (err) {
2543                 rd_snprintf(errstr, sizeof(errstr),
2544                             "Failed to send coordinator query to %s: "
2545                             "%s",
2546                             rd_kafka_broker_name(rkb),
2547                             rd_kafka_err2str(err));
2548 
2549                 rd_kafka_broker_destroy(rkb);
2550 
2551                 if (rd_kafka_idemp_check_error(rk, err, errstr))
2552                         return rd_true; /* Fatal error */
2553 
2554                 rd_kafka_txn_coord_timer_restart(rk, 500);
2555 
2556                 return rd_false;
2557         }
2558 
2559         rd_kafka_broker_destroy(rkb);
2560 
2561         rk->rk_eos.txn_wait_coord = rd_true;
2562 
2563         return rd_false;
2564 }
2565 
2566 /**
2567  * @brief Sets or clears the current coordinator address.
2568  *
2569  * @returns true if the coordinator was changed, else false.
2570  *
2571  * @locality rd_kafka_main_thread
2572  * @locks rd_kafka_wrlock(rk) MUST be held
2573  */
rd_kafka_txn_coord_set(rd_kafka_t * rk,rd_kafka_broker_t * rkb,const char * fmt,...)2574 rd_bool_t rd_kafka_txn_coord_set (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
2575                                   const char *fmt, ...) {
2576         char buf[256];
2577         va_list ap;
2578 
2579         va_start(ap, fmt);
2580         vsnprintf(buf, sizeof(buf), fmt, ap);
2581         va_end(ap);
2582 
2583 
2584         if (rk->rk_eos.txn_curr_coord == rkb) {
2585                 if (!rkb) {
2586                         rd_kafka_dbg(rk, EOS, "TXNCOORD", "%s", buf);
2587                         /* Keep querying for the coordinator */
2588                         rd_kafka_txn_coord_timer_restart(rk, 500);
2589                 }
2590                 return rd_false;
2591         }
2592 
2593         rd_kafka_dbg(rk, EOS, "TXNCOORD",
2594                      "Transaction coordinator changed from %s -> %s: %s",
2595                      rk->rk_eos.txn_curr_coord ?
2596                      rd_kafka_broker_name(rk->rk_eos.txn_curr_coord) :
2597                      "(none)",
2598                      rkb ? rd_kafka_broker_name(rkb) : "(none)",
2599                      buf);
2600 
2601         if (rk->rk_eos.txn_curr_coord)
2602                 rd_kafka_broker_destroy(rk->rk_eos.txn_curr_coord);
2603 
2604         rk->rk_eos.txn_curr_coord = rkb;
2605         if (rkb)
2606                 rd_kafka_broker_keep(rkb);
2607 
2608         rd_kafka_broker_set_nodename(rk->rk_eos.txn_coord,
2609                                      rk->rk_eos.txn_curr_coord);
2610 
2611         if (!rkb) {
2612                 /* Lost the current coordinator, query for new coordinator */
2613                 rd_kafka_txn_coord_timer_restart(rk, 500);
2614         } else {
2615                 /* Trigger PID state machine */
2616                 rd_kafka_idemp_pid_fsm(rk);
2617         }
2618 
2619         return rd_true;
2620 }
2621 
2622 
2623 /**
2624  * @brief Coordinator state monitor callback.
2625  *
2626  * @locality rdkafka main thread
2627  * @locks none
2628  */
rd_kafka_txn_coord_monitor_cb(rd_kafka_broker_t * rkb)2629 void rd_kafka_txn_coord_monitor_cb (rd_kafka_broker_t *rkb) {
2630         rd_kafka_t *rk = rkb->rkb_rk;
2631         rd_kafka_broker_state_t state = rd_kafka_broker_get_state(rkb);
2632         rd_bool_t is_up;
2633 
2634         rd_assert(rk->rk_eos.txn_coord == rkb);
2635 
2636         is_up = rd_kafka_broker_state_is_up(state);
2637         rd_rkb_dbg(rkb, EOS, "COORD",
2638                    "Transaction coordinator is now %s",
2639                    is_up ? "up" : "down");
2640 
2641         if (!is_up) {
2642                 /* Coordinator is down, the connection will be re-established
2643                  * automatically, but we also trigger a coordinator query
2644                  * to pick up on coordinator change. */
2645                 rd_kafka_txn_coord_timer_restart(rk, 500);
2646 
2647         } else {
2648                 /* Coordinator is up. */
2649 
2650                 rd_kafka_wrlock(rk);
2651                 if (rk->rk_eos.idemp_state < RD_KAFKA_IDEMP_STATE_ASSIGNED) {
2652                         /* See if a idempotence state change is warranted. */
2653                         rd_kafka_idemp_pid_fsm(rk);
2654 
2655                 } else if (rk->rk_eos.idemp_state ==
2656                            RD_KAFKA_IDEMP_STATE_ASSIGNED) {
2657                         /* PID is already valid, continue transactional
2658                          * operations by checking for partitions to register */
2659                         rd_kafka_txn_schedule_register_partitions(rk,
2660                                                                   1/*ASAP*/);
2661                 }
2662 
2663                 rd_kafka_wrunlock(rk);
2664         }
2665 }
2666 
2667 
2668 
2669 /**
2670  * @brief Transactions manager destructor
2671  *
2672  * @locality rdkafka main thread
2673  * @locks none
2674  */
rd_kafka_txns_term(rd_kafka_t * rk)2675 void rd_kafka_txns_term (rd_kafka_t *rk) {
2676         RD_IF_FREE(rk->rk_eos.txn_init_rkq, rd_kafka_q_destroy);
2677 
2678         RD_IF_FREE(rk->rk_eos.txn_errstr, rd_free);
2679 
2680         rd_kafka_timer_stop(&rk->rk_timers,
2681                             &rk->rk_eos.txn_coord_tmr, 1);
2682         rd_kafka_timer_stop(&rk->rk_timers,
2683                             &rk->rk_eos.txn_register_parts_tmr, 1);
2684 
2685         if (rk->rk_eos.txn_curr_coord)
2686                 rd_kafka_broker_destroy(rk->rk_eos.txn_curr_coord);
2687 
2688         /* Logical coordinator */
2689         rd_kafka_broker_persistent_connection_del(
2690                 rk->rk_eos.txn_coord,
2691                 &rk->rk_eos.txn_coord->rkb_persistconn.coord);
2692         rd_kafka_broker_monitor_del(&rk->rk_eos.txn_coord_mon);
2693         rd_kafka_broker_destroy(rk->rk_eos.txn_coord);
2694         rk->rk_eos.txn_coord = NULL;
2695 
2696         mtx_lock(&rk->rk_eos.txn_pending_lock);
2697         rd_kafka_txn_clear_pending_partitions(rk);
2698         mtx_unlock(&rk->rk_eos.txn_pending_lock);
2699         mtx_destroy(&rk->rk_eos.txn_pending_lock);
2700 
2701         rd_kafka_txn_clear_partitions(rk);
2702 }
2703 
2704 
2705 /**
2706  * @brief Initialize transactions manager.
2707  *
2708  * @locality application thread
2709  * @locks none
2710  */
rd_kafka_txns_init(rd_kafka_t * rk)2711 void rd_kafka_txns_init (rd_kafka_t *rk) {
2712         rd_atomic32_init(&rk->rk_eos.txn_may_enq, 0);
2713         mtx_init(&rk->rk_eos.txn_pending_lock, mtx_plain);
2714         TAILQ_INIT(&rk->rk_eos.txn_pending_rktps);
2715         TAILQ_INIT(&rk->rk_eos.txn_waitresp_rktps);
2716         TAILQ_INIT(&rk->rk_eos.txn_rktps);
2717 
2718         /* Logical coordinator */
2719         rk->rk_eos.txn_coord =
2720                 rd_kafka_broker_add_logical(rk, "TxnCoordinator");
2721 
2722         rd_kafka_broker_monitor_add(&rk->rk_eos.txn_coord_mon,
2723                                     rk->rk_eos.txn_coord,
2724                                     rk->rk_ops,
2725                                     rd_kafka_txn_coord_monitor_cb);
2726 
2727         rd_kafka_broker_persistent_connection_add(
2728                 rk->rk_eos.txn_coord,
2729                 &rk->rk_eos.txn_coord->rkb_persistconn.coord);
2730 }
2731 
2732