1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 #ifndef _RDKAFKA_REQUEST_H_
29 #define _RDKAFKA_REQUEST_H_
30 
31 #include "rdkafka_cgrp.h"
32 #include "rdkafka_feature.h"
33 
34 
/**
 * Error action flags.
 *
 * Every RD_KAFKA_ERR_ACTION_.. value below except RD_KAFKA_ERR_ACTION_END
 * is a distinct single bit (0x1 .. 0x200), so any combination of them can
 * be OR:ed together into one integer bitmask.
 * RD_KAFKA_ERR_ACTION_END is 0 and is documented as the var-arg sentinel.
 *
 * NOTE(review): presumably these are the values returned by, and passed
 * in the var-arg list of, the variadic rd_kafka_err_action() declared in
 * this header - confirm against its definition.
 */
35 #define RD_KAFKA_ERR_ACTION_PERMANENT  0x1 /* Permanent error */
36 #define RD_KAFKA_ERR_ACTION_IGNORE     0x2 /* Error can be ignored */
37 #define RD_KAFKA_ERR_ACTION_REFRESH    0x4 /* Refresh state (e.g., metadata) */
38 #define RD_KAFKA_ERR_ACTION_RETRY      0x8 /* Retry request after backoff */
39 #define RD_KAFKA_ERR_ACTION_INFORM    0x10 /* Inform application about err */
40 #define RD_KAFKA_ERR_ACTION_SPECIAL   0x20 /* Special-purpose, depends on context */
41 #define RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED 0x40 /* ProduceReq msg status */
42 #define RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED 0x80 /* ProduceReq msg status */
43 #define RD_KAFKA_ERR_ACTION_MSG_PERSISTED 0x100    /* ProduceReq msg status */
44 #define RD_KAFKA_ERR_ACTION_FATAL    0x200 /**< Fatal error */
45 #define RD_KAFKA_ERR_ACTION_END          0 /* var-arg sentinel */
46 
47 /** @macro Bitmask of the three message persistence flags
 *         (MSG_NOT_PERSISTED | MSG_POSSIBLY_PERSISTED | MSG_PERSISTED),
 *         i.e. 0x40 | 0x80 | 0x100. The expansion is fully
 *         parenthesized so it can be used directly in expressions such
 *         as `actions & RD_KAFKA_ERR_ACTION_MSG_FLAGS`. */
48 #define RD_KAFKA_ERR_ACTION_MSG_FLAGS                   \
49         (RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED |        \
50          RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED |   \
51          RD_KAFKA_ERR_ACTION_MSG_PERSISTED)
52 
/**
 * @brief Variadic function taking a broker \p rkb, an error code \p err
 *        and a read-only \p request buffer, returning an int.
 *
 * NOTE(review): presumably maps \p err to a bitmask of the
 * RD_KAFKA_ERR_ACTION_.. flags, with the var-args being a list
 * terminated by RD_KAFKA_ERR_ACTION_END (documented in this header as
 * the "var-arg sentinel"). The exact layout of the var-arg list is not
 * visible from this declaration - confirm against the definition.
 */
53 int rd_kafka_err_action (rd_kafka_broker_t *rkb,
54 			 rd_kafka_resp_err_t err,
55 			 const rd_kafka_buf_t *request, ...);
56 
57 
/**
 * @brief Takes a buffer \p rkbuf and a size_t \p estimated_part_cnt and
 *        returns a pointer to a topic+partition list.
 *
 * NOTE(review): going by the names this parses a partition list out of
 * \p rkbuf, with \p estimated_part_cnt being a sizing hint. Ownership
 * of the returned list and the value returned on failure (NULL?) are
 * not visible from this declaration - confirm in the definition.
 */
58 rd_kafka_topic_partition_list_t *
59 rd_kafka_buf_read_topic_partitions (rd_kafka_buf_t *rkbuf,
60                                     size_t estimated_part_cnt);
/**
 * @brief Takes a buffer \p rkbuf, a read-only partition list \p parts
 *        and three boolean switches, returning an int.
 *
 * NOTE(review): presumably serializes \p parts into \p rkbuf, with the
 * switches controlling whether partitions with invalid offsets are
 * skipped and whether the Epoch and Metadata fields are written. The
 * meaning of the int return value (count written?) is not visible from
 * this declaration - confirm in the definition.
 */
61 int rd_kafka_buf_write_topic_partitions (
62         rd_kafka_buf_t *rkbuf,
63         const rd_kafka_topic_partition_list_t *parts,
64         rd_bool_t skip_invalid_offsets,
65         rd_bool_t write_Epoch,
66         rd_bool_t write_Metadata);
67 
/**
 * @brief FindCoordinator request function: takes a broker \p rkb, a
 *        coordinator type and key string, a reply queue, a response
 *        callback and an \p opaque pointer; returns an error code.
 *
 * NOTE(review): presumably builds a FindCoordinatorRequest for
 * \p coordkey of type \p coordtype and enqueues it on \p rkb, with the
 * response delivered to \p resp_cb (given \p opaque) through \p replyq.
 * Which error codes can be returned is not visible here - confirm in
 * the definition.
 */
68 rd_kafka_resp_err_t
69 rd_kafka_FindCoordinatorRequest (rd_kafka_broker_t *rkb,
70                                  rd_kafka_coordtype_t coordtype,
71                                  const char *coordkey,
72                                  rd_kafka_replyq_t replyq,
73                                  rd_kafka_resp_cb_t *resp_cb,
74                                  void *opaque);
75 
/**
 * @brief Offset response handler: takes the client instance \p rk, the
 *        broker \p rkb, an error code \p err, the buffers \p rkbuf and
 *        \p request, and a mutable partition list \p offsets; returns
 *        an error code.
 *
 * NOTE(review): presumably \p rkbuf is the response and \p request the
 * originating request, and the parsed offsets are stored in \p offsets -
 * confirm in the definition.
 */
76 rd_kafka_resp_err_t rd_kafka_handle_Offset (rd_kafka_t *rk,
77 					    rd_kafka_broker_t *rkb,
78 					    rd_kafka_resp_err_t err,
79 					    rd_kafka_buf_t *rkbuf,
80 					    rd_kafka_buf_t *request,
81                                             rd_kafka_topic_partition_list_t
82                                             *offsets);
83 
/**
 * @brief Offset request function: takes a broker \p rkb, a partition
 *        list \p offsets, an int16_t \p api_version, a reply queue, a
 *        response callback and an \p opaque pointer. Returns nothing.
 *
 * NOTE(review): presumably builds an Offset request for the partitions
 * in \p offsets and enqueues it on \p rkb, with the response delivered
 * to \p resp_cb through \p replyq. As the return type is void, any
 * failure must be reported some other way (via the callback?) -
 * confirm in the definition.
 */
84 void rd_kafka_OffsetRequest (rd_kafka_broker_t *rkb,
85                              rd_kafka_topic_partition_list_t *offsets,
86                              int16_t api_version,
87                              rd_kafka_replyq_t replyq,
88                              rd_kafka_resp_cb_t *resp_cb,
89                              void *opaque);
90 
/**
 * @brief OffsetFetch response handler: takes the client instance, the
 *        broker, an error code, the buffers \p rkbuf and \p request, a
 *        mutable partition list \p offsets and an int
 *        \p update_toppar; returns an error code.
 *
 * NOTE(review): presumably parses the OffsetFetch response in \p rkbuf
 * into \p offsets, with \p update_toppar acting as a boolean that also
 * updates the corresponding toppar objects - confirm in the definition.
 */
91 rd_kafka_resp_err_t
92 rd_kafka_handle_OffsetFetch (rd_kafka_t *rk,
93 			     rd_kafka_broker_t *rkb,
94 			     rd_kafka_resp_err_t err,
95 			     rd_kafka_buf_t *rkbuf,
96 			     rd_kafka_buf_t *request,
97 			     rd_kafka_topic_partition_list_t *offsets,
98 			     int update_toppar);
99 
/**
 * @brief OffsetFetch handler taking (rk, rkb, err, rkbuf, request,
 *        opaque) and returning nothing.
 *
 * NOTE(review): the parameter list has the same shape as the other
 * void handle_..() functions in this header, so this is presumably
 * usable as a rd_kafka_resp_cb_t response callback; what \p opaque
 * must point to is not visible here - confirm in the definition.
 */
100 void rd_kafka_op_handle_OffsetFetch (rd_kafka_t *rk,
101 				     rd_kafka_broker_t *rkb,
102                                      rd_kafka_resp_err_t err,
103                                      rd_kafka_buf_t *rkbuf,
104                                      rd_kafka_buf_t *request,
105                                      void *opaque);
106 
/**
 * @brief OffsetFetch request function: takes a broker \p rkb, an
 *        int16_t \p api_version, a partition list \p parts, a reply
 *        queue, a response callback and an \p opaque pointer.
 *        Returns nothing.
 *
 * NOTE(review): presumably builds an OffsetFetchRequest for \p parts
 * and enqueues it on \p rkb, with the response delivered to \p resp_cb
 * through \p replyq - confirm in the definition. Note that the
 * api_version/partition-list parameter order is the reverse of
 * rd_kafka_OffsetRequest().
 */
107 void rd_kafka_OffsetFetchRequest (rd_kafka_broker_t *rkb,
108                                   int16_t api_version,
109                                   rd_kafka_topic_partition_list_t *parts,
110                                   rd_kafka_replyq_t replyq,
111                                   rd_kafka_resp_cb_t *resp_cb,
112                                   void *opaque);
113 
114 
115 
/**
 * @brief OffsetCommit response handler: takes the client instance, the
 *        broker, an error code, the buffers \p rkbuf and \p request and
 *        a mutable partition list \p offsets; returns an error code.
 *
 * NOTE(review): presumably parses the OffsetCommit response in
 * \p rkbuf and records per-partition results in \p offsets - confirm
 * in the definition.
 */
116 rd_kafka_resp_err_t
117 rd_kafka_handle_OffsetCommit (rd_kafka_t *rk,
118 			      rd_kafka_broker_t *rkb,
119 			      rd_kafka_resp_err_t err,
120 			      rd_kafka_buf_t *rkbuf,
121 			      rd_kafka_buf_t *request,
122 			      rd_kafka_topic_partition_list_t *offsets);
/**
 * @brief OffsetCommit request function: takes a broker \p rkb, a
 *        consumer group \p rkcg, a partition list \p offsets, a reply
 *        queue, a response callback, an \p opaque pointer and a
 *        \p reason string; returns an int.
 *
 * NOTE(review): unlike most ..Request() functions in this header this
 * one returns int rather than void or rd_kafka_resp_err_t. The meaning
 * of that value (request sent or not?) and the use of \p reason
 * (logging?) are not visible here - confirm in the definition.
 */
123 int rd_kafka_OffsetCommitRequest (rd_kafka_broker_t *rkb,
124 				  rd_kafka_cgrp_t *rkcg,
125 				  rd_kafka_topic_partition_list_t *offsets,
126 				  rd_kafka_replyq_t replyq,
127 				  rd_kafka_resp_cb_t *resp_cb,
128 				  void *opaque, const char *reason);
129 
130 
131 
/**
 * @brief JoinGroup request function: takes a broker \p rkb, four
 *        read-only protocol strings (group id, member id, group
 *        instance id, protocol type), a read-only list \p topics, a
 *        reply queue, a response callback and an \p opaque pointer.
 *        Returns nothing.
 *
 * NOTE(review): presumably builds a JoinGroupRequest and enqueues it
 * on \p rkb, with the response delivered to \p resp_cb through
 * \p replyq. The element type of \p topics and whether
 * \p group_instance_id may be NULL/empty are not visible here -
 * confirm in the definition.
 */
132 void rd_kafka_JoinGroupRequest (rd_kafka_broker_t *rkb,
133                                 const rd_kafkap_str_t *group_id,
134                                 const rd_kafkap_str_t *member_id,
135                                 const rd_kafkap_str_t *group_instance_id,
136                                 const rd_kafkap_str_t *protocol_type,
137 				const rd_list_t *topics,
138                                 rd_kafka_replyq_t replyq,
139                                 rd_kafka_resp_cb_t *resp_cb,
140                                 void *opaque);
141 
142 
/**
 * @brief LeaveGroup request function: takes a broker \p rkb, three
 *        read-only protocol strings (group id, member id, group
 *        instance id), a reply queue, a response callback and an
 *        \p opaque pointer. Returns nothing.
 *
 * NOTE(review): presumably builds a LeaveGroupRequest and enqueues it
 * on \p rkb, with the response delivered to \p resp_cb through
 * \p replyq - confirm in the definition.
 */
143 void rd_kafka_LeaveGroupRequest (rd_kafka_broker_t *rkb,
144                                  const rd_kafkap_str_t *group_id,
145                                  const rd_kafkap_str_t *member_id,
146                                  const rd_kafkap_str_t *group_instance_id,
147                                  rd_kafka_replyq_t replyq,
148                                  rd_kafka_resp_cb_t *resp_cb,
149                                  void *opaque);
/**
 * @brief LeaveGroup handler taking (rk, rkb, err, rkbuf, request,
 *        opaque) and returning nothing.
 *
 * NOTE(review): presumably intended to be passed as the \p resp_cb of
 * rd_kafka_LeaveGroupRequest(); what \p opaque must point to is not
 * visible here - confirm in the definition.
 */
150 void rd_kafka_handle_LeaveGroup (rd_kafka_t *rk,
151 				 rd_kafka_broker_t *rkb,
152                                  rd_kafka_resp_err_t err,
153                                  rd_kafka_buf_t *rkbuf,
154                                  rd_kafka_buf_t *request,
155                                  void *opaque);
156 
/**
 * @brief SyncGroup request function: takes a broker \p rkb, a group
 *        id, an int32_t \p generation_id, a member id, a group
 *        instance id, a read-only pointer \p assignments with an int
 *        \p assignment_cnt, a reply queue, a response callback and an
 *        \p opaque pointer. Returns nothing.
 *
 * NOTE(review): presumably \p assignments points to an array of
 * \p assignment_cnt group members whose assignments are written to the
 * SyncGroupRequest; whether NULL / 0 is accepted (e.g. for non-leader
 * members) is not visible here - confirm in the definition.
 */
157 void rd_kafka_SyncGroupRequest (rd_kafka_broker_t *rkb,
158                                 const rd_kafkap_str_t *group_id,
159                                 int32_t generation_id,
160                                 const rd_kafkap_str_t *member_id,
161                                 const rd_kafkap_str_t *group_instance_id,
162                                 const rd_kafka_group_member_t
163                                 *assignments,
164                                 int assignment_cnt,
165                                 rd_kafka_replyq_t replyq,
166                                 rd_kafka_resp_cb_t *resp_cb,
167                                 void *opaque);
/**
 * @brief SyncGroup handler taking (rk, rkb, err, rkbuf, request,
 *        opaque) and returning nothing.
 *
 * NOTE(review): presumably intended to be passed as the \p resp_cb of
 * rd_kafka_SyncGroupRequest(); what \p opaque must point to is not
 * visible here - confirm in the definition.
 */
168 void rd_kafka_handle_SyncGroup (rd_kafka_t *rk,
169 				rd_kafka_broker_t *rkb,
170                                 rd_kafka_resp_err_t err,
171                                 rd_kafka_buf_t *rkbuf,
172                                 rd_kafka_buf_t *request,
173                                 void *opaque);
174 
/**
 * @brief ListGroups request function: takes only a broker \p rkb, a
 *        reply queue, a response callback and an \p opaque pointer (no
 *        request-specific arguments). Returns nothing.
 *
 * NOTE(review): presumably builds a ListGroupsRequest and enqueues it
 * on \p rkb, with the response delivered to \p resp_cb through
 * \p replyq - confirm in the definition.
 */
175 void rd_kafka_ListGroupsRequest (rd_kafka_broker_t *rkb,
176                                  rd_kafka_replyq_t replyq,
177                                  rd_kafka_resp_cb_t *resp_cb,
178                                  void *opaque);
179 
/**
 * @brief DescribeGroups request function: takes a broker \p rkb, an
 *        array of C strings \p groups with an int \p group_cnt, a
 *        reply queue, a response callback and an \p opaque pointer.
 *        Returns nothing.
 *
 * NOTE(review): presumably \p groups holds \p group_cnt group names to
 * describe; whether the strings are copied before the function
 * returns is not visible here - confirm in the definition.
 */
180 void rd_kafka_DescribeGroupsRequest (rd_kafka_broker_t *rkb,
181                                      const char **groups, int group_cnt,
182                                      rd_kafka_replyq_t replyq,
183                                      rd_kafka_resp_cb_t *resp_cb,
184                                      void *opaque);
185 
186 
/**
 * @brief Heartbeat request function: takes a broker \p rkb, a group
 *        id, an int32_t \p generation_id, a member id, a group
 *        instance id, a reply queue, a response callback and an
 *        \p opaque pointer. Returns nothing.
 *
 * NOTE(review): presumably builds a HeartbeatRequest for the given
 * group membership and enqueues it on \p rkb, with the response
 * delivered to \p resp_cb through \p replyq - confirm in the
 * definition.
 */
187 void rd_kafka_HeartbeatRequest (rd_kafka_broker_t *rkb,
188                                 const rd_kafkap_str_t *group_id,
189                                 int32_t generation_id,
190                                 const rd_kafkap_str_t *member_id,
191                                 const rd_kafkap_str_t *group_instance_id,
192                                 rd_kafka_replyq_t replyq,
193                                 rd_kafka_resp_cb_t *resp_cb,
194                                 void *opaque);
195 
/**
 * @brief Metadata request function: takes a broker \p rkb, a read-only
 *        list \p topics, a \p reason string and an op \p rko; returns
 *        an error code.
 *
 * NOTE(review): unlike the other ..Request() functions in this header
 * there is no replyq/resp_cb/opaque triple; presumably the result is
 * routed via \p rko instead. The meaning of a NULL or empty \p topics
 * and the use of \p reason (logging?) are not visible here - confirm
 * in the definition.
 */
196 rd_kafka_resp_err_t
197 rd_kafka_MetadataRequest (rd_kafka_broker_t *rkb,
198                           const rd_list_t *topics, const char *reason,
199                           rd_kafka_op_t *rko);
200 
/**
 * @brief ApiVersion response handler: takes the client instance, the
 *        broker, an error code, the buffers \p rkbuf and \p request, a
 *        pointer-to-pointer \p apis and a size_t pointer \p api_cnt;
 *        returns an error code.
 *
 * NOTE(review): \p apis and \p api_cnt look like output parameters
 * receiving an array of struct rd_kafka_ApiVersion and its element
 * count; who frees that array is not visible here - confirm in the
 * definition.
 */
201 rd_kafka_resp_err_t
202 rd_kafka_handle_ApiVersion (rd_kafka_t *rk,
203 			    rd_kafka_broker_t *rkb,
204 			    rd_kafka_resp_err_t err,
205 			    rd_kafka_buf_t *rkbuf,
206 			    rd_kafka_buf_t *request,
207 			    struct rd_kafka_ApiVersion **apis,
208 			    size_t *api_cnt);
/**
 * @brief ApiVersion request function: takes a broker \p rkb, an
 *        int16_t \p ApiVersion, a reply queue, a response callback and
 *        an \p opaque pointer. Returns nothing.
 *
 * NOTE(review): presumably \p ApiVersion selects the protocol version
 * of the ApiVersionRequest itself; whether special values (e.g. -1)
 * are accepted is not visible here - confirm in the definition.
 */
209 void rd_kafka_ApiVersionRequest (rd_kafka_broker_t *rkb,
210                                  int16_t ApiVersion,
211 				 rd_kafka_replyq_t replyq,
212 				 rd_kafka_resp_cb_t *resp_cb,
213 				 void *opaque);
214 
/**
 * @brief SaslHandshake request function: takes a broker \p rkb, a
 *        \p mechanism string, a reply queue, a response callback and
 *        an \p opaque pointer. Returns nothing.
 *
 * NOTE(review): presumably \p mechanism is the SASL mechanism name
 * sent in the SaslHandshakeRequest - confirm in the definition.
 */
215 void rd_kafka_SaslHandshakeRequest (rd_kafka_broker_t *rkb,
216 				    const char *mechanism,
217 				    rd_kafka_replyq_t replyq,
218 				    rd_kafka_resp_cb_t *resp_cb,
219 				    void *opaque);
220 
/**
 * @brief SaslAuthenticate handler taking (rk, rkb, err, rkbuf,
 *        request, opaque) and returning nothing.
 *
 * NOTE(review): presumably intended to be passed as the \p resp_cb of
 * rd_kafka_SaslAuthenticateRequest(); what \p opaque must point to is
 * not visible here - confirm in the definition.
 */
221 void
222 rd_kafka_handle_SaslAuthenticate (rd_kafka_t *rk,
223                                   rd_kafka_broker_t *rkb,
224                                   rd_kafka_resp_err_t err,
225                                   rd_kafka_buf_t *rkbuf,
226                                   rd_kafka_buf_t *request,
227                                   void *opaque);
/**
 * @brief SaslAuthenticate request function: takes a broker \p rkb, a
 *        read-only byte buffer \p buf with its \p size, a reply queue,
 *        a response callback and an \p opaque pointer. Returns
 *        nothing.
 *
 * NOTE(review): presumably \p buf / \p size is the SASL payload sent
 * in the request; whether it is copied before the function returns is
 * not visible here - confirm in the definition.
 */
228 void rd_kafka_SaslAuthenticateRequest (rd_kafka_broker_t *rkb,
229                                        const void *buf, size_t size,
230                                        rd_kafka_replyq_t replyq,
231                                        rd_kafka_resp_cb_t *resp_cb,
232                                        void *opaque);
233 
/**
 * @brief Produce request function: takes a broker \p rkb, a
 *        topic+partition \p rktp and a producer id \p pid passed by
 *        value; returns an int.
 *
 * There is no replyq/resp_cb/opaque triple here, unlike the other
 * ..Request() functions in this header.
 *
 * NOTE(review): the meaning of the int return value (number of
 * messages or requests produced?) is not visible from this
 * declaration - confirm in the definition.
 * NOTE(review): the top-level `const` on the by-value \p pid does not
 * affect the function's type as seen by callers; it only matters in
 * the definition and could be dropped from this prototype.
 */
234 int rd_kafka_ProduceRequest (rd_kafka_broker_t *rkb, rd_kafka_toppar_t *rktp,
235                              const rd_kafka_pid_t pid);
236 
/**
 * @brief CreateTopics request function: takes a broker \p rkb, a
 *        read-only list \p new_topics (annotated as holding NewTopic_t
 *        pointers), admin \p options, an error string buffer
 *        \p errstr of \p errstr_size bytes, a reply queue, a response
 *        callback and an \p opaque pointer; returns an error code.
 *
 * NOTE(review): presumably a human-readable message is written to
 * \p errstr when the returned error code indicates failure - confirm
 * in the definition.
 */
237 rd_kafka_resp_err_t
238 rd_kafka_CreateTopicsRequest (rd_kafka_broker_t *rkb,
239                               const rd_list_t *new_topics /*(NewTopic_t*)*/,
240                               rd_kafka_AdminOptions_t *options,
241                               char *errstr, size_t errstr_size,
242                               rd_kafka_replyq_t replyq,
243                               rd_kafka_resp_cb_t *resp_cb,
244                               void *opaque);
245 
/**
 * @brief DeleteTopics request function: takes a broker \p rkb, a
 *        read-only list \p del_topics (annotated as holding
 *        DeleteTopic_t pointers), admin \p options, an error string
 *        buffer \p errstr of \p errstr_size bytes, a reply queue, a
 *        response callback and an \p opaque pointer; returns an error
 *        code.
 *
 * NOTE(review): presumably a human-readable message is written to
 * \p errstr when the returned error code indicates failure - confirm
 * in the definition.
 */
246 rd_kafka_resp_err_t
247 rd_kafka_DeleteTopicsRequest (rd_kafka_broker_t *rkb,
248                               const rd_list_t *del_topics /*(DeleteTopic_t*)*/,
249                               rd_kafka_AdminOptions_t *options,
250                               char *errstr, size_t errstr_size,
251                               rd_kafka_replyq_t replyq,
252                               rd_kafka_resp_cb_t *resp_cb,
253                               void *opaque);
254 
/**
 * @brief CreatePartitions request function: takes a broker \p rkb, a
 *        read-only list \p new_parts (annotated as holding
 *        NewPartitions_t pointers), admin \p options, an error string
 *        buffer \p errstr of \p errstr_size bytes, a reply queue, a
 *        response callback and an \p opaque pointer; returns an error
 *        code.
 *
 * NOTE(review): presumably a human-readable message is written to
 * \p errstr when the returned error code indicates failure - confirm
 * in the definition.
 */
255 rd_kafka_resp_err_t
256 rd_kafka_CreatePartitionsRequest (rd_kafka_broker_t *rkb,
257                                   const rd_list_t *new_parts /*(NewPartitions_t*)*/,
258                                   rd_kafka_AdminOptions_t *options,
259                                   char *errstr, size_t errstr_size,
260                                   rd_kafka_replyq_t replyq,
261                                   rd_kafka_resp_cb_t *resp_cb,
262                                   void *opaque);
263 
/**
 * @brief AlterConfigs request function: takes a broker \p rkb, a
 *        read-only list \p configs (annotated as holding
 *        ConfigResource_t pointers), admin \p options, an error string
 *        buffer \p errstr of \p errstr_size bytes, a reply queue, a
 *        response callback and an \p opaque pointer; returns an error
 *        code.
 *
 * NOTE(review): presumably a human-readable message is written to
 * \p errstr when the returned error code indicates failure - confirm
 * in the definition.
 */
264 rd_kafka_resp_err_t
265 rd_kafka_AlterConfigsRequest (rd_kafka_broker_t *rkb,
266                               const rd_list_t *configs /*(ConfigResource_t*)*/,
267                               rd_kafka_AdminOptions_t *options,
268                               char *errstr, size_t errstr_size,
269                               rd_kafka_replyq_t replyq,
270                               rd_kafka_resp_cb_t *resp_cb,
271                               void *opaque);
272 
/**
 * @brief DescribeConfigs request function: takes a broker \p rkb, a
 *        read-only list \p configs (annotated as holding
 *        ConfigResource_t pointers), admin \p options, an error string
 *        buffer \p errstr of \p errstr_size bytes, a reply queue, a
 *        response callback and an \p opaque pointer; returns an error
 *        code.
 *
 * NOTE(review): presumably a human-readable message is written to
 * \p errstr when the returned error code indicates failure - confirm
 * in the definition.
 */
273 rd_kafka_resp_err_t
274 rd_kafka_DescribeConfigsRequest (rd_kafka_broker_t *rkb,
275                                  const rd_list_t *configs /*(ConfigResource_t*)*/,
276                                  rd_kafka_AdminOptions_t *options,
277                                  char *errstr, size_t errstr_size,
278                                  rd_kafka_replyq_t replyq,
279                                  rd_kafka_resp_cb_t *resp_cb,
280                                  void *opaque);
281 
/**
 * @brief InitProducerId handler taking (rk, rkb, err, rkbuf, request,
 *        opaque) and returning nothing.
 *
 * NOTE(review): presumably intended to be passed as the \p resp_cb of
 * rd_kafka_InitProducerIdRequest(); what \p opaque must point to is
 * not visible here - confirm in the definition.
 */
282 void
283 rd_kafka_handle_InitProducerId (rd_kafka_t *rk,
284                                 rd_kafka_broker_t *rkb,
285                                 rd_kafka_resp_err_t err,
286                                 rd_kafka_buf_t *rkbuf,
287                                 rd_kafka_buf_t *request,
288                                 void *opaque);
289 
/**
 * @brief InitProducerId request function: takes a broker \p rkb, a
 *        \p transactional_id string, an int
 *        \p transaction_timeout_ms, a read-only pointer
 *        \p current_pid, an error string buffer \p errstr of
 *        \p errstr_size bytes, a reply queue, a response callback and
 *        an \p opaque pointer; returns an error code.
 *
 * NOTE(review): \p current_pid is passed by pointer here whereas the
 * other transactional requests in this header take the pid by value,
 * which suggests it may be NULL (no current pid). Whether
 * \p transactional_id may also be NULL (non-transactional producer)
 * is not visible here - confirm both in the definition.
 */
290 rd_kafka_resp_err_t
291 rd_kafka_InitProducerIdRequest (rd_kafka_broker_t *rkb,
292                                 const char *transactional_id,
293                                 int transaction_timeout_ms,
294                                 const rd_kafka_pid_t *current_pid,
295                                 char *errstr, size_t errstr_size,
296                                 rd_kafka_replyq_t replyq,
297                                 rd_kafka_resp_cb_t *resp_cb,
298                                 void *opaque);
299 
/**
 * @brief AddPartitionsToTxn request function: takes a broker \p rkb, a
 *        \p transactional_id string, a producer id \p pid by value, a
 *        read-only toppar queue head \p rktps, an error string buffer
 *        \p errstr of \p errstr_size bytes, a reply queue, a response
 *        callback and an \p opaque pointer; returns an error code.
 *
 * NOTE(review): presumably \p rktps is the set of partitions to add to
 * the transaction; which list linkage of the toppar is traversed is
 * not visible here - confirm in the definition.
 */
300 rd_kafka_resp_err_t
301 rd_kafka_AddPartitionsToTxnRequest (rd_kafka_broker_t *rkb,
302                                     const char *transactional_id,
303                                     rd_kafka_pid_t pid,
304                                     const rd_kafka_toppar_tqhead_t *rktps,
305                                     char *errstr, size_t errstr_size,
306                                     rd_kafka_replyq_t replyq,
307                                     rd_kafka_resp_cb_t *resp_cb,
308                                     void *opaque);
309 
/**
 * @brief AddOffsetsToTxn request function: takes a broker \p rkb, a
 *        \p transactional_id string, a producer id \p pid by value, a
 *        \p group_id string, an error string buffer \p errstr of
 *        \p errstr_size bytes, a reply queue, a response callback and
 *        an \p opaque pointer; returns an error code.
 *
 * NOTE(review): presumably \p group_id names the consumer group whose
 * offsets are to be committed as part of the transaction - confirm in
 * the definition.
 */
310 rd_kafka_resp_err_t
311 rd_kafka_AddOffsetsToTxnRequest (rd_kafka_broker_t *rkb,
312                                  const char *transactional_id,
313                                  rd_kafka_pid_t pid,
314                                  const char *group_id,
315                                  char *errstr, size_t errstr_size,
316                                  rd_kafka_replyq_t replyq,
317                                  rd_kafka_resp_cb_t *resp_cb,
318                                  void *opaque);
319 
/**
 * @brief EndTxn request function: takes a broker \p rkb, a
 *        \p transactional_id string, a producer id \p pid by value, a
 *        boolean \p committed, an error string buffer \p errstr of
 *        \p errstr_size bytes, a reply queue, a response callback and
 *        an \p opaque pointer; returns an error code.
 *
 * NOTE(review): presumably \p committed selects between committing
 * (true) and aborting (false) the transaction - confirm in the
 * definition.
 */
320 rd_kafka_resp_err_t
321 rd_kafka_EndTxnRequest (rd_kafka_broker_t *rkb,
322                         const char *transactional_id,
323                         rd_kafka_pid_t pid,
324                         rd_bool_t committed,
325                         char *errstr, size_t errstr_size,
326                         rd_kafka_replyq_t replyq,
327                         rd_kafka_resp_cb_t *resp_cb,
328                         void *opaque);
329 
/**
 * @brief Unit test entry point for the request code: takes no
 *        arguments and returns an int.
 *
 * NOTE(review): presumably the return value is the number of failed
 * tests (0 on success) - confirm against the definition and the unit
 * test runner that calls it.
 */
330 int unittest_request (void);
331 
332 #endif /* _RDKAFKA_REQUEST_H_ */
333