1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2018 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 
30 #ifndef _RD_KAFKA_IDEMPOTENCE_H_
31 #define _RD_KAFKA_IDEMPOTENCE_H_
32 
33 
/**
 * @brief Size of the broker-side de-duplication window.
 *
 * The broker maintains a window of the 5 last Produce requests
 * for a partition to be able to de-duplicate resends.
 */
#define RD_KAFKA_IDEMP_MAX_INFLIGHT 5

/**
 * @brief String form of RD_KAFKA_IDEMP_MAX_INFLIGHT, for printouts.
 *        Must be kept in sync with the numeric value by hand.
 */
#define RD_KAFKA_IDEMP_MAX_INFLIGHT_STR "5"
40 
41 /**
42  * @brief Get the current PID if state permits.
43  *
44  * @returns If there is no valid PID or the state
45  *          does not permit further PID usage (such as when draining)
46  *          then an invalid PID is returned.
47  *
48  * @locality any
49  * @locks none
50  */
51 static RD_UNUSED RD_INLINE rd_kafka_pid_t
rd_kafka_idemp_get_pid0(rd_kafka_t * rk,rd_bool_t do_lock)52 rd_kafka_idemp_get_pid0 (rd_kafka_t *rk, rd_bool_t do_lock) {
53         rd_kafka_pid_t pid;
54 
55         if (do_lock)
56                 rd_kafka_rdlock(rk);
57         if (likely(rk->rk_eos.idemp_state == RD_KAFKA_IDEMP_STATE_ASSIGNED))
58                 pid = rk->rk_eos.pid;
59         else
60                 rd_kafka_pid_reset(&pid);
61         if (do_lock)
62                 rd_kafka_rdunlock(rk);
63 
64         return pid;
65 }
66 
/**
 * @brief Locking convenience wrapper: calls rd_kafka_idemp_get_pid0()
 *        with \c do_lock set to rd_true.
 */
#define rd_kafka_idemp_get_pid(rk)                                             \
        rd_kafka_idemp_get_pid0((rk), rd_true /*lock*/)
68 
69 void rd_kafka_idemp_set_state (rd_kafka_t *rk,
70                                rd_kafka_idemp_state_t new_state);
71 void rd_kafka_idemp_request_pid_failed (rd_kafka_broker_t *rkb,
72                                         rd_kafka_resp_err_t err);
73 void rd_kafka_idemp_pid_update (rd_kafka_broker_t *rkb,
74                                 const rd_kafka_pid_t pid);
75 void rd_kafka_idemp_pid_fsm (rd_kafka_t *rk);
76 void rd_kafka_idemp_drain_reset (rd_kafka_t *rk, const char *reason);
77 void rd_kafka_idemp_drain_epoch_bump (rd_kafka_t *rk, rd_kafka_resp_err_t err,
78                                       const char *fmt, ...)
79         RD_FORMAT(printf, 3, 4);
80 void rd_kafka_idemp_drain_toppar (rd_kafka_toppar_t *rktp, const char *reason);
81 void rd_kafka_idemp_inflight_toppar_sub (rd_kafka_t *rk,
82                                          rd_kafka_toppar_t *rktp);
83 void rd_kafka_idemp_inflight_toppar_add (rd_kafka_t *rk,
84                                          rd_kafka_toppar_t *rktp);
85 
86 rd_kafka_broker_t *
87 rd_kafka_idemp_broker_any (rd_kafka_t *rk,
88                            rd_kafka_resp_err_t *errp,
89                            char *errstr, size_t errstr_size);
90 
91 rd_bool_t rd_kafka_idemp_check_error (rd_kafka_t *rk,
92                                       rd_kafka_resp_err_t err,
93                                       const char *errstr,
94                                       rd_bool_t is_fatal);
95 
96 
/**
 * @brief Call when a fatal idempotence error has occurred, when the producer
 *        can't continue without risking the idempotency guarantees.
 *
 * If the producer is transactional this error is non-fatal and will just
 * cause the current transaction to transition into the ABORTABLE_ERROR state.
 * If the producer is not transactional the client instance fatal error
 * is set and the producer instance is no longer usable.
 *
 * @warning Until KIP-360 has been fully implemented any fatal idempotent
 *          producer error will also raise a fatal transactional producer error.
 *          This is to guarantee that there is no silent data loss.
 *
 * Dispatch: when rd_kafka_is_transactional(RK) is true the error is raised
 * through rd_kafka_txn_set_fatal_error() (with RD_DO_LOCK), otherwise
 * through rd_kafka_set_fatal_error().
 *
 * @param RK rd_kafka_t instance. Expanded twice (once in the condition,
 *           once in the selected branch), so it must not have side effects.
 * @param ERR error to raise
 * @param ... format string with error message, followed by its arguments
 *
 * @locality any thread
 * @locks none
 */
#define rd_kafka_idemp_set_fatal_error(RK, ERR, ...)                           \
        do {                                                                   \
                if (rd_kafka_is_transactional(RK))                             \
                        /* Use the macro argument, not a caller-scope `rk` */  \
                        rd_kafka_txn_set_fatal_error((RK), RD_DO_LOCK, (ERR),  \
                                                     __VA_ARGS__);             \
                else                                                           \
                        rd_kafka_set_fatal_error((RK), (ERR), __VA_ARGS__);    \
        } while (0)
124 
125 void rd_kafka_idemp_start (rd_kafka_t *rk, rd_bool_t immediate);
126 void rd_kafka_idemp_init (rd_kafka_t *rk);
127 void rd_kafka_idemp_term (rd_kafka_t *rk);
128 
129 
130 #endif /* _RD_KAFKA_IDEMPOTENCE_H_ */
131