1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2019 Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 /**
30 * @name Transaction Manager
31 *
32 */
33
34 #include <stdarg.h>
35
36 #include "rd.h"
37 #include "rdkafka_int.h"
38 #include "rdkafka_txnmgr.h"
39 #include "rdkafka_idempotence.h"
40 #include "rdkafka_request.h"
41 #include "rdkafka_error.h"
42 #include "rdunittest.h"
43 #include "rdrand.h"
44
45
46 static void
47 rd_kafka_txn_curr_api_reply_error (rd_kafka_q_t *rkq, rd_kafka_error_t *error);
48
49
50 /**
51 * @brief Ensure client is configured as a transactional producer,
52 * else return error.
53 *
54 * @locality application thread
55 * @locks none
56 */
57 static RD_INLINE rd_kafka_error_t *
rd_kafka_ensure_transactional(const rd_kafka_t * rk)58 rd_kafka_ensure_transactional (const rd_kafka_t *rk) {
59 if (unlikely(rk->rk_type != RD_KAFKA_PRODUCER))
60 return rd_kafka_error_new(
61 RD_KAFKA_RESP_ERR__INVALID_ARG,
62 "The Transactional API can only be used "
63 "on producer instances");
64
65 if (unlikely(!rk->rk_conf.eos.transactional_id))
66 return rd_kafka_error_new(
67 RD_KAFKA_RESP_ERR__NOT_CONFIGURED,
68 "The Transactional API requires "
69 "transactional.id to be configured");
70
71 return NULL;
72 }
73
74
75
76 /**
77 * @brief Ensure transaction state is one of \p states.
78 *
79 * @param the required states, ended by a -1 sentinel.
80 *
81 * @locks rd_kafka_*lock(rk) MUST be held
82 * @locality any
83 */
84 static RD_INLINE rd_kafka_error_t *
rd_kafka_txn_require_states0(rd_kafka_t * rk,rd_kafka_txn_state_t states[])85 rd_kafka_txn_require_states0 (rd_kafka_t *rk,
86 rd_kafka_txn_state_t states[]) {
87 rd_kafka_error_t *error;
88 size_t i;
89
90 if (unlikely((error = rd_kafka_ensure_transactional(rk)) != NULL))
91 return error;
92
93 for (i = 0 ; (int)states[i] != -1 ; i++)
94 if (rk->rk_eos.txn_state == states[i])
95 return NULL;
96
97 error = rd_kafka_error_new(
98 RD_KAFKA_RESP_ERR__STATE,
99 "Operation not valid in state %s",
100 rd_kafka_txn_state2str(rk->rk_eos.txn_state));
101
102
103 if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_FATAL_ERROR)
104 rd_kafka_error_set_fatal(error);
105 else if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_ABORTABLE_ERROR)
106 rd_kafka_error_set_txn_requires_abort(error);
107
108 return error;
109 }
110
/**
 * @brief Convenience wrapper around rd_kafka_txn_require_states0().
 *
 * \p ... is the list of accepted transaction states. The terminating -1
 * sentinel is appended automatically by building a compound-literal
 * array of rd_kafka_txn_state_t.
 *
 * Same locking requirements as rd_kafka_txn_require_states0():
 * rd_kafka_*lock(rk) MUST be held.
 */
#define rd_kafka_txn_require_state(rk,...)                              \
        rd_kafka_txn_require_states0(rk,                                \
                                     (rd_kafka_txn_state_t[]){          \
                                             __VA_ARGS__, -1 })
116
117
118
119 /**
120 * @param ignore Will be set to true if the state transition should be
121 * completely ignored.
122 * @returns true if the state transition is valid, else false.
123 */
124 static rd_bool_t
rd_kafka_txn_state_transition_is_valid(rd_kafka_txn_state_t curr,rd_kafka_txn_state_t new_state,rd_bool_t * ignore)125 rd_kafka_txn_state_transition_is_valid (rd_kafka_txn_state_t curr,
126 rd_kafka_txn_state_t new_state,
127 rd_bool_t *ignore) {
128
129 *ignore = rd_false;
130
131 switch (new_state)
132 {
133 case RD_KAFKA_TXN_STATE_INIT:
134 /* This is the initialized value and this transition will
135 * never happen. */
136 return rd_false;
137
138 case RD_KAFKA_TXN_STATE_WAIT_PID:
139 return curr == RD_KAFKA_TXN_STATE_INIT;
140
141 case RD_KAFKA_TXN_STATE_READY_NOT_ACKED:
142 return curr == RD_KAFKA_TXN_STATE_WAIT_PID;
143
144 case RD_KAFKA_TXN_STATE_READY:
145 return curr == RD_KAFKA_TXN_STATE_READY_NOT_ACKED ||
146 curr == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION ||
147 curr == RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION;
148
149 case RD_KAFKA_TXN_STATE_IN_TRANSACTION:
150 return curr == RD_KAFKA_TXN_STATE_READY;
151
152 case RD_KAFKA_TXN_STATE_BEGIN_COMMIT:
153 return curr == RD_KAFKA_TXN_STATE_IN_TRANSACTION;
154
155 case RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION:
156 return curr == RD_KAFKA_TXN_STATE_BEGIN_COMMIT;
157
158 case RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION:
159 return curr == RD_KAFKA_TXN_STATE_IN_TRANSACTION ||
160 curr == RD_KAFKA_TXN_STATE_ABORTABLE_ERROR;
161
162 case RD_KAFKA_TXN_STATE_ABORTABLE_ERROR:
163 if (curr == RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION ||
164 curr == RD_KAFKA_TXN_STATE_FATAL_ERROR) {
165 /* Ignore sub-sequent abortable errors in
166 * these states. */
167 *ignore = rd_true;
168 return 1;
169 }
170
171 return curr == RD_KAFKA_TXN_STATE_IN_TRANSACTION ||
172 curr == RD_KAFKA_TXN_STATE_BEGIN_COMMIT ||
173 curr == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION;
174
175 case RD_KAFKA_TXN_STATE_FATAL_ERROR:
176 /* Any state can transition to a fatal error */
177 return rd_true;
178
179 default:
180 RD_NOTREACHED();
181 return rd_false;
182 }
183 }
184
185
186 /**
187 * @brief Transition the transaction state to \p new_state.
188 *
189 * @returns 0 on success or an error code if the state transition
190 * was invalid.
191 *
192 * @locality rdkafka main thread
193 * @locks rd_kafka_wrlock MUST be held
194 */
rd_kafka_txn_set_state(rd_kafka_t * rk,rd_kafka_txn_state_t new_state)195 static void rd_kafka_txn_set_state (rd_kafka_t *rk,
196 rd_kafka_txn_state_t new_state) {
197 rd_bool_t ignore;
198
199 if (rk->rk_eos.txn_state == new_state)
200 return;
201
202 /* Check if state transition is valid */
203 if (!rd_kafka_txn_state_transition_is_valid(rk->rk_eos.txn_state,
204 new_state, &ignore)) {
205 rd_kafka_log(rk, LOG_CRIT, "TXNSTATE",
206 "BUG: Invalid transaction state transition "
207 "attempted: %s -> %s",
208 rd_kafka_txn_state2str(rk->rk_eos.txn_state),
209 rd_kafka_txn_state2str(new_state));
210
211 rd_assert(!*"BUG: Invalid transaction state transition");
212 }
213
214 if (ignore) {
215 /* Ignore this state change */
216 return;
217 }
218
219 rd_kafka_dbg(rk, EOS, "TXNSTATE",
220 "Transaction state change %s -> %s",
221 rd_kafka_txn_state2str(rk->rk_eos.txn_state),
222 rd_kafka_txn_state2str(new_state));
223
224 /* If transitioning from IN_TRANSACTION, the app is no longer
225 * allowed to enqueue (produce) messages. */
226 if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_IN_TRANSACTION)
227 rd_atomic32_set(&rk->rk_eos.txn_may_enq, 0);
228 else if (new_state == RD_KAFKA_TXN_STATE_IN_TRANSACTION)
229 rd_atomic32_set(&rk->rk_eos.txn_may_enq, 1);
230
231 rk->rk_eos.txn_state = new_state;
232 }
233
234
235 /**
236 * @brief An unrecoverable transactional error has occurred.
237 *
238 * @param do_lock RD_DO_LOCK: rd_kafka_wrlock(rk) will be acquired and released,
239 * RD_DONT_LOCK: rd_kafka_wrlock(rk) MUST be held by the caller.
240 * @locality any
241 * @locks rd_kafka_wrlock MUST NOT be held
242 */
rd_kafka_txn_set_fatal_error(rd_kafka_t * rk,rd_dolock_t do_lock,rd_kafka_resp_err_t err,const char * fmt,...)243 void rd_kafka_txn_set_fatal_error (rd_kafka_t *rk, rd_dolock_t do_lock,
244 rd_kafka_resp_err_t err,
245 const char *fmt, ...) {
246 char errstr[512];
247 va_list ap;
248
249 va_start(ap, fmt);
250 vsnprintf(errstr, sizeof(errstr), fmt, ap);
251 va_end(ap);
252
253 rd_kafka_log(rk, LOG_ALERT, "TXNERR",
254 "Fatal transaction error: %s (%s)",
255 errstr, rd_kafka_err2name(err));
256
257 if (do_lock)
258 rd_kafka_wrlock(rk);
259 rd_kafka_set_fatal_error0(rk, RD_DONT_LOCK, err, "%s", errstr);
260
261 rk->rk_eos.txn_err = err;
262 if (rk->rk_eos.txn_errstr)
263 rd_free(rk->rk_eos.txn_errstr);
264 rk->rk_eos.txn_errstr = rd_strdup(errstr);
265
266 if (rk->rk_eos.txn_init_rkq) {
267 /* If application has called init_transactions() and
268 * it has now failed, reply to the app. */
269 rd_kafka_txn_curr_api_reply_error(
270 rk->rk_eos.txn_init_rkq,
271 rd_kafka_error_new_fatal(err, "%s", errstr));
272 rk->rk_eos.txn_init_rkq = NULL;
273 }
274
275 rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_FATAL_ERROR);
276
277 if (do_lock)
278 rd_kafka_wrunlock(rk);
279 }
280
281
282 /**
283 * @brief An abortable/recoverable transactional error has occured.
284 *
285 * @locality rdkafka main thread
286 * @locks rd_kafka_wrlock MUST NOT be held
287 */
rd_kafka_txn_set_abortable_error(rd_kafka_t * rk,rd_kafka_resp_err_t err,const char * fmt,...)288 void rd_kafka_txn_set_abortable_error (rd_kafka_t *rk,
289 rd_kafka_resp_err_t err,
290 const char *fmt, ...) {
291 char errstr[512];
292 va_list ap;
293
294 if (rd_kafka_fatal_error(rk, NULL, 0)) {
295 rd_kafka_dbg(rk, EOS, "FATAL",
296 "Not propagating abortable transactional "
297 "error (%s) "
298 "since previous fatal error already raised",
299 rd_kafka_err2name(err));
300 return;
301 }
302
303 va_start(ap, fmt);
304 vsnprintf(errstr, sizeof(errstr), fmt, ap);
305 va_end(ap);
306
307 rd_kafka_wrlock(rk);
308 if (rk->rk_eos.txn_err) {
309 rd_kafka_dbg(rk, EOS, "TXNERR",
310 "Ignoring sub-sequent abortable transaction "
311 "error: %s (%s): "
312 "previous error (%s) already raised",
313 errstr,
314 rd_kafka_err2name(err),
315 rd_kafka_err2name(rk->rk_eos.txn_err));
316 rd_kafka_wrunlock(rk);
317 return;
318 }
319
320 rk->rk_eos.txn_err = err;
321 if (rk->rk_eos.txn_errstr)
322 rd_free(rk->rk_eos.txn_errstr);
323 rk->rk_eos.txn_errstr = rd_strdup(errstr);
324
325 rd_kafka_log(rk, LOG_ERR, "TXNERR",
326 "Current transaction failed: %s (%s)",
327 errstr, rd_kafka_err2name(err));
328
329 rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_ABORTABLE_ERROR);
330 rd_kafka_wrunlock(rk);
331
332 /* Purge all messages in queue/flight */
333 rd_kafka_purge(rk,
334 RD_KAFKA_PURGE_F_QUEUE |
335 RD_KAFKA_PURGE_F_ABORT_TXN |
336 RD_KAFKA_PURGE_F_NON_BLOCKING);
337
338 }
339
340
341
342 /**
343 * @brief Send op reply to the application which is blocking
344 * on one of the transaction APIs and reset the current API.
345 *
346 * @param rkq is the queue to send the reply on, which may be NULL or disabled.
347 * The \p rkq refcount is decreased by this function.
348 * @param err API error code.
349 * @param errstr_fmt If err is set, a human readable error format string.
350 *
351 * @locality rdkafka main thread
352 * @locks any
353 */
354 static void
rd_kafka_txn_curr_api_reply_error(rd_kafka_q_t * rkq,rd_kafka_error_t * error)355 rd_kafka_txn_curr_api_reply_error (rd_kafka_q_t *rkq, rd_kafka_error_t *error) {
356 rd_kafka_op_t *rko;
357
358 if (!rkq) {
359 if (error)
360 rd_kafka_error_destroy(error);
361 return;
362 }
363
364 rko = rd_kafka_op_new(RD_KAFKA_OP_TXN|RD_KAFKA_OP_REPLY);
365
366 if (error) {
367 rko->rko_u.txn.error = error;
368 rko->rko_err = rd_kafka_error_code(error);
369 }
370
371 rd_kafka_q_enq(rkq, rko);
372
373 rd_kafka_q_destroy(rkq);
374 }
375
376 /**
377 * @brief Wrapper for rd_kafka_txn_curr_api_reply_error() that takes
378 * an error code and format string.
379 *
380 * @param rkq is the queue to send the reply on, which may be NULL or disabled.
381 * The \p rkq refcount is decreased by this function.
382 * @param err API error code.
383 * @param errstr_fmt If err is set, a human readable error format string.
384 *
385 * @locality rdkafka main thread
386 * @locks any
387 */
388 static void
rd_kafka_txn_curr_api_reply(rd_kafka_q_t * rkq,rd_kafka_resp_err_t err,const char * errstr_fmt,...)389 rd_kafka_txn_curr_api_reply (rd_kafka_q_t *rkq,
390 rd_kafka_resp_err_t err,
391 const char *errstr_fmt, ...) {
392 rd_kafka_error_t *error = NULL;
393
394 if (err) {
395 va_list ap;
396 va_start(ap, errstr_fmt);
397 error = rd_kafka_error_new_v(err, errstr_fmt, ap);
398 va_end(ap);
399 }
400
401 rd_kafka_txn_curr_api_reply_error(rkq, error);
402 }
403
404
405
406 /**
407 * @brief The underlying idempotent producer state changed,
408 * see if this affects the transactional operations.
409 *
410 * @locality any thread
411 * @locks rd_kafka_wrlock(rk) MUST be held
412 */
rd_kafka_txn_idemp_state_change(rd_kafka_t * rk,rd_kafka_idemp_state_t idemp_state)413 void rd_kafka_txn_idemp_state_change (rd_kafka_t *rk,
414 rd_kafka_idemp_state_t idemp_state) {
415
416 if (idemp_state == RD_KAFKA_IDEMP_STATE_ASSIGNED &&
417 rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_WAIT_PID) {
418 RD_UT_COVERAGE(1);
419 rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_READY_NOT_ACKED);
420
421 if (rk->rk_eos.txn_init_rkq) {
422 /* Application has called init_transactions() and
423 * it is now complete, reply to the app. */
424 rd_kafka_txn_curr_api_reply(rk->rk_eos.txn_init_rkq,
425 RD_KAFKA_RESP_ERR_NO_ERROR,
426 NULL);
427 rk->rk_eos.txn_init_rkq = NULL;
428 }
429
430 } else if (idemp_state == RD_KAFKA_IDEMP_STATE_FATAL_ERROR &&
431 rk->rk_eos.txn_state != RD_KAFKA_TXN_STATE_FATAL_ERROR) {
432 /* A fatal error has been raised. */
433
434 rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_FATAL_ERROR);
435
436 if (rk->rk_eos.txn_init_rkq) {
437 /* Application has called init_transactions() and
438 * it has now failed, reply to the app. */
439 rd_kafka_txn_curr_api_reply_error(
440 rk->rk_eos.txn_init_rkq,
441 rd_kafka_error_new_fatal(
442 rk->rk_eos.txn_err ?
443 rk->rk_eos.txn_err :
444 RD_KAFKA_RESP_ERR__FATAL,
445 "Fatal error raised by "
446 "idempotent producer while "
447 "retrieving PID: %s",
448 rk->rk_eos.txn_errstr ?
449 rk->rk_eos.txn_errstr :
450 "see previous logs"));
451 rk->rk_eos.txn_init_rkq = NULL;
452 }
453 }
454 }
455
456
457 /**
458 * @brief Moves a partition from the pending list to the proper list.
459 *
460 * @locality rdkafka main thread
461 * @locks none
462 */
rd_kafka_txn_partition_registered(rd_kafka_toppar_t * rktp)463 static void rd_kafka_txn_partition_registered (rd_kafka_toppar_t *rktp) {
464 rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;
465
466 rd_kafka_toppar_lock(rktp);
467
468 if (unlikely(!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_PEND_TXN))) {
469 rd_kafka_dbg(rk, EOS|RD_KAFKA_DBG_PROTOCOL,
470 "ADDPARTS",
471 "\"%.*s\" [%"PRId32"] is not in pending "
472 "list but returned in AddPartitionsToTxn "
473 "response: ignoring",
474 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
475 rktp->rktp_partition);
476 rd_kafka_toppar_unlock(rktp);
477 return;
478 }
479
480 rd_kafka_dbg(rk, EOS|RD_KAFKA_DBG_TOPIC, "ADDPARTS",
481 "%.*s [%"PRId32"] registered with transaction",
482 RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
483 rktp->rktp_partition);
484
485 rd_assert((rktp->rktp_flags & (RD_KAFKA_TOPPAR_F_PEND_TXN|
486 RD_KAFKA_TOPPAR_F_IN_TXN)) ==
487 RD_KAFKA_TOPPAR_F_PEND_TXN);
488
489 rktp->rktp_flags = (rktp->rktp_flags & ~RD_KAFKA_TOPPAR_F_PEND_TXN) |
490 RD_KAFKA_TOPPAR_F_IN_TXN;
491
492 rd_kafka_toppar_unlock(rktp);
493
494 mtx_lock(&rk->rk_eos.txn_pending_lock);
495 TAILQ_REMOVE(&rk->rk_eos.txn_waitresp_rktps, rktp, rktp_txnlink);
496 mtx_unlock(&rk->rk_eos.txn_pending_lock);
497
498 TAILQ_INSERT_TAIL(&rk->rk_eos.txn_rktps, rktp, rktp_txnlink);
499 }
500
501
502
503 /**
504 * @brief Handle AddPartitionsToTxnResponse
505 *
506 * @locality rdkafka main thread
507 * @locks none
508 */
rd_kafka_txn_handle_AddPartitionsToTxn(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)509 static void rd_kafka_txn_handle_AddPartitionsToTxn (rd_kafka_t *rk,
510 rd_kafka_broker_t *rkb,
511 rd_kafka_resp_err_t err,
512 rd_kafka_buf_t *rkbuf,
513 rd_kafka_buf_t *request,
514 void *opaque) {
515 const int log_decode_errors = LOG_ERR;
516 int32_t TopicCnt;
517 int okcnt = 0, errcnt = 0;
518 int actions = 0;
519 int retry_backoff_ms = 500; /* retry backoff */
520 rd_kafka_resp_err_t reset_coord_err = RD_KAFKA_RESP_ERR_NO_ERROR;
521
522 if (err)
523 goto done;
524
525 rd_kafka_rdlock(rk);
526 rd_assert(rk->rk_eos.txn_state !=
527 RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION);
528
529 if (rk->rk_eos.txn_state != RD_KAFKA_TXN_STATE_IN_TRANSACTION &&
530 rk->rk_eos.txn_state != RD_KAFKA_TXN_STATE_BEGIN_COMMIT) {
531 /* Response received after aborting transaction */
532 rd_rkb_dbg(rkb, EOS, "ADDPARTS",
533 "Ignoring outdated AddPartitionsToTxn response in "
534 "state %s",
535 rd_kafka_txn_state2str(rk->rk_eos.txn_state));
536 rd_kafka_rdunlock(rk);
537 err = RD_KAFKA_RESP_ERR__OUTDATED;
538 goto done;
539 }
540 rd_kafka_rdunlock(rk);
541
542 rd_kafka_buf_read_throttle_time(rkbuf);
543
544 rd_kafka_buf_read_i32(rkbuf, &TopicCnt);
545
546 while (TopicCnt-- > 0) {
547 rd_kafkap_str_t Topic;
548 rd_kafka_itopic_t *rkt;
549 int32_t PartCnt;
550 int p_actions = 0;
551
552 rd_kafka_buf_read_str(rkbuf, &Topic);
553 rd_kafka_buf_read_i32(rkbuf, &PartCnt);
554
555 rkt = rd_kafka_topic_find0(rk, &Topic);
556 if (rkt)
557 rd_kafka_topic_rdlock(rkt); /* for toppar_get() */
558
559 while (PartCnt-- > 0) {
560 shptr_rd_kafka_toppar_t *s_rktp = NULL;
561 rd_kafka_toppar_t *rktp;
562 int32_t Partition;
563 int16_t ErrorCode;
564
565 rd_kafka_buf_read_i32(rkbuf, &Partition);
566 rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
567
568 if (rkt)
569 s_rktp = rd_kafka_toppar_get(rkt,
570 Partition,
571 rd_false);
572
573 if (!s_rktp) {
574 rd_rkb_dbg(rkb, EOS|RD_KAFKA_DBG_PROTOCOL,
575 "ADDPARTS",
576 "Unknown partition \"%.*s\" "
577 "[%"PRId32"] in AddPartitionsToTxn "
578 "response: ignoring",
579 RD_KAFKAP_STR_PR(&Topic),
580 Partition);
581 continue;
582 }
583
584 rktp = rd_kafka_toppar_s2i(s_rktp);
585
586 switch (ErrorCode)
587 {
588 case RD_KAFKA_RESP_ERR_NO_ERROR:
589 /* Move rktp from pending to proper list */
590 rd_kafka_txn_partition_registered(rktp);
591 break;
592
593 case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
594 case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
595 case RD_KAFKA_RESP_ERR__TRANSPORT:
596 reset_coord_err = ErrorCode;
597 p_actions |= RD_KAFKA_ERR_ACTION_RETRY;
598 break;
599
600 case RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS:
601 retry_backoff_ms = 20;
602 /* FALLTHRU */
603 case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS:
604 case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
605 p_actions |= RD_KAFKA_ERR_ACTION_RETRY;
606 break;
607
608 case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
609 case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING:
610 case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH:
611 case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE:
612 p_actions |= RD_KAFKA_ERR_ACTION_FATAL;
613 err = ErrorCode;
614 break;
615
616 case RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED:
617 p_actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
618 err = ErrorCode;
619 break;
620
621 case RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED:
622 /* Partition skipped due to other partition's
623 * errors */
624 break;
625
626 default:
627 /* Unhandled error, fail transaction */
628 p_actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
629 break;
630 }
631
632 if (ErrorCode) {
633 errcnt++;
634 actions |= p_actions;
635
636 if (!(p_actions &
637 (RD_KAFKA_ERR_ACTION_FATAL |
638 RD_KAFKA_ERR_ACTION_PERMANENT)))
639 rd_rkb_dbg(
640 rkb, EOS,
641 "ADDPARTS",
642 "AddPartitionsToTxn response: "
643 "partition \"%.*s\": "
644 "[%"PRId32"]: %s",
645 RD_KAFKAP_STR_PR(&Topic),
646 Partition,
647 rd_kafka_err2str(
648 ErrorCode));
649 else
650 rd_rkb_log(rkb, LOG_ERR,
651 "ADDPARTS",
652 "Failed to add partition "
653 "\"%.*s\" [%"PRId32"] to "
654 "transaction: %s",
655 RD_KAFKAP_STR_PR(&Topic),
656 Partition,
657 rd_kafka_err2str(
658 ErrorCode));
659 } else {
660 okcnt++;
661 }
662
663 rd_kafka_toppar_destroy(s_rktp);
664 }
665
666 if (rkt) {
667 rd_kafka_topic_rdunlock(rkt);
668 rd_kafka_topic_destroy0(rkt);
669 }
670 }
671
672 if (actions) /* Actions set from encountered errors '*/
673 goto done;
674
675 /* Since these partitions are now allowed to produce
676 * we wake up all broker threads. */
677 rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT);
678
679 goto done;
680
681 err_parse:
682 err = rkbuf->rkbuf_err;
683
684 done:
685 if (err)
686 rk->rk_eos.txn_req_cnt--;
687
688 if (err == RD_KAFKA_RESP_ERR__DESTROY ||
689 err == RD_KAFKA_RESP_ERR__OUTDATED)
690 return;
691
692 if (reset_coord_err) {
693 rd_kafka_wrlock(rk);
694 rd_kafka_txn_coord_set(rk, NULL,
695 "AddPartitionsToTxn failed: %s",
696 rd_kafka_err2str(reset_coord_err));
697 rd_kafka_wrunlock(rk);
698 }
699
700
701 mtx_lock(&rk->rk_eos.txn_pending_lock);
702 TAILQ_CONCAT(&rk->rk_eos.txn_pending_rktps,
703 &rk->rk_eos.txn_waitresp_rktps,
704 rktp_txnlink);
705 mtx_unlock(&rk->rk_eos.txn_pending_lock);
706
707 if (okcnt + errcnt == 0) {
708 /* Shouldn't happen */
709 rd_kafka_dbg(rk, EOS, "ADDPARTS",
710 "No known partitions in "
711 "AddPartitionsToTxn response");
712 }
713
714 if (actions & RD_KAFKA_ERR_ACTION_FATAL) {
715 rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, err,
716 "Failed to add partitions to "
717 "transaction: %s",
718 rd_kafka_err2str(err));
719
720 } else if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
721 rd_kafka_txn_schedule_register_partitions(rk, retry_backoff_ms);
722
723 } else if (errcnt > 0) {
724 /* Treat all other errors as abortable errors */
725 rd_kafka_txn_set_abortable_error(
726 rk, err,
727 "Failed to add %d/%d partition(s) to transaction "
728 "on broker %s: %s (after %d ms)",
729 errcnt, errcnt + okcnt,
730 rd_kafka_broker_name(rkb),
731 rd_kafka_err2str(err),
732 (int)(rkbuf->rkbuf_ts_sent/1000));
733 }
734 }
735
736
737 /**
738 * @brief Send AddPartitionsToTxnRequest to the transaction coordinator.
739 *
740 * @returns an error code if the transaction coordinator is not known
741 * or not available.
742 *
743 * @locality rdkafka main thread
744 * @locks none
745 */
rd_kafka_txn_register_partitions(rd_kafka_t * rk)746 static rd_kafka_resp_err_t rd_kafka_txn_register_partitions (rd_kafka_t *rk) {
747 char errstr[512];
748 rd_kafka_resp_err_t err;
749 rd_kafka_error_t *error;
750 rd_kafka_pid_t pid;
751
752 mtx_lock(&rk->rk_eos.txn_pending_lock);
753 if (TAILQ_EMPTY(&rk->rk_eos.txn_pending_rktps)) {
754 mtx_unlock(&rk->rk_eos.txn_pending_lock);
755 return RD_KAFKA_RESP_ERR_NO_ERROR;
756 }
757
758 error = rd_kafka_txn_require_state(rk,
759 RD_KAFKA_TXN_STATE_IN_TRANSACTION,
760 RD_KAFKA_TXN_STATE_BEGIN_COMMIT);
761 if (error) {
762 err = rd_kafka_error_to_legacy(error, errstr, sizeof(errstr));
763 goto err;
764 }
765
766 pid = rd_kafka_idemp_get_pid0(rk, rd_false/*dont-lock*/);
767 if (!rd_kafka_pid_valid(pid)) {
768 rd_dassert(!*"BUG: No PID despite proper transaction state");
769 err = RD_KAFKA_RESP_ERR__STATE;
770 rd_snprintf(errstr, sizeof(errstr),
771 "No PID available (idempotence state %s)",
772 rd_kafka_idemp_state2str(rk->rk_eos.idemp_state));
773 goto err;
774 }
775
776 if (!rd_kafka_broker_is_up(rk->rk_eos.txn_coord)) {
777 err = RD_KAFKA_RESP_ERR__TRANSPORT;
778 rd_snprintf(errstr, sizeof(errstr), "Broker is not up");
779 goto err;
780 }
781
782
783 /* Send request to coordinator */
784 err = rd_kafka_AddPartitionsToTxnRequest(
785 rk->rk_eos.txn_coord,
786 rk->rk_conf.eos.transactional_id,
787 pid,
788 &rk->rk_eos.txn_pending_rktps,
789 errstr, sizeof(errstr),
790 RD_KAFKA_REPLYQ(rk->rk_ops, 0),
791 rd_kafka_txn_handle_AddPartitionsToTxn, NULL);
792 if (err)
793 goto err;
794
795 TAILQ_CONCAT(&rk->rk_eos.txn_waitresp_rktps,
796 &rk->rk_eos.txn_pending_rktps,
797 rktp_txnlink);
798
799 mtx_unlock(&rk->rk_eos.txn_pending_lock);
800
801 rk->rk_eos.txn_req_cnt++;
802
803 rd_rkb_dbg(rk->rk_eos.txn_coord, EOS, "ADDPARTS",
804 "Adding partitions to transaction");
805
806 return RD_KAFKA_RESP_ERR_NO_ERROR;
807
808 err:
809 mtx_unlock(&rk->rk_eos.txn_pending_lock);
810
811 rd_kafka_dbg(rk, EOS, "ADDPARTS",
812 "Unable to register partitions with transaction: "
813 "%s", errstr);
814 return err;
815 }
816
/**
 * @brief Timer callback that runs rd_kafka_txn_register_partitions().
 *
 * \p arg is the rd_kafka_t instance. The return value is not checked
 * here; rd_kafka_txn_register_partitions() debug-logs its own failures.
 */
static void rd_kafka_txn_register_partitions_tmr_cb (rd_kafka_timers_t *rkts,
                                                     void *arg) {
        (void)rkts;
        rd_kafka_txn_register_partitions((rd_kafka_t *)arg);
}
823
824
825 /**
826 * @brief Schedule register_partitions() as soon as possible.
827 *
828 * @locality any
829 * @locks any
830 */
rd_kafka_txn_schedule_register_partitions(rd_kafka_t * rk,int backoff_ms)831 void rd_kafka_txn_schedule_register_partitions (rd_kafka_t *rk,
832 int backoff_ms) {
833 rd_kafka_timer_start_oneshot(
834 &rk->rk_timers,
835 &rk->rk_eos.txn_register_parts_tmr, rd_false/*dont-restart*/,
836 backoff_ms ? backoff_ms * 1000 : 1 /* immediate */,
837 rd_kafka_txn_register_partitions_tmr_cb,
838 rk);
839 }
840
841
842
843 /**
844 * @brief Clears \p flag from all rktps in \p tqh
845 */
rd_kafka_txn_clear_partitions_flag(rd_kafka_toppar_tqhead_t * tqh,int flag)846 static void rd_kafka_txn_clear_partitions_flag (rd_kafka_toppar_tqhead_t *tqh,
847 int flag) {
848 rd_kafka_toppar_t *rktp;
849
850 TAILQ_FOREACH(rktp, tqh, rktp_txnlink) {
851 rd_kafka_toppar_lock(rktp);
852 rd_dassert(rktp->rktp_flags & flag);
853 rktp->rktp_flags &= ~flag;
854 rd_kafka_toppar_unlock(rktp);
855 }
856 }
857
858
859 /**
860 * @brief Clear all pending partitions.
861 *
862 * @locks txn_pending_lock MUST be held
863 */
rd_kafka_txn_clear_pending_partitions(rd_kafka_t * rk)864 static void rd_kafka_txn_clear_pending_partitions (rd_kafka_t *rk) {
865 rd_kafka_txn_clear_partitions_flag(&rk->rk_eos.txn_pending_rktps,
866 RD_KAFKA_TOPPAR_F_PEND_TXN);
867 rd_kafka_txn_clear_partitions_flag(&rk->rk_eos.txn_waitresp_rktps,
868 RD_KAFKA_TOPPAR_F_PEND_TXN);
869 TAILQ_INIT(&rk->rk_eos.txn_pending_rktps);
870 TAILQ_INIT(&rk->rk_eos.txn_waitresp_rktps);
871 }
872
873 /**
874 * @brief Clear all added partitions.
875 *
876 * @locks rd_kafka_wrlock(rk) MUST be held
877 */
rd_kafka_txn_clear_partitions(rd_kafka_t * rk)878 static void rd_kafka_txn_clear_partitions (rd_kafka_t *rk) {
879 rd_kafka_txn_clear_partitions_flag(&rk->rk_eos.txn_rktps,
880 RD_KAFKA_TOPPAR_F_IN_TXN);
881 TAILQ_INIT(&rk->rk_eos.txn_rktps);
882 }
883
884
885
886
887 /**
888 * @brief Op timeout callback which fails the current transaction.
889 *
890 * @locality rdkafka main thread
891 * @locks none
892 */
893 static void
rd_kafka_txn_curr_api_abort_timeout_cb(rd_kafka_timers_t * rkts,void * arg)894 rd_kafka_txn_curr_api_abort_timeout_cb (rd_kafka_timers_t *rkts, void *arg) {
895 rd_kafka_q_t *rkq = arg;
896
897 rd_kafka_txn_set_abortable_error(
898 rkts->rkts_rk,
899 RD_KAFKA_RESP_ERR__TIMED_OUT,
900 "Transactional operation timed out");
901
902 rd_kafka_txn_curr_api_reply_error(
903 rkq,
904 rd_kafka_error_new_txn_requires_abort(
905 RD_KAFKA_RESP_ERR__TIMED_OUT,
906 "Transactional operation timed out"));
907 }
908
909 /**
910 * @brief Op timeout callback which does not fail the current transaction,
911 * and sets the retriable flag on the error.
912 *
913 * @locality rdkafka main thread
914 * @locks none
915 */
916 static void
rd_kafka_txn_curr_api_retriable_timeout_cb(rd_kafka_timers_t * rkts,void * arg)917 rd_kafka_txn_curr_api_retriable_timeout_cb (rd_kafka_timers_t *rkts, void *arg) {
918 rd_kafka_q_t *rkq = arg;
919
920 rd_kafka_txn_curr_api_reply_error(
921 rkq,
922 rd_kafka_error_new_retriable(
923 RD_KAFKA_RESP_ERR__TIMED_OUT,
924 "Transactional operation timed out"));
925 }
926
927
928 /**
929 * @brief Op timeout callback which does not fail the current transaction.
930 *
931 * @locality rdkafka main thread
932 * @locks none
933 */
934 static void
rd_kafka_txn_curr_api_timeout_cb(rd_kafka_timers_t * rkts,void * arg)935 rd_kafka_txn_curr_api_timeout_cb (rd_kafka_timers_t *rkts, void *arg) {
936 rd_kafka_q_t *rkq = arg;
937
938 rd_kafka_txn_curr_api_reply(rkq, RD_KAFKA_RESP_ERR__TIMED_OUT,
939 "Transactional operation timed out");
940 }
941
942 /**
943 * @brief Op timeout callback for init_transactions() that uses the
944 * the last txn_init_err as error code.
945 *
946 * @locality rdkafka main thread
947 * @locks none
948 */
949 static void
rd_kafka_txn_curr_api_init_timeout_cb(rd_kafka_timers_t * rkts,void * arg)950 rd_kafka_txn_curr_api_init_timeout_cb (rd_kafka_timers_t *rkts, void *arg) {
951 rd_kafka_q_t *rkq = arg;
952 rd_kafka_error_t *error;
953 rd_kafka_resp_err_t err = rkts->rkts_rk->rk_eos.txn_init_err;
954
955 if (!err)
956 err = RD_KAFKA_RESP_ERR__TIMED_OUT;
957
958 error = rd_kafka_error_new(err,
959 "Failed to initialize Producer ID: %s",
960 rd_kafka_err2str(err));
961
962 /* init_transactions() timeouts are retriable */
963 if (err == RD_KAFKA_RESP_ERR__TIMED_OUT)
964 rd_kafka_error_set_retriable(error);
965
966 rd_kafka_txn_curr_api_reply_error(rkq, error);
967 }
968
969
970
971 /**
972 * @brief Reset the current API, typically because it was completed
973 * without timeout.
974 *
975 * @locality rdkafka main thread
976 * @locks rd_kafka_wrlock(rk) MUST be held
977 */
rd_kafka_txn_curr_api_reset(rd_kafka_t * rk)978 static void rd_kafka_txn_curr_api_reset (rd_kafka_t *rk) {
979 rd_bool_t timer_was_stopped;
980 rd_kafka_q_t *rkq;
981
982 rkq = rk->rk_eos.txn_curr_api.tmr.rtmr_arg;
983 timer_was_stopped = rd_kafka_timer_stop(
984 &rk->rk_timers,
985 &rk->rk_eos.txn_curr_api.tmr,
986 RD_DO_LOCK);
987
988 if (rkq && timer_was_stopped) {
989 /* Remove the stopped timer's reply queue reference
990 * since the timer callback will not have fired if
991 * we stopped the timer. */
992 rd_kafka_q_destroy(rkq);
993 }
994
995 *rk->rk_eos.txn_curr_api.name = '\0';
996 rk->rk_eos.txn_curr_api.flags = 0;
997 }
998
999
1000 /**
1001 * @brief Sets the current API op (representing a blocking application API call)
1002 * and a timeout for the same, and sends the op to the transaction
1003 * manager thread (rdkafka main thread) for processing.
1004 *
1005 * If the timeout expires the rko will fail with ERR__TIMED_OUT
1006 * and the txnmgr state will be adjusted according to \p abort_on_timeout:
1007 * if true, the txn will transition to ABORTABLE_ERROR, else remain in
1008 * the current state.
1009 *
1010 * This call will block until a response is received from the rdkafka
1011 * main thread.
1012 *
1013 * Use rd_kafka_txn_curr_api_reset() when operation finishes prior
1014 * to the timeout.
1015 *
1016 * @param rko Op to send to txnmgr, or NULL if no op to send (yet).
1017 * @param flags See RD_KAFKA_TXN_CURR_API_F_.. flags in rdkafka_int.h.
1018 *
1019 * @returns an error, or NULL on success.
1020 *
1021 * @locality application thread
1022 * @locks none
1023 */
1024 static rd_kafka_error_t *
rd_kafka_txn_curr_api_req(rd_kafka_t * rk,const char * name,rd_kafka_op_t * rko,int timeout_ms,int flags)1025 rd_kafka_txn_curr_api_req (rd_kafka_t *rk, const char *name,
1026 rd_kafka_op_t *rko,
1027 int timeout_ms, int flags) {
1028 rd_kafka_op_t *reply;
1029 rd_bool_t reuse = rd_false;
1030 rd_bool_t for_reuse;
1031 rd_kafka_q_t *tmpq = NULL;
1032 rd_kafka_error_t *error = NULL;
1033
1034 /* Strip __FUNCTION__ name's rd_kafka_ prefix since it will
1035 * not make sense in high-level language bindings. */
1036 if (!strncmp(name, "rd_kafka_", strlen("rd_kafka_")))
1037 name += strlen("rd_kafka_");
1038
1039 rd_kafka_dbg(rk, EOS, "TXNAPI", "Transactional API called: %s", name);
1040
1041 if (flags & RD_KAFKA_TXN_CURR_API_F_REUSE) {
1042 /* Reuse the current API call state. */
1043 flags &= ~RD_KAFKA_TXN_CURR_API_F_REUSE;
1044 reuse = rd_true;
1045 }
1046
1047 rd_kafka_wrlock(rk);
1048
1049 /* First set for_reuse to the current flags to match with
1050 * the passed flags. */
1051 for_reuse = !!(rk->rk_eos.txn_curr_api.flags &
1052 RD_KAFKA_TXN_CURR_API_F_FOR_REUSE);
1053
1054 if ((for_reuse && !reuse) ||
1055 (!for_reuse && *rk->rk_eos.txn_curr_api.name)) {
1056 error = rd_kafka_error_new(
1057 RD_KAFKA_RESP_ERR__STATE,
1058 "Conflicting %s call already in progress",
1059 rk->rk_eos.txn_curr_api.name);
1060 rd_kafka_wrunlock(rk);
1061 if (rko)
1062 rd_kafka_op_destroy(rko);
1063 return error;
1064 }
1065
1066 rd_assert(for_reuse == reuse);
1067
1068 rd_snprintf(rk->rk_eos.txn_curr_api.name,
1069 sizeof(rk->rk_eos.txn_curr_api.name),
1070 "%s", name);
1071
1072 if (rko)
1073 tmpq = rd_kafka_q_new(rk);
1074
1075 rk->rk_eos.txn_curr_api.flags |= flags;
1076
1077 /* Then update for_reuse to the passed flags so that
1078 * api_reset() will not reset curr APIs that are to be reused,
1079 * but a sub-sequent _F_REUSE call will reset it. */
1080 for_reuse = !!(flags & RD_KAFKA_TXN_CURR_API_F_FOR_REUSE);
1081
1082 /* If no timeout has been specified, use the transaction.timeout.ms */
1083 if (timeout_ms < 0)
1084 timeout_ms = rk->rk_conf.eos.transaction_timeout_ms;
1085
1086 if (!reuse && timeout_ms >= 0) {
1087 rd_kafka_q_keep(tmpq);
1088 rd_kafka_timer_start_oneshot(
1089 &rk->rk_timers,
1090 &rk->rk_eos.txn_curr_api.tmr,
1091 rd_false,
1092 timeout_ms * 1000,
1093 !strcmp(name, "init_transactions") ?
1094 rd_kafka_txn_curr_api_init_timeout_cb :
1095 (flags & RD_KAFKA_TXN_CURR_API_F_ABORT_ON_TIMEOUT ?
1096 rd_kafka_txn_curr_api_abort_timeout_cb :
1097 (flags & RD_KAFKA_TXN_CURR_API_F_RETRIABLE_ON_TIMEOUT ?
1098 rd_kafka_txn_curr_api_retriable_timeout_cb :
1099 rd_kafka_txn_curr_api_timeout_cb)),
1100 tmpq);
1101 }
1102 rd_kafka_wrunlock(rk);
1103
1104 if (!rko)
1105 return NULL;
1106
1107 /* Send op to rdkafka main thread and wait for reply */
1108 reply = rd_kafka_op_req0(rk->rk_ops, tmpq, rko, RD_POLL_INFINITE);
1109
1110 rd_kafka_q_destroy_owner(tmpq);
1111
1112 if ((error = reply->rko_u.txn.error)) {
1113 reply->rko_u.txn.error = NULL;
1114 for_reuse = rd_false;
1115 }
1116
1117 rd_kafka_op_destroy(reply);
1118
1119 if (!for_reuse)
1120 rd_kafka_txn_curr_api_reset(rk);
1121
1122 return error;
1123 }
1124
1125
1126 /**
1127 * @brief Async handler for init_transactions()
1128 *
1129 * @locks none
1130 * @locality rdkafka main thread
1131 */
1132 static rd_kafka_op_res_t
rd_kafka_txn_op_init_transactions(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)1133 rd_kafka_txn_op_init_transactions (rd_kafka_t *rk,
1134 rd_kafka_q_t *rkq,
1135 rd_kafka_op_t *rko) {
1136 rd_kafka_error_t *error;
1137
1138 if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
1139 return RD_KAFKA_OP_RES_HANDLED;
1140
1141 rd_kafka_wrlock(rk);
1142 if ((error = rd_kafka_txn_require_state(
1143 rk,
1144 RD_KAFKA_TXN_STATE_INIT,
1145 RD_KAFKA_TXN_STATE_WAIT_PID,
1146 RD_KAFKA_TXN_STATE_READY_NOT_ACKED))) {
1147 rd_kafka_wrunlock(rk);
1148 goto done;
1149 }
1150
1151 if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_READY_NOT_ACKED) {
1152 /* A previous init_transactions() called finished successfully
1153 * after timeout, the application has called init_transactions()
1154 * again, we do nothin here, ack_init_transactions() will
1155 * transition the state from READY_NOT_ACKED to READY. */
1156 rd_kafka_wrunlock(rk);
1157 goto done;
1158 }
1159
1160 /* Possibly a no-op if already in WAIT_PID state */
1161 rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_WAIT_PID);
1162
1163 /* Destroy previous reply queue for a previously timed out
1164 * init_transactions() call. */
1165 if (rk->rk_eos.txn_init_rkq)
1166 rd_kafka_q_destroy(rk->rk_eos.txn_init_rkq);
1167
1168 /* Grab a separate reference to use in state_change(),
1169 * outside the curr_api to allow the curr_api to timeout while
1170 * the background init continues. */
1171 rk->rk_eos.txn_init_rkq = rd_kafka_q_keep(rko->rko_replyq.q);
1172
1173 rd_kafka_wrunlock(rk);
1174
1175 rk->rk_eos.txn_init_err = RD_KAFKA_RESP_ERR_NO_ERROR;
1176
1177 /* Start idempotent producer to acquire PID */
1178 rd_kafka_idemp_start(rk, rd_true/*immediately*/);
1179
1180 return RD_KAFKA_OP_RES_HANDLED;
1181
1182 done:
1183 rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
1184 error);
1185
1186 return RD_KAFKA_OP_RES_HANDLED;
1187 }
1188
1189
1190 /**
1191 * @brief Async handler for the application to acknowledge
1192 * successful background completion of init_transactions().
1193 *
1194 * @locks none
1195 * @locality rdkafka main thread
1196 */
1197 static rd_kafka_op_res_t
rd_kafka_txn_op_ack_init_transactions(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)1198 rd_kafka_txn_op_ack_init_transactions (rd_kafka_t *rk,
1199 rd_kafka_q_t *rkq,
1200 rd_kafka_op_t *rko) {
1201 rd_kafka_error_t *error;
1202
1203 if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
1204 return RD_KAFKA_OP_RES_HANDLED;
1205
1206 rd_kafka_wrlock(rk);
1207 if ((error = rd_kafka_txn_require_state(
1208 rk,
1209 RD_KAFKA_TXN_STATE_READY_NOT_ACKED))) {
1210 rd_kafka_wrunlock(rk);
1211 goto done;
1212 }
1213
1214 rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_READY);
1215
1216 rd_kafka_wrunlock(rk);
1217 /* FALLTHRU */
1218
1219 done:
1220 rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
1221 error);
1222
1223 return RD_KAFKA_OP_RES_HANDLED;
1224 }
1225
1226
1227
1228 rd_kafka_error_t *
rd_kafka_init_transactions(rd_kafka_t * rk,int timeout_ms)1229 rd_kafka_init_transactions (rd_kafka_t *rk, int timeout_ms) {
1230 rd_kafka_error_t *error;
1231
1232 if ((error = rd_kafka_ensure_transactional(rk)))
1233 return error;
1234
1235 /* init_transactions() will continue to operate in the background
1236 * if the timeout expires, and the application may call
1237 * init_transactions() again to "continue" with the initialization
1238 * process.
1239 * For this reason we need two states:
1240 * - TXN_STATE_READY_NOT_ACKED for when initialization is done
1241 * but the API call timed out prior to success, meaning the
1242 * application does not know initialization finished and
1243 * is thus not allowed to call sub-sequent txn APIs, e.g. begin..()
1244 * - TXN_STATE_READY for when initialization is done and this
1245 * function has returned successfully to the application.
1246 *
1247 * And due to the two states we need two calls to the rdkafka main
1248 * thread (to keep txn_state synchronization in one place). */
1249
1250 /* First call is to trigger initialization */
1251 error = rd_kafka_txn_curr_api_req(
1252 rk, __FUNCTION__,
1253 rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN,
1254 rd_kafka_txn_op_init_transactions),
1255 timeout_ms,
1256 RD_KAFKA_TXN_CURR_API_F_RETRIABLE_ON_TIMEOUT|
1257 RD_KAFKA_TXN_CURR_API_F_FOR_REUSE);
1258 if (error)
1259 return error;
1260
1261
1262 /* Second call is to transition from READY_NOT_ACKED -> READY,
1263 * if necessary. */
1264 return rd_kafka_txn_curr_api_req(
1265 rk, __FUNCTION__,
1266 rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN,
1267 rd_kafka_txn_op_ack_init_transactions),
1268 RD_POLL_INFINITE, /* immediate, no timeout needed */
1269 RD_KAFKA_TXN_CURR_API_F_REUSE);
1270 }
1271
1272
1273
1274 /**
1275 * @brief Handler for begin_transaction()
1276 *
1277 * @locks none
1278 * @locality rdkafka main thread
1279 */
1280 static rd_kafka_op_res_t
rd_kafka_txn_op_begin_transaction(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)1281 rd_kafka_txn_op_begin_transaction (rd_kafka_t *rk,
1282 rd_kafka_q_t *rkq,
1283 rd_kafka_op_t *rko) {
1284 rd_kafka_error_t *error;
1285 rd_bool_t wakeup_brokers = rd_false;
1286
1287 if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
1288 return RD_KAFKA_OP_RES_HANDLED;
1289
1290 rd_kafka_wrlock(rk);
1291 if (!(error = rd_kafka_txn_require_state(rk,
1292 RD_KAFKA_TXN_STATE_READY))) {
1293 rd_assert(TAILQ_EMPTY(&rk->rk_eos.txn_rktps));
1294
1295 rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_IN_TRANSACTION);
1296
1297 rk->rk_eos.txn_req_cnt = 0;
1298 rk->rk_eos.txn_err = RD_KAFKA_RESP_ERR_NO_ERROR;
1299 RD_IF_FREE(rk->rk_eos.txn_errstr, rd_free);
1300 rk->rk_eos.txn_errstr = NULL;
1301
1302 /* Wake up all broker threads (that may have messages to send
1303 * that were waiting for this transaction state.
1304 * But needs to be done below with no lock held. */
1305 wakeup_brokers = rd_true;
1306
1307 }
1308 rd_kafka_wrunlock(rk);
1309
1310 if (wakeup_brokers)
1311 rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT);
1312
1313 rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
1314 error);
1315
1316 return RD_KAFKA_OP_RES_HANDLED;
1317 }
1318
1319
rd_kafka_begin_transaction(rd_kafka_t * rk)1320 rd_kafka_error_t *rd_kafka_begin_transaction (rd_kafka_t *rk) {
1321 rd_kafka_op_t *reply;
1322 rd_kafka_error_t *error;
1323
1324 if ((error = rd_kafka_ensure_transactional(rk)))
1325 return error;
1326
1327 reply = rd_kafka_op_req(
1328 rk->rk_ops,
1329 rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN,
1330 rd_kafka_txn_op_begin_transaction),
1331 RD_POLL_INFINITE);
1332
1333 if ((error = reply->rko_u.txn.error))
1334 reply->rko_u.txn.error = NULL;
1335
1336 rd_kafka_op_destroy(reply);
1337
1338 return error;
1339 }
1340
1341
1342 static rd_kafka_resp_err_t
1343 rd_kafka_txn_send_TxnOffsetCommitRequest (rd_kafka_broker_t *rkb,
1344 rd_kafka_op_t *rko,
1345 rd_kafka_replyq_t replyq,
1346 rd_kafka_resp_cb_t *resp_cb,
1347 void *reply_opaque);
1348
1349 /**
1350 * @brief Handle TxnOffsetCommitResponse
1351 *
1352 * @locality rdkafka main thread
1353 * @locks none
1354 */
rd_kafka_txn_handle_TxnOffsetCommit(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)1355 static void rd_kafka_txn_handle_TxnOffsetCommit (rd_kafka_t *rk,
1356 rd_kafka_broker_t *rkb,
1357 rd_kafka_resp_err_t err,
1358 rd_kafka_buf_t *rkbuf,
1359 rd_kafka_buf_t *request,
1360 void *opaque) {
1361 const int log_decode_errors = LOG_ERR;
1362 rd_kafka_op_t *rko = opaque;
1363 int actions = 0;
1364 rd_kafka_topic_partition_list_t *partitions = NULL;
1365 char errstr[512];
1366
1367 *errstr = '\0';
1368
1369 if (err != RD_KAFKA_RESP_ERR__DESTROY &&
1370 !rd_kafka_q_ready(rko->rko_replyq.q))
1371 err = RD_KAFKA_RESP_ERR__OUTDATED;
1372
1373 if (err)
1374 goto done;
1375
1376 rd_kafka_buf_read_throttle_time(rkbuf);
1377
1378 partitions = rd_kafka_buf_read_topic_partitions(rkbuf, 0);
1379 if (!partitions)
1380 goto err_parse;
1381
1382 err = rd_kafka_topic_partition_list_get_err(partitions);
1383 if (err) {
1384 char errparts[256];
1385 rd_kafka_topic_partition_list_str(partitions,
1386 errparts, sizeof(errparts),
1387 RD_KAFKA_FMT_F_ONLY_ERR);
1388 rd_snprintf(errstr, sizeof(errstr),
1389 "Failed to commit offsets to transaction on "
1390 "broker %s: %s "
1391 "(after %dms)",
1392 rd_kafka_broker_name(rkb),
1393 errparts, (int)(rkbuf->rkbuf_ts_sent/1000));
1394 }
1395
1396 goto done;
1397
1398 err_parse:
1399 err = rkbuf->rkbuf_err;
1400
1401 done:
1402 if (err) {
1403 rk->rk_eos.txn_req_cnt--;
1404
1405 if (!*errstr) {
1406 rd_snprintf(errstr, sizeof(errstr),
1407 "Failed to commit offsets to "
1408 "transaction on broker %s: %s "
1409 "(after %d ms)",
1410 rd_kafka_broker_name(rkb),
1411 rd_kafka_err2str(err),
1412 (int)(rkbuf->rkbuf_ts_sent/1000));
1413 }
1414 }
1415
1416
1417 if (partitions)
1418 rd_kafka_topic_partition_list_destroy(partitions);
1419
1420 switch (err)
1421 {
1422 case RD_KAFKA_RESP_ERR_NO_ERROR:
1423 break;
1424
1425 case RD_KAFKA_RESP_ERR__DESTROY:
1426 case RD_KAFKA_RESP_ERR__OUTDATED:
1427 rd_kafka_op_destroy(rko);
1428 return;
1429
1430 case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
1431 case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
1432 case RD_KAFKA_RESP_ERR__TRANSPORT:
1433 /* Note: this is the group coordinator, not the
1434 * transaction coordinator. */
1435 rd_kafka_coord_cache_evict(&rk->rk_coord_cache, rkb);
1436 actions |= RD_KAFKA_ERR_ACTION_RETRY;
1437 break;
1438
1439 case RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS:
1440 case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS:
1441 case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
1442 actions |= RD_KAFKA_ERR_ACTION_RETRY;
1443 break;
1444
1445 case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
1446 case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING:
1447 case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH:
1448 case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE:
1449 case RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT:
1450 actions |= RD_KAFKA_ERR_ACTION_FATAL;
1451 break;
1452
1453 case RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED:
1454 case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED:
1455 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1456 break;
1457
1458 default:
1459 /* Unhandled error, fail transaction */
1460 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1461 break;
1462 }
1463
1464 if (actions & RD_KAFKA_ERR_ACTION_FATAL) {
1465 rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, err,
1466 "%s", errstr);
1467
1468 } else if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
1469 int remains_ms = rd_timeout_remains(rko->rko_u.txn.abs_timeout);
1470
1471 if (!rd_timeout_expired(remains_ms)) {
1472 rd_kafka_coord_req(
1473 rk,
1474 RD_KAFKA_COORD_GROUP,
1475 rko->rko_u.txn.group_id,
1476 rd_kafka_txn_send_TxnOffsetCommitRequest,
1477 rko,
1478 rd_timeout_remains_limit0(
1479 remains_ms,
1480 rk->rk_conf.socket_timeout_ms),
1481 RD_KAFKA_REPLYQ(rk->rk_ops, 0),
1482 rd_kafka_txn_handle_TxnOffsetCommit,
1483 rko);
1484 return;
1485 } else if (!err)
1486 err = RD_KAFKA_RESP_ERR__TIMED_OUT;
1487 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1488 }
1489
1490 if (actions & RD_KAFKA_ERR_ACTION_PERMANENT)
1491 rd_kafka_txn_set_abortable_error(rk, err, "%s", errstr);
1492
1493 if (err)
1494 rd_kafka_txn_curr_api_reply(rd_kafka_q_keep(rko->rko_replyq.q),
1495 err, "%s", errstr);
1496 else
1497 rd_kafka_txn_curr_api_reply(rd_kafka_q_keep(rko->rko_replyq.q),
1498 RD_KAFKA_RESP_ERR_NO_ERROR, NULL);
1499
1500 rd_kafka_op_destroy(rko);
1501 }
1502
1503
1504
1505 /**
1506 * @brief Construct and send TxnOffsetCommitRequest.
1507 *
1508 * @locality rdkafka main thread
1509 * @locks none
1510 */
1511 static rd_kafka_resp_err_t
rd_kafka_txn_send_TxnOffsetCommitRequest(rd_kafka_broker_t * rkb,rd_kafka_op_t * rko,rd_kafka_replyq_t replyq,rd_kafka_resp_cb_t * resp_cb,void * reply_opaque)1512 rd_kafka_txn_send_TxnOffsetCommitRequest (rd_kafka_broker_t *rkb,
1513 rd_kafka_op_t *rko,
1514 rd_kafka_replyq_t replyq,
1515 rd_kafka_resp_cb_t *resp_cb,
1516 void *reply_opaque) {
1517 rd_kafka_t *rk = rkb->rkb_rk;
1518 rd_kafka_buf_t *rkbuf;
1519 int16_t ApiVersion;
1520 rd_kafka_pid_t pid;
1521 int cnt;
1522
1523 rd_kafka_rdlock(rk);
1524 if (rk->rk_eos.txn_state != RD_KAFKA_TXN_STATE_IN_TRANSACTION) {
1525 rd_kafka_rdunlock(rk);
1526 rd_kafka_op_destroy(rko);
1527 return RD_KAFKA_RESP_ERR__OUTDATED;
1528 }
1529
1530 pid = rd_kafka_idemp_get_pid0(rk, RD_DONT_LOCK);
1531 rd_kafka_rdunlock(rk);
1532 if (!rd_kafka_pid_valid(pid)) {
1533 rd_kafka_op_destroy(rko);
1534 return RD_KAFKA_RESP_ERR__STATE;
1535 }
1536
1537 ApiVersion = rd_kafka_broker_ApiVersion_supported(
1538 rkb, RD_KAFKAP_TxnOffsetCommit, 0, 0, NULL);
1539 if (ApiVersion == -1) {
1540 rd_kafka_op_destroy(rko);
1541 return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
1542 }
1543
1544 rkbuf = rd_kafka_buf_new_request(rkb,
1545 RD_KAFKAP_TxnOffsetCommit, 1,
1546 rko->rko_u.txn.offsets->cnt * 50);
1547
1548 /* transactional_id */
1549 rd_kafka_buf_write_str(rkbuf, rk->rk_conf.eos.transactional_id, -1);
1550
1551 /* group_id */
1552 rd_kafka_buf_write_str(rkbuf, rko->rko_u.txn.group_id, -1);
1553
1554 /* PID */
1555 rd_kafka_buf_write_i64(rkbuf, pid.id);
1556 rd_kafka_buf_write_i16(rkbuf, pid.epoch);
1557
1558 /* Write per-partition offsets list */
1559 cnt = rd_kafka_buf_write_topic_partitions(
1560 rkbuf,
1561 rko->rko_u.txn.offsets,
1562 rd_true /*skip invalid offsets*/,
1563 rd_false/*dont write Epoch*/,
1564 rd_true /*write Metadata*/);
1565
1566 if (!cnt) {
1567 /* No valid partition offsets, don't commit. */
1568 rd_kafka_buf_destroy(rkbuf);
1569 rd_kafka_op_destroy(rko);
1570 return RD_KAFKA_RESP_ERR__NO_OFFSET;
1571 }
1572
1573 rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
1574
1575 rkbuf->rkbuf_max_retries = 3;
1576
1577 rd_kafka_broker_buf_enq_replyq(rkb, rkbuf,
1578 replyq, resp_cb, reply_opaque);
1579
1580 return RD_KAFKA_RESP_ERR_NO_ERROR;
1581 }
1582
1583
1584 /**
1585 * @brief Handle AddOffsetsToTxnResponse
1586 *
1587 * @locality rdkafka main thread
1588 * @locks none
1589 */
rd_kafka_txn_handle_AddOffsetsToTxn(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)1590 static void rd_kafka_txn_handle_AddOffsetsToTxn (rd_kafka_t *rk,
1591 rd_kafka_broker_t *rkb,
1592 rd_kafka_resp_err_t err,
1593 rd_kafka_buf_t *rkbuf,
1594 rd_kafka_buf_t *request,
1595 void *opaque) {
1596 const int log_decode_errors = LOG_ERR;
1597 rd_kafka_op_t *rko = opaque;
1598 int16_t ErrorCode;
1599 int actions = 0;
1600 int remains_ms;
1601
1602 if (err == RD_KAFKA_RESP_ERR__DESTROY) {
1603 rd_kafka_op_destroy(rko);
1604 return;
1605 }
1606
1607 if (!rd_kafka_q_ready(rko->rko_replyq.q))
1608 err = RD_KAFKA_RESP_ERR__OUTDATED;
1609
1610 if (err)
1611 goto done;
1612
1613 rd_kafka_buf_read_throttle_time(rkbuf);
1614 rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
1615
1616 err = ErrorCode;
1617 goto done;
1618
1619 err_parse:
1620 err = rkbuf->rkbuf_err;
1621
1622 done:
1623 if (err)
1624 rk->rk_eos.txn_req_cnt--;
1625
1626 remains_ms = rd_timeout_remains(rko->rko_u.txn.abs_timeout);
1627
1628 if (rd_timeout_expired(remains_ms) && !err)
1629 err = RD_KAFKA_RESP_ERR__TIMED_OUT;
1630
1631 switch (err)
1632 {
1633 case RD_KAFKA_RESP_ERR_NO_ERROR:
1634 break;
1635
1636 case RD_KAFKA_RESP_ERR__OUTDATED:
1637 case RD_KAFKA_RESP_ERR__DESTROY:
1638 /* Producer is being terminated, ignore the response. */
1639 break;
1640
1641 case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
1642 case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
1643 case RD_KAFKA_RESP_ERR__TRANSPORT:
1644 case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT:
1645 actions |= RD_KAFKA_ERR_ACTION_RETRY|
1646 RD_KAFKA_ERR_ACTION_REFRESH;
1647 break;
1648
1649 case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
1650 case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH:
1651 case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE:
1652 case RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT:
1653 actions |= RD_KAFKA_ERR_ACTION_FATAL;
1654 break;
1655
1656 case RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED:
1657 case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED:
1658 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1659 break;
1660
1661 case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
1662 case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS:
1663 case RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS:
1664 actions |= RD_KAFKA_ERR_ACTION_RETRY;
1665 break;
1666
1667 default:
1668 /* All unhandled errors are permanent */
1669 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1670 break;
1671 }
1672
1673
1674 /* All unhandled errors are considered permanent */
1675 if (err && !actions)
1676 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1677
1678 if (actions & RD_KAFKA_ERR_ACTION_FATAL) {
1679 rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, err,
1680 "Failed to add offsets to "
1681 "transaction: %s",
1682 rd_kafka_err2str(err));
1683
1684 } else if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
1685 if (!rd_timeout_expired(remains_ms) &&
1686 rd_kafka_buf_retry(rk->rk_eos.txn_coord, request))
1687 return;
1688 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1689
1690 } else if (err) {
1691 rd_rkb_log(rkb, LOG_ERR, "ADDOFFSETS",
1692 "Failed to add offsets to transaction: %s",
1693 rd_kafka_err2str(err));
1694 }
1695
1696 if (actions & RD_KAFKA_ERR_ACTION_PERMANENT)
1697 rd_kafka_txn_set_abortable_error(
1698 rk, err,
1699 "Failed to add offsets to "
1700 "transaction on broker %s: "
1701 "%s (after %dms)",
1702 rd_kafka_broker_name(rkb),
1703 rd_kafka_err2str(err),
1704 (int)(rkbuf->rkbuf_ts_sent/1000));
1705
1706 if (!err) {
1707 /* Step 2: Commit offsets to transaction on the
1708 * group coordinator. */
1709
1710 rd_kafka_coord_req(rk,
1711 RD_KAFKA_COORD_GROUP,
1712 rko->rko_u.txn.group_id,
1713 rd_kafka_txn_send_TxnOffsetCommitRequest,
1714 rko,
1715 rd_timeout_remains_limit0(
1716 remains_ms,
1717 rk->rk_conf.socket_timeout_ms),
1718 RD_KAFKA_REPLYQ(rk->rk_ops, 0),
1719 rd_kafka_txn_handle_TxnOffsetCommit,
1720 rko);
1721
1722 } else {
1723
1724 rd_kafka_txn_curr_api_reply(
1725 rd_kafka_q_keep(rko->rko_replyq.q), err,
1726 "Failed to add offsets to transaction on broker %s: "
1727 "%s (after %dms)",
1728 rd_kafka_broker_name(rkb),
1729 rd_kafka_err2str(err),
1730 (int)(rkbuf->rkbuf_ts_sent/1000));
1731
1732 rd_kafka_op_destroy(rko);
1733 }
1734 }
1735
1736
1737 /**
1738 * @brief Async handler for send_offsets_to_transaction()
1739 *
1740 * @locks none
1741 * @locality rdkafka main thread
1742 */
1743 static rd_kafka_op_res_t
rd_kafka_txn_op_send_offsets_to_transaction(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)1744 rd_kafka_txn_op_send_offsets_to_transaction (rd_kafka_t *rk,
1745 rd_kafka_q_t *rkq,
1746 rd_kafka_op_t *rko) {
1747 rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
1748 char errstr[512];
1749 rd_kafka_error_t *error;
1750 rd_kafka_pid_t pid;
1751
1752 if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
1753 return RD_KAFKA_OP_RES_HANDLED;
1754
1755 *errstr = '\0';
1756
1757 rd_kafka_wrlock(rk);
1758
1759 if ((error = rd_kafka_txn_require_state(
1760 rk, RD_KAFKA_TXN_STATE_IN_TRANSACTION))) {
1761 rd_kafka_wrunlock(rk);
1762 goto err;
1763 }
1764
1765 rd_kafka_wrunlock(rk);
1766
1767 pid = rd_kafka_idemp_get_pid0(rk, rd_false/*dont-lock*/);
1768 if (!rd_kafka_pid_valid(pid)) {
1769 rd_dassert(!*"BUG: No PID despite proper transaction state");
1770 error = rd_kafka_error_new_retriable(
1771 RD_KAFKA_RESP_ERR__STATE,
1772 "No PID available (idempotence state %s)",
1773 rd_kafka_idemp_state2str(rk->rk_eos.idemp_state));
1774 goto err;
1775 }
1776
1777 /* This is a multi-stage operation, consisting of:
1778 * 1) send AddOffsetsToTxnRequest to transaction coordinator.
1779 * 2) send TxnOffsetCommitRequest to group coordinator. */
1780
1781 err = rd_kafka_AddOffsetsToTxnRequest(
1782 rk->rk_eos.txn_coord,
1783 rk->rk_conf.eos.transactional_id,
1784 pid,
1785 rko->rko_u.txn.group_id,
1786 errstr, sizeof(errstr),
1787 RD_KAFKA_REPLYQ(rk->rk_ops, 0),
1788 rd_kafka_txn_handle_AddOffsetsToTxn,
1789 rko);
1790
1791 if (err) {
1792 error = rd_kafka_error_new_retriable(err, "%s", errstr);
1793 goto err;
1794 }
1795
1796 return RD_KAFKA_OP_RES_KEEP; /* the rko is passed to AddOffsetsToTxn */
1797
1798 err:
1799 rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
1800 error);
1801
1802 return RD_KAFKA_OP_RES_HANDLED;
1803 }
1804
1805 /**
1806 * error returns:
1807 * ERR__TRANSPORT - retryable
1808 */
1809 rd_kafka_error_t *
rd_kafka_send_offsets_to_transaction(rd_kafka_t * rk,const rd_kafka_topic_partition_list_t * offsets,const rd_kafka_consumer_group_metadata_t * cgmetadata,int timeout_ms)1810 rd_kafka_send_offsets_to_transaction (
1811 rd_kafka_t *rk,
1812 const rd_kafka_topic_partition_list_t *offsets,
1813 const rd_kafka_consumer_group_metadata_t *cgmetadata,
1814 int timeout_ms) {
1815 rd_kafka_error_t *error;
1816 rd_kafka_op_t *rko;
1817 rd_kafka_topic_partition_list_t *valid_offsets;
1818
1819 if ((error = rd_kafka_ensure_transactional(rk)))
1820 return error;
1821
1822 if (!cgmetadata || !offsets)
1823 return rd_kafka_error_new(
1824 RD_KAFKA_RESP_ERR__INVALID_ARG,
1825 "cgmetadata and offsets are required parameters");
1826
1827 valid_offsets = rd_kafka_topic_partition_list_match(
1828 offsets, rd_kafka_topic_partition_match_valid_offset, NULL);
1829
1830 if (valid_offsets->cnt == 0) {
1831 /* No valid offsets, e.g., nothing was consumed,
1832 * this is not an error, do nothing. */
1833 rd_kafka_topic_partition_list_destroy(valid_offsets);
1834 return NULL;
1835 }
1836
1837 rd_kafka_topic_partition_list_sort_by_topic(valid_offsets);
1838
1839 rko = rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN,
1840 rd_kafka_txn_op_send_offsets_to_transaction);
1841 rko->rko_u.txn.offsets = valid_offsets;
1842 rko->rko_u.txn.group_id = rd_strdup(cgmetadata->group_id);
1843 if (timeout_ms > rk->rk_conf.eos.transaction_timeout_ms)
1844 timeout_ms = rk->rk_conf.eos.transaction_timeout_ms;
1845 rko->rko_u.txn.abs_timeout = rd_timeout_init(timeout_ms);
1846
1847 return rd_kafka_txn_curr_api_req(
1848 rk, __FUNCTION__, rko,
1849 RD_POLL_INFINITE, /* rely on background code to time out */
1850 0 /* no flags */);
1851 }
1852
1853
1854
1855
1856
1857 /**
1858 * @brief Successfully complete the transaction.
1859 *
1860 * @locality rdkafka main thread
1861 * @locks rd_kafka_wrlock(rk) MUST be held
1862 */
rd_kafka_txn_complete(rd_kafka_t * rk)1863 static void rd_kafka_txn_complete (rd_kafka_t *rk) {
1864
1865 rd_kafka_dbg(rk, EOS, "TXNCOMPLETE",
1866 "Transaction successfully %s",
1867 rk->rk_eos.txn_state ==
1868 RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION ?
1869 "committed" : "aborted");
1870
1871 /* Clear all transaction partition state */
1872 rd_kafka_txn_clear_pending_partitions(rk);
1873 rd_kafka_txn_clear_partitions(rk);
1874
1875 rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_READY);
1876 }
1877
1878
1879
1880 /**
1881 * @brief Handle EndTxnResponse (commit or abort)
1882 *
1883 * @locality rdkafka main thread
1884 * @locks none
1885 */
rd_kafka_txn_handle_EndTxn(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)1886 static void rd_kafka_txn_handle_EndTxn (rd_kafka_t *rk,
1887 rd_kafka_broker_t *rkb,
1888 rd_kafka_resp_err_t err,
1889 rd_kafka_buf_t *rkbuf,
1890 rd_kafka_buf_t *request,
1891 void *opaque) {
1892 const int log_decode_errors = LOG_ERR;
1893 rd_kafka_q_t *rkq = opaque;
1894 int16_t ErrorCode;
1895 int actions = 0;
1896 rd_bool_t is_commit = rd_false;
1897
1898 if (err == RD_KAFKA_RESP_ERR__DESTROY) {
1899 rd_kafka_q_destroy(rkq);
1900 return;
1901 }
1902
1903 if (err)
1904 goto err;
1905
1906 rd_kafka_buf_read_throttle_time(rkbuf);
1907 rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
1908
1909 err = ErrorCode;
1910 /* FALLTHRU */
1911
1912 err_parse:
1913 err = rkbuf->rkbuf_err;
1914 err:
1915 rd_kafka_wrlock(rk);
1916 if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION)
1917 is_commit = rd_true;
1918 else if (rk->rk_eos.txn_state ==
1919 RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION)
1920 is_commit = rd_false;
1921 else
1922 err = RD_KAFKA_RESP_ERR__OUTDATED;
1923 rd_kafka_wrunlock(rk);
1924
1925 switch (err)
1926 {
1927 case RD_KAFKA_RESP_ERR_NO_ERROR:
1928 /* EndTxn successful: complete the transaction */
1929 rd_kafka_txn_complete(rk);
1930 break;
1931
1932 case RD_KAFKA_RESP_ERR__OUTDATED:
1933 case RD_KAFKA_RESP_ERR__DESTROY:
1934 /* Producer is being terminated, ignore the response. */
1935 break;
1936
1937 case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
1938 case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
1939 case RD_KAFKA_RESP_ERR__TRANSPORT:
1940 rd_kafka_wrlock(rk);
1941 rd_kafka_txn_coord_set(rk, NULL,
1942 "EndTxn failed: %s",
1943 rd_kafka_err2str(err));
1944 rd_kafka_wrunlock(rk);
1945 actions |= RD_KAFKA_ERR_ACTION_RETRY;
1946 break;
1947
1948 case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH:
1949 case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
1950 case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE:
1951 actions |= RD_KAFKA_ERR_ACTION_FATAL;
1952 break;
1953
1954 default:
1955 /* All unhandled errors are permanent */
1956 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1957 }
1958
1959
1960 if (actions & RD_KAFKA_ERR_ACTION_FATAL) {
1961 rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, err,
1962 "Failed to end transaction: %s",
1963 rd_kafka_err2str(err));
1964
1965 } else if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
1966 if (rd_kafka_buf_retry(rkb, request))
1967 return;
1968 actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
1969 }
1970
1971 if (actions & RD_KAFKA_ERR_ACTION_PERMANENT)
1972 rd_kafka_txn_set_abortable_error(rk, err,
1973 "Failed to end transaction: "
1974 "%s",
1975 rd_kafka_err2str(err));
1976
1977 if (err)
1978 rd_kafka_txn_curr_api_reply(
1979 rkq, err,
1980 "EndTxn %s failed: %s", is_commit ? "commit" : "abort",
1981 rd_kafka_err2str(err));
1982 else
1983 rd_kafka_txn_curr_api_reply(rkq, RD_KAFKA_RESP_ERR_NO_ERROR,
1984 NULL);
1985 }
1986
1987
1988
1989 /**
1990 * @brief Handler for commit_transaction()
1991 *
1992 * @locks none
1993 * @locality rdkafka main thread
1994 */
1995 static rd_kafka_op_res_t
rd_kafka_txn_op_commit_transaction(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)1996 rd_kafka_txn_op_commit_transaction (rd_kafka_t *rk,
1997 rd_kafka_q_t *rkq,
1998 rd_kafka_op_t *rko) {
1999 rd_kafka_error_t *error;
2000 rd_kafka_resp_err_t err;
2001 char errstr[512];
2002 rd_kafka_pid_t pid;
2003
2004 if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
2005 return RD_KAFKA_OP_RES_HANDLED;
2006
2007 rd_kafka_wrlock(rk);
2008
2009 if ((error = rd_kafka_txn_require_state(
2010 rk, RD_KAFKA_TXN_STATE_BEGIN_COMMIT)))
2011 goto err;
2012
2013 pid = rd_kafka_idemp_get_pid0(rk, rd_false/*dont-lock*/);
2014 if (!rd_kafka_pid_valid(pid)) {
2015 rd_dassert(!*"BUG: No PID despite proper transaction state");
2016 error = rd_kafka_error_new_retriable(
2017 RD_KAFKA_RESP_ERR__STATE,
2018 "No PID available (idempotence state %s)",
2019 rd_kafka_idemp_state2str(rk->rk_eos.idemp_state));
2020 goto err;
2021 }
2022
2023 err = rd_kafka_EndTxnRequest(rk->rk_eos.txn_coord,
2024 rk->rk_conf.eos.transactional_id,
2025 pid,
2026 rd_true /* commit */,
2027 errstr, sizeof(errstr),
2028 RD_KAFKA_REPLYQ(rk->rk_ops, 0),
2029 rd_kafka_txn_handle_EndTxn,
2030 rd_kafka_q_keep(rko->rko_replyq.q));
2031 if (err) {
2032 error = rd_kafka_error_new_retriable(err, "%s", errstr);
2033 goto err;
2034 }
2035
2036 rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION);
2037
2038 rd_kafka_wrunlock(rk);
2039
2040 return RD_KAFKA_OP_RES_HANDLED;
2041
2042 err:
2043 rd_kafka_wrunlock(rk);
2044
2045 rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
2046 error);
2047
2048 return RD_KAFKA_OP_RES_HANDLED;
2049 }
2050
2051
2052 /**
2053 * @brief Handler for commit_transaction()'s first phase: begin commit
2054 *
2055 * @locks none
2056 * @locality rdkafka main thread
2057 */
2058 static rd_kafka_op_res_t
rd_kafka_txn_op_begin_commit(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)2059 rd_kafka_txn_op_begin_commit (rd_kafka_t *rk,
2060 rd_kafka_q_t *rkq,
2061 rd_kafka_op_t *rko) {
2062 rd_kafka_error_t *error;
2063
2064 if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
2065 return RD_KAFKA_OP_RES_HANDLED;
2066
2067
2068 if ((error = rd_kafka_txn_require_state(
2069 rk,
2070 RD_KAFKA_TXN_STATE_IN_TRANSACTION,
2071 RD_KAFKA_TXN_STATE_BEGIN_COMMIT)))
2072 goto done;
2073
2074 rd_kafka_wrlock(rk);
2075 rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_BEGIN_COMMIT);
2076 rd_kafka_wrunlock(rk);
2077
2078 /* FALLTHRU */
2079 done:
2080 rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
2081 error);
2082
2083 return RD_KAFKA_OP_RES_HANDLED;
2084 }
2085
2086
2087 rd_kafka_error_t *
rd_kafka_commit_transaction(rd_kafka_t * rk,int timeout_ms)2088 rd_kafka_commit_transaction (rd_kafka_t *rk, int timeout_ms) {
2089 rd_kafka_error_t *error;
2090 rd_kafka_resp_err_t err;
2091 rd_ts_t abs_timeout;
2092
2093 if ((error = rd_kafka_ensure_transactional(rk)))
2094 return error;
2095
2096 /* The commit is in two phases:
2097 * - begin commit: wait for outstanding messages to be produced,
2098 * disallow new messages from being produced
2099 * by application.
2100 * - commit: commit transaction.
2101 */
2102
2103 abs_timeout = rd_timeout_init(timeout_ms);
2104
2105 /* Begin commit */
2106 error = rd_kafka_txn_curr_api_req(
2107 rk, "commit_transaction (begin)",
2108 rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN,
2109 rd_kafka_txn_op_begin_commit),
2110 rd_timeout_remains(abs_timeout),
2111 RD_KAFKA_TXN_CURR_API_F_FOR_REUSE|
2112 RD_KAFKA_TXN_CURR_API_F_ABORT_ON_TIMEOUT);
2113 if (error)
2114 return error;
2115
2116 rd_kafka_dbg(rk, EOS, "TXNCOMMIT",
2117 "Flushing %d outstanding message(s) prior to commit",
2118 rd_kafka_outq_len(rk));
2119
2120 /* Wait for queued messages to be delivered, limited by
2121 * the remaining transaction lifetime. */
2122 if ((err = rd_kafka_flush(rk, rd_timeout_remains(abs_timeout)))) {
2123 if (err == RD_KAFKA_RESP_ERR__TIMED_OUT)
2124 error = rd_kafka_error_new_retriable(
2125 err,
2126 "Failed to flush all outstanding messages "
2127 "within the transaction timeout: "
2128 "%d message(s) remaining%s",
2129 rd_kafka_outq_len(rk),
2130 (rk->rk_conf.enabled_events &
2131 RD_KAFKA_EVENT_DR) ?
2132 ": the event queue must be polled "
2133 "for delivery report events in a separate "
2134 "thread or prior to calling commit" : "");
2135 else
2136 error = rd_kafka_error_new_retriable(
2137 err,
2138 "Failed to flush outstanding messages: %s",
2139 rd_kafka_err2str(err));
2140
2141 rd_kafka_txn_curr_api_reset(rk);
2142
2143 /* FIXME: What to do here? Add test case */
2144
2145 return error;
2146 }
2147
2148
2149 /* Commit transaction */
2150 return rd_kafka_txn_curr_api_req(
2151 rk, "commit_transaction",
2152 rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN,
2153 rd_kafka_txn_op_commit_transaction),
2154 rd_timeout_remains(abs_timeout),
2155 RD_KAFKA_TXN_CURR_API_F_REUSE|
2156 RD_KAFKA_TXN_CURR_API_F_ABORT_ON_TIMEOUT);
2157 }
2158
2159
2160
2161 /**
2162 * @brief Handler for abort_transaction()'s first phase: begin abort
2163 *
2164 * @locks none
2165 * @locality rdkafka main thread
2166 */
2167 static rd_kafka_op_res_t
rd_kafka_txn_op_begin_abort(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)2168 rd_kafka_txn_op_begin_abort (rd_kafka_t *rk,
2169 rd_kafka_q_t *rkq,
2170 rd_kafka_op_t *rko) {
2171 rd_kafka_error_t *error;
2172
2173 if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
2174 return RD_KAFKA_OP_RES_HANDLED;
2175
2176 if ((error = rd_kafka_txn_require_state(
2177 rk,
2178 RD_KAFKA_TXN_STATE_IN_TRANSACTION,
2179 RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION,
2180 RD_KAFKA_TXN_STATE_ABORTABLE_ERROR)))
2181 goto done;
2182
2183 rd_kafka_wrlock(rk);
2184 rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION);
2185 rd_kafka_wrunlock(rk);
2186
2187 mtx_lock(&rk->rk_eos.txn_pending_lock);
2188 rd_kafka_txn_clear_pending_partitions(rk);
2189 mtx_unlock(&rk->rk_eos.txn_pending_lock);
2190
2191
2192 /* FALLTHRU */
2193 done:
2194 rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
2195 error);
2196
2197 return RD_KAFKA_OP_RES_HANDLED;
2198 }
2199
2200
2201 /**
2202 * @brief Handler for abort_transaction()
2203 *
2204 * @locks none
2205 * @locality rdkafka main thread
2206 */
2207 static rd_kafka_op_res_t
rd_kafka_txn_op_abort_transaction(rd_kafka_t * rk,rd_kafka_q_t * rkq,rd_kafka_op_t * rko)2208 rd_kafka_txn_op_abort_transaction (rd_kafka_t *rk,
2209 rd_kafka_q_t *rkq,
2210 rd_kafka_op_t *rko) {
2211 rd_kafka_error_t *error;
2212 rd_kafka_resp_err_t err;
2213 char errstr[512];
2214 rd_kafka_pid_t pid;
2215
2216 if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
2217 return RD_KAFKA_OP_RES_HANDLED;
2218
2219 rd_kafka_wrlock(rk);
2220
2221 if ((error = rd_kafka_txn_require_state(
2222 rk, RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION)))
2223 goto err;
2224
2225 pid = rd_kafka_idemp_get_pid0(rk, rd_false/*dont-lock*/);
2226 if (!rd_kafka_pid_valid(pid)) {
2227 rd_dassert(!*"BUG: No PID despite proper transaction state");
2228 error = rd_kafka_error_new_retriable(
2229 RD_KAFKA_RESP_ERR__STATE,
2230 "No PID available (idempotence state %s)",
2231 rd_kafka_idemp_state2str(rk->rk_eos.idemp_state));
2232 goto err;
2233 }
2234
2235 if (!rk->rk_eos.txn_req_cnt) {
2236 rd_kafka_dbg(rk, EOS, "TXNABORT",
2237 "No partitions registered: not sending EndTxn");
2238 rd_kafka_txn_complete(rk);
2239 goto err;
2240 }
2241
2242 err = rd_kafka_EndTxnRequest(rk->rk_eos.txn_coord,
2243 rk->rk_conf.eos.transactional_id,
2244 pid,
2245 rd_false /* abort */,
2246 errstr, sizeof(errstr),
2247 RD_KAFKA_REPLYQ(rk->rk_ops, 0),
2248 rd_kafka_txn_handle_EndTxn,
2249 rd_kafka_q_keep(rko->rko_replyq.q));
2250 if (err) {
2251 error = rd_kafka_error_new_retriable(err, "%s", errstr);
2252 goto err;
2253 }
2254
2255 rd_kafka_wrunlock(rk);
2256
2257 return RD_KAFKA_OP_RES_HANDLED;
2258
2259 err:
2260 rd_kafka_wrunlock(rk);
2261
2262 rd_kafka_txn_curr_api_reply_error(rd_kafka_q_keep(rko->rko_replyq.q),
2263 error);
2264
2265 // FIXME: What state do we transition to? READY? FATAL?
2266
2267 return RD_KAFKA_OP_RES_HANDLED;
2268 }
2269
2270
2271 rd_kafka_error_t *
rd_kafka_abort_transaction(rd_kafka_t * rk,int timeout_ms)2272 rd_kafka_abort_transaction (rd_kafka_t *rk, int timeout_ms) {
2273 rd_kafka_error_t *error;
2274 rd_kafka_resp_err_t err;
2275 rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);
2276
2277 if ((error = rd_kafka_ensure_transactional(rk)))
2278 return error;
2279
2280 /* The abort is multi-phase:
2281 * - set state to ABORTING_TRANSACTION
2282 * - flush() outstanding messages
2283 * - send EndTxn
2284 *
2285 * The curr_api must be reused during all these steps to avoid
2286 * a race condition where another application thread calls a
2287 * txn API inbetween the steps.
2288 */
2289
2290 error = rd_kafka_txn_curr_api_req(
2291 rk, "abort_transaction (begin)",
2292 rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN,
2293 rd_kafka_txn_op_begin_abort),
2294 RD_POLL_INFINITE, /* begin_abort is immediate, no timeout */
2295 RD_KAFKA_TXN_CURR_API_F_FOR_REUSE|
2296 RD_KAFKA_TXN_CURR_API_F_RETRIABLE_ON_TIMEOUT);
2297 if (error)
2298 return error;
2299
2300 rd_kafka_dbg(rk, EOS, "TXNABORT",
2301 "Purging and flushing %d outstanding message(s) prior "
2302 "to abort",
2303 rd_kafka_outq_len(rk));
2304
2305 /* Purge all queued messages.
2306 * Will need to wait for messages in-flight since purging these
2307 * messages may lead to gaps in the idempotent producer sequences. */
2308 err = rd_kafka_purge(rk,
2309 RD_KAFKA_PURGE_F_QUEUE|
2310 RD_KAFKA_PURGE_F_ABORT_TXN);
2311
2312 /* Serve delivery reports for the purged messages. */
2313 if ((err = rd_kafka_flush(rk, rd_timeout_remains(abs_timeout)))) {
2314 /* FIXME: Not sure these errors matter that much */
2315 if (err == RD_KAFKA_RESP_ERR__TIMED_OUT)
2316 error = rd_kafka_error_new_retriable(
2317 err,
2318 "Failed to flush all outstanding messages "
2319 "within the transaction timeout: "
2320 "%d message(s) remaining%s",
2321 rd_kafka_outq_len(rk),
2322 (rk->rk_conf.enabled_events &
2323 RD_KAFKA_EVENT_DR) ?
2324 ": the event queue must be polled "
2325 "for delivery report events in a separate "
2326 "thread or prior to calling abort" : "");
2327
2328 else
2329 error = rd_kafka_error_new_retriable(
2330 err,
2331 "Failed to flush outstanding messages: %s",
2332 rd_kafka_err2str(err));
2333
2334 rd_kafka_txn_curr_api_reset(rk);
2335
2336 /* FIXME: What to do here? */
2337
2338 return error;
2339 }
2340
2341
2342 return rd_kafka_txn_curr_api_req(
2343 rk, "abort_transaction",
2344 rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN,
2345 rd_kafka_txn_op_abort_transaction),
2346 0,
2347 RD_KAFKA_TXN_CURR_API_F_REUSE);
2348 }
2349
2350
2351
2352 /**
2353 * @brief Coordinator query timer
2354 *
2355 * @locality rdkafka main thread
2356 * @locks none
2357 */
2358
rd_kafka_txn_coord_timer_cb(rd_kafka_timers_t * rkts,void * arg)2359 static void rd_kafka_txn_coord_timer_cb (rd_kafka_timers_t *rkts, void *arg) {
2360 rd_kafka_t *rk = arg;
2361
2362 rd_kafka_wrlock(rk);
2363 rd_kafka_txn_coord_query(rk, "Coordinator query timer");
2364 rd_kafka_wrunlock(rk);
2365 }
2366
2367 /**
2368 * @brief (Re-)Start coord query timer
2369 *
2370 * @locality rdkafka main thread
2371 * @locks none
2372 */
rd_kafka_txn_coord_timer_restart(rd_kafka_t * rk,int timeout_ms)2373 static void rd_kafka_txn_coord_timer_restart (rd_kafka_t *rk, int timeout_ms) {
2374 rd_assert(rd_kafka_is_transactional(rk));
2375 rd_kafka_timer_start_oneshot(&rk->rk_timers,
2376 &rk->rk_eos.txn_coord_tmr, rd_true,
2377 1000 * timeout_ms,
2378 rd_kafka_txn_coord_timer_cb, rk);
2379 }
2380
2381
2382 /**
2383 * @brief Parses and handles a FindCoordinator response.
2384 *
2385 * @locality rdkafka main thread
2386 * @locks none
2387 */
2388 static void
rd_kafka_txn_handle_FindCoordinator(rd_kafka_t * rk,rd_kafka_broker_t * rkb,rd_kafka_resp_err_t err,rd_kafka_buf_t * rkbuf,rd_kafka_buf_t * request,void * opaque)2389 rd_kafka_txn_handle_FindCoordinator (rd_kafka_t *rk,
2390 rd_kafka_broker_t *rkb,
2391 rd_kafka_resp_err_t err,
2392 rd_kafka_buf_t *rkbuf,
2393 rd_kafka_buf_t *request,
2394 void *opaque) {
2395 const int log_decode_errors = LOG_ERR;
2396 int16_t ErrorCode;
2397 rd_kafkap_str_t Host;
2398 int32_t NodeId, Port;
2399 char errstr[512];
2400
2401 *errstr = '\0';
2402
2403 rk->rk_eos.txn_wait_coord = rd_false;
2404
2405 if (err)
2406 goto err;
2407
2408 if (request->rkbuf_reqhdr.ApiVersion >= 1)
2409 rd_kafka_buf_read_throttle_time(rkbuf);
2410
2411 rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
2412
2413 if (request->rkbuf_reqhdr.ApiVersion >= 1) {
2414 rd_kafkap_str_t ErrorMsg;
2415 rd_kafka_buf_read_str(rkbuf, &ErrorMsg);
2416 if (ErrorCode)
2417 rd_snprintf(errstr, sizeof(errstr),
2418 "%.*s", RD_KAFKAP_STR_PR(&ErrorMsg));
2419 }
2420
2421 if ((err = ErrorCode))
2422 goto err;
2423
2424 rd_kafka_buf_read_i32(rkbuf, &NodeId);
2425 rd_kafka_buf_read_str(rkbuf, &Host);
2426 rd_kafka_buf_read_i32(rkbuf, &Port);
2427
2428 rd_rkb_dbg(rkb, EOS, "TXNCOORD",
2429 "FindCoordinator response: "
2430 "Transaction coordinator is broker %"PRId32" (%.*s:%d)",
2431 NodeId, RD_KAFKAP_STR_PR(&Host), (int)Port);
2432
2433 rd_kafka_rdlock(rk);
2434 if (NodeId == -1)
2435 err = RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE;
2436 else if (!(rkb = rd_kafka_broker_find_by_nodeid(rk, NodeId))) {
2437 rd_snprintf(errstr, sizeof(errstr),
2438 "Transaction coordinator %"PRId32" is unknown",
2439 NodeId);
2440 err = RD_KAFKA_RESP_ERR__UNKNOWN_BROKER;
2441 }
2442 rd_kafka_rdunlock(rk);
2443
2444 if (err)
2445 goto err;
2446
2447 rd_kafka_wrlock(rk);
2448 rd_kafka_txn_coord_set(rk, rkb, "FindCoordinator response");
2449 rd_kafka_wrunlock(rk);
2450
2451 rd_kafka_broker_destroy(rkb);
2452
2453 return;
2454
2455 err_parse:
2456 err = rkbuf->rkbuf_err;
2457 err:
2458
2459 switch (err)
2460 {
2461 case RD_KAFKA_RESP_ERR__DESTROY:
2462 return;
2463
2464 case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
2465 rd_kafka_wrlock(rk);
2466 rd_kafka_txn_set_fatal_error(
2467 rkb->rkb_rk, RD_DONT_LOCK, err,
2468 "Failed to find transaction coordinator: %s: %s%s%s",
2469 rd_kafka_broker_name(rkb),
2470 rd_kafka_err2str(err),
2471 *errstr ? ": " : "", errstr);
2472
2473 rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_FATAL_ERROR);
2474 rd_kafka_wrunlock(rk);
2475 return;
2476
2477 case RD_KAFKA_RESP_ERR__UNKNOWN_BROKER:
2478 rd_kafka_metadata_refresh_brokers(rk, NULL, errstr);
2479 break;
2480
2481 default:
2482 break;
2483 }
2484
2485 rd_kafka_wrlock(rk);
2486 rd_kafka_txn_coord_set(rk, NULL,
2487 "Failed to find transaction coordinator: %s: %s",
2488 rd_kafka_err2name(err),
2489 *errstr ? errstr : rd_kafka_err2str(err));
2490 rd_kafka_wrunlock(rk);
2491 }
2492
2493
2494
2495
2496 /**
2497 * @brief Query for the transaction coordinator.
2498 *
2499 * @returns true if a fatal error was raised, else false.
2500 *
2501 * @locality rdkafka main thread
2502 * @locks rd_kafka_wrlock(rk) MUST be held.
2503 */
rd_kafka_txn_coord_query(rd_kafka_t * rk,const char * reason)2504 rd_bool_t rd_kafka_txn_coord_query (rd_kafka_t *rk, const char *reason) {
2505 rd_kafka_resp_err_t err;
2506 char errstr[512];
2507 rd_kafka_broker_t *rkb;
2508
2509 rd_assert(rd_kafka_is_transactional(rk));
2510
2511 if (rk->rk_eos.txn_wait_coord) {
2512 rd_kafka_dbg(rk, EOS, "TXNCOORD",
2513 "Not sending coordinator query (%s): "
2514 "waiting for previous query to finish",
2515 reason);
2516 return rd_false;
2517 }
2518
2519 /* Find usable broker to query for the txn coordinator */
2520 rkb = rd_kafka_idemp_broker_any(rk, &err,
2521 errstr, sizeof(errstr));
2522 if (!rkb) {
2523 rd_kafka_dbg(rk, EOS, "TXNCOORD",
2524 "Unable to query for transaction coordinator: %s",
2525 errstr);
2526
2527 if (rd_kafka_idemp_check_error(rk, err, errstr))
2528 return rd_true;
2529
2530 rd_kafka_txn_coord_timer_restart(rk, 500);
2531
2532 return rd_false;
2533 }
2534
2535 /* Send FindCoordinator request */
2536 err = rd_kafka_FindCoordinatorRequest(
2537 rkb, RD_KAFKA_COORD_TXN,
2538 rk->rk_conf.eos.transactional_id,
2539 RD_KAFKA_REPLYQ(rk->rk_ops, 0),
2540 rd_kafka_txn_handle_FindCoordinator, NULL);
2541
2542 if (err) {
2543 rd_snprintf(errstr, sizeof(errstr),
2544 "Failed to send coordinator query to %s: "
2545 "%s",
2546 rd_kafka_broker_name(rkb),
2547 rd_kafka_err2str(err));
2548
2549 rd_kafka_broker_destroy(rkb);
2550
2551 if (rd_kafka_idemp_check_error(rk, err, errstr))
2552 return rd_true; /* Fatal error */
2553
2554 rd_kafka_txn_coord_timer_restart(rk, 500);
2555
2556 return rd_false;
2557 }
2558
2559 rd_kafka_broker_destroy(rkb);
2560
2561 rk->rk_eos.txn_wait_coord = rd_true;
2562
2563 return rd_false;
2564 }
2565
2566 /**
2567 * @brief Sets or clears the current coordinator address.
2568 *
2569 * @returns true if the coordinator was changed, else false.
2570 *
2571 * @locality rd_kafka_main_thread
2572 * @locks rd_kafka_wrlock(rk) MUST be held
2573 */
rd_kafka_txn_coord_set(rd_kafka_t * rk,rd_kafka_broker_t * rkb,const char * fmt,...)2574 rd_bool_t rd_kafka_txn_coord_set (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
2575 const char *fmt, ...) {
2576 char buf[256];
2577 va_list ap;
2578
2579 va_start(ap, fmt);
2580 vsnprintf(buf, sizeof(buf), fmt, ap);
2581 va_end(ap);
2582
2583
2584 if (rk->rk_eos.txn_curr_coord == rkb) {
2585 if (!rkb) {
2586 rd_kafka_dbg(rk, EOS, "TXNCOORD", "%s", buf);
2587 /* Keep querying for the coordinator */
2588 rd_kafka_txn_coord_timer_restart(rk, 500);
2589 }
2590 return rd_false;
2591 }
2592
2593 rd_kafka_dbg(rk, EOS, "TXNCOORD",
2594 "Transaction coordinator changed from %s -> %s: %s",
2595 rk->rk_eos.txn_curr_coord ?
2596 rd_kafka_broker_name(rk->rk_eos.txn_curr_coord) :
2597 "(none)",
2598 rkb ? rd_kafka_broker_name(rkb) : "(none)",
2599 buf);
2600
2601 if (rk->rk_eos.txn_curr_coord)
2602 rd_kafka_broker_destroy(rk->rk_eos.txn_curr_coord);
2603
2604 rk->rk_eos.txn_curr_coord = rkb;
2605 if (rkb)
2606 rd_kafka_broker_keep(rkb);
2607
2608 rd_kafka_broker_set_nodename(rk->rk_eos.txn_coord,
2609 rk->rk_eos.txn_curr_coord);
2610
2611 if (!rkb) {
2612 /* Lost the current coordinator, query for new coordinator */
2613 rd_kafka_txn_coord_timer_restart(rk, 500);
2614 } else {
2615 /* Trigger PID state machine */
2616 rd_kafka_idemp_pid_fsm(rk);
2617 }
2618
2619 return rd_true;
2620 }
2621
2622
2623 /**
2624 * @brief Coordinator state monitor callback.
2625 *
2626 * @locality rdkafka main thread
2627 * @locks none
2628 */
rd_kafka_txn_coord_monitor_cb(rd_kafka_broker_t * rkb)2629 void rd_kafka_txn_coord_monitor_cb (rd_kafka_broker_t *rkb) {
2630 rd_kafka_t *rk = rkb->rkb_rk;
2631 rd_kafka_broker_state_t state = rd_kafka_broker_get_state(rkb);
2632 rd_bool_t is_up;
2633
2634 rd_assert(rk->rk_eos.txn_coord == rkb);
2635
2636 is_up = rd_kafka_broker_state_is_up(state);
2637 rd_rkb_dbg(rkb, EOS, "COORD",
2638 "Transaction coordinator is now %s",
2639 is_up ? "up" : "down");
2640
2641 if (!is_up) {
2642 /* Coordinator is down, the connection will be re-established
2643 * automatically, but we also trigger a coordinator query
2644 * to pick up on coordinator change. */
2645 rd_kafka_txn_coord_timer_restart(rk, 500);
2646
2647 } else {
2648 /* Coordinator is up. */
2649
2650 rd_kafka_wrlock(rk);
2651 if (rk->rk_eos.idemp_state < RD_KAFKA_IDEMP_STATE_ASSIGNED) {
2652 /* See if a idempotence state change is warranted. */
2653 rd_kafka_idemp_pid_fsm(rk);
2654
2655 } else if (rk->rk_eos.idemp_state ==
2656 RD_KAFKA_IDEMP_STATE_ASSIGNED) {
2657 /* PID is already valid, continue transactional
2658 * operations by checking for partitions to register */
2659 rd_kafka_txn_schedule_register_partitions(rk,
2660 1/*ASAP*/);
2661 }
2662
2663 rd_kafka_wrunlock(rk);
2664 }
2665 }
2666
2667
2668
2669 /**
2670 * @brief Transactions manager destructor
2671 *
2672 * @locality rdkafka main thread
2673 * @locks none
2674 */
rd_kafka_txns_term(rd_kafka_t * rk)2675 void rd_kafka_txns_term (rd_kafka_t *rk) {
2676 RD_IF_FREE(rk->rk_eos.txn_init_rkq, rd_kafka_q_destroy);
2677
2678 RD_IF_FREE(rk->rk_eos.txn_errstr, rd_free);
2679
2680 rd_kafka_timer_stop(&rk->rk_timers,
2681 &rk->rk_eos.txn_coord_tmr, 1);
2682 rd_kafka_timer_stop(&rk->rk_timers,
2683 &rk->rk_eos.txn_register_parts_tmr, 1);
2684
2685 if (rk->rk_eos.txn_curr_coord)
2686 rd_kafka_broker_destroy(rk->rk_eos.txn_curr_coord);
2687
2688 /* Logical coordinator */
2689 rd_kafka_broker_persistent_connection_del(
2690 rk->rk_eos.txn_coord,
2691 &rk->rk_eos.txn_coord->rkb_persistconn.coord);
2692 rd_kafka_broker_monitor_del(&rk->rk_eos.txn_coord_mon);
2693 rd_kafka_broker_destroy(rk->rk_eos.txn_coord);
2694 rk->rk_eos.txn_coord = NULL;
2695
2696 mtx_lock(&rk->rk_eos.txn_pending_lock);
2697 rd_kafka_txn_clear_pending_partitions(rk);
2698 mtx_unlock(&rk->rk_eos.txn_pending_lock);
2699 mtx_destroy(&rk->rk_eos.txn_pending_lock);
2700
2701 rd_kafka_txn_clear_partitions(rk);
2702 }
2703
2704
2705 /**
2706 * @brief Initialize transactions manager.
2707 *
2708 * @locality application thread
2709 * @locks none
2710 */
rd_kafka_txns_init(rd_kafka_t * rk)2711 void rd_kafka_txns_init (rd_kafka_t *rk) {
2712 rd_atomic32_init(&rk->rk_eos.txn_may_enq, 0);
2713 mtx_init(&rk->rk_eos.txn_pending_lock, mtx_plain);
2714 TAILQ_INIT(&rk->rk_eos.txn_pending_rktps);
2715 TAILQ_INIT(&rk->rk_eos.txn_waitresp_rktps);
2716 TAILQ_INIT(&rk->rk_eos.txn_rktps);
2717
2718 /* Logical coordinator */
2719 rk->rk_eos.txn_coord =
2720 rd_kafka_broker_add_logical(rk, "TxnCoordinator");
2721
2722 rd_kafka_broker_monitor_add(&rk->rk_eos.txn_coord_mon,
2723 rk->rk_eos.txn_coord,
2724 rk->rk_ops,
2725 rd_kafka_txn_coord_monitor_cb);
2726
2727 rd_kafka_broker_persistent_connection_add(
2728 rk->rk_eos.txn_coord,
2729 &rk->rk_eos.txn_coord->rkb_persistconn.coord);
2730 }
2731
2732