1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2015, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 #ifndef _RDKAFKA_OP_H_
29 #define _RDKAFKA_OP_H_
30 
31 
32 #include "rdkafka_msg.h"
33 #include "rdkafka_timer.h"
34 #include "rdkafka_admin.h"
35 
36 
37 /* Forward declarations */
38 typedef struct rd_kafka_q_s rd_kafka_q_t;
39 typedef struct rd_kafka_toppar_s rd_kafka_toppar_t;
40 typedef struct rd_kafka_op_s rd_kafka_op_t;
41 
42 /* One-off reply queue + reply version.
43  * All APIs that take a rd_kafka_replyq_t makes a copy of the
44  * struct as-is and grabs hold of the existing .q refcount.
45  * Think of replyq as a (Q,VERSION) tuple. */
46 typedef struct rd_kafka_replyq_s {
47 	rd_kafka_q_t *q;
48 	int32_t       version;
49 #if ENABLE_DEVEL
50 	char *_id; /* Devel id used for debugging reference leaks.
51 		    * Is a strdup() of the caller's function name,
52 		    * which makes for easy debugging with valgrind. */
53 #endif
54 } rd_kafka_replyq_t;
55 
56 
57 
58 
59 /**
60  * Flags used by:
61  *   - rd_kafka_op_t.rko_flags
62  *   - rd_kafka_buf_t.rkbuf_flags
63  */
64 #define RD_KAFKA_OP_F_FREE        0x1  /* rd_free payload when done with it */
65 #define RD_KAFKA_OP_F_NO_RESPONSE 0x2  /* rkbuf: Not expecting a response */
66 #define RD_KAFKA_OP_F_CRC         0x4  /* rkbuf: Perform CRC calculation */
67 #define RD_KAFKA_OP_F_BLOCKING    0x8  /* rkbuf: blocking protocol request */
68 #define RD_KAFKA_OP_F_REPROCESS   0x10 /* cgrp: Reprocess at a later time. */
69 #define RD_KAFKA_OP_F_SENT        0x20 /* rkbuf: request sent on wire */
70 #define RD_KAFKA_OP_F_FLEXVER     0x40 /* rkbuf: flexible protocol version
71                                         *        (KIP-482) */
72 #define RD_KAFKA_OP_F_NEED_MAKE   0x80 /* rkbuf: request content has not
73                                         *        been made yet, the make
74                                         *        callback will be triggered
75                                         *        to construct the request
76                                         *        right before it is sent. */
77 
78 typedef enum {
79         RD_KAFKA_OP_NONE,     /* No specific type, use OP_CB */
80 	RD_KAFKA_OP_FETCH,    /* Kafka thread -> Application */
81 	RD_KAFKA_OP_ERR,      /* Kafka thread -> Application */
82         RD_KAFKA_OP_CONSUMER_ERR, /* Kafka thread -> Application */
83 	RD_KAFKA_OP_DR,       /* Kafka thread -> Application
84 			       * Produce message delivery report */
85 	RD_KAFKA_OP_STATS,    /* Kafka thread -> Application */
86 
87         RD_KAFKA_OP_OFFSET_COMMIT, /* any -> toppar's Broker thread */
88         RD_KAFKA_OP_NODE_UPDATE,   /* any -> Broker thread: node update */
89 
90         RD_KAFKA_OP_XMIT_BUF, /* transmit buffer: any -> broker thread */
91         RD_KAFKA_OP_RECV_BUF, /* received response buffer: broker thr -> any */
92         RD_KAFKA_OP_XMIT_RETRY, /* retry buffer xmit: any -> broker thread */
93         RD_KAFKA_OP_FETCH_START, /* Application -> toppar's handler thread */
94         RD_KAFKA_OP_FETCH_STOP,  /* Application -> toppar's handler thread */
95         RD_KAFKA_OP_SEEK,        /* Application -> toppar's handler thread */
96 	RD_KAFKA_OP_PAUSE,       /* Application -> toppar's handler thread */
97         RD_KAFKA_OP_OFFSET_FETCH, /* Broker -> broker thread: fetch offsets
98                                    * for topic. */
99 
100         RD_KAFKA_OP_PARTITION_JOIN,  /* * -> cgrp op:   add toppar to cgrp
101                                       * * -> broker op: add toppar to broker */
102         RD_KAFKA_OP_PARTITION_LEAVE, /* * -> cgrp op:   remove toppar from cgrp
103                                       * * -> broker op: remove toppar from rkb*/
104         RD_KAFKA_OP_REBALANCE,       /* broker thread -> app:
105                                       * group rebalance */
106         RD_KAFKA_OP_TERMINATE,       /* For generic use */
107         RD_KAFKA_OP_COORD_QUERY,     /* Query for coordinator */
108         RD_KAFKA_OP_SUBSCRIBE,       /* New subscription */
109         RD_KAFKA_OP_ASSIGN,          /* New assignment */
110         RD_KAFKA_OP_GET_SUBSCRIPTION,/* Get current subscription.
111 				      * Reuses u.subscribe */
112         RD_KAFKA_OP_GET_ASSIGNMENT,  /* Get current assignment.
113 				      * Reuses u.assign */
114 	RD_KAFKA_OP_THROTTLE,        /* Throttle info */
115 	RD_KAFKA_OP_NAME,            /* Request name */
116         RD_KAFKA_OP_CG_METADATA,     /**< Request consumer metadata */
117 	RD_KAFKA_OP_OFFSET_RESET,    /* Offset reset */
118         RD_KAFKA_OP_METADATA,        /* Metadata response */
119         RD_KAFKA_OP_LOG,             /* Log */
120         RD_KAFKA_OP_WAKEUP,          /* Wake-up signaling */
121         RD_KAFKA_OP_CREATETOPICS,    /**< Admin: CreateTopics: u.admin_request*/
122         RD_KAFKA_OP_DELETETOPICS,    /**< Admin: DeleteTopics: u.admin_request*/
123         RD_KAFKA_OP_CREATEPARTITIONS,/**< Admin: CreatePartitions:
124                                       *   u.admin_request*/
125         RD_KAFKA_OP_ALTERCONFIGS,    /**< Admin: AlterConfigs: u.admin_request*/
126         RD_KAFKA_OP_DESCRIBECONFIGS, /**< Admin: DescribeConfigs:
127                                       *   u.admin_request*/
128         RD_KAFKA_OP_DELETERECORDS,   /**< Admin: DeleteRecords:
129                                       *   u.admin_request*/
130         RD_KAFKA_OP_DELETEGROUPS,    /**< Admin: DeleteGroups: u.admin_request*/
131         RD_KAFKA_OP_DELETECONSUMERGROUPOFFSETS, /**< Admin:
132                                                  *   DeleteConsumerGroupOffsets
133                                                  *   u.admin_request */
134         RD_KAFKA_OP_ADMIN_FANOUT,    /**< Admin: fanout request */
135         RD_KAFKA_OP_ADMIN_RESULT,    /**< Admin API .._result_t */
136         RD_KAFKA_OP_PURGE,           /**< Purge queues */
137         RD_KAFKA_OP_CONNECT,         /**< Connect (to broker) */
138         RD_KAFKA_OP_OAUTHBEARER_REFRESH, /**< Refresh OAUTHBEARER token */
139         RD_KAFKA_OP_MOCK,            /**< Mock cluster command */
140         RD_KAFKA_OP_BROKER_MONITOR,    /**< Broker state change */
141         RD_KAFKA_OP_TXN,             /**< Transaction command */
142         RD_KAFKA_OP_GET_REBALANCE_PROTOCOL, /**< Get rebalance protocol */
143         RD_KAFKA_OP_LEADERS,         /**< Partition leader query */
144         RD_KAFKA_OP_BARRIER,         /**< Version barrier bump */
145         RD_KAFKA_OP__END
146 } rd_kafka_op_type_t;
147 
148 /* Flags used with op_type_t */
149 #define RD_KAFKA_OP_CB        (int)(1 << 29)  /* Callback op. */
150 #define RD_KAFKA_OP_REPLY     (int)(1 << 30)  /* Reply op. */
151 #define RD_KAFKA_OP_FLAGMASK  (RD_KAFKA_OP_CB | RD_KAFKA_OP_REPLY)
152 
153 
154 /**
155  * @brief Op/queue priority levels.
156  * @remark Since priority levels alter the FIFO order, pay extra attention
157  *         to preserve ordering as deemed necessary.
158  * @remark Priority should only be set on ops destined for application
159  *         facing queues (rk_rep, rkcg_q, etc).
160  */
161 typedef enum {
162         RD_KAFKA_PRIO_NORMAL = 0,   /* Normal bulk, messages, DRs, etc. */
163         RD_KAFKA_PRIO_MEDIUM,       /* Prioritize in front of bulk,
164                                      * still at some scale. e.g. logs, .. */
165         RD_KAFKA_PRIO_HIGH,         /* Small scale high priority */
166         RD_KAFKA_PRIO_FLASH         /* Micro scale, immediate delivery. */
167 } rd_kafka_prio_t;
168 
169 
170 /**
171  * @brief Op handler result
172  *
173  * @remark When returning YIELD from a handler the handler will
174  *         need to have made sure to either re-enqueue the op or destroy it
175  *         since the caller will not touch the op anymore.
176  */
177 typedef enum {
178         RD_KAFKA_OP_RES_PASS,    /* Not handled, pass to caller */
179         RD_KAFKA_OP_RES_HANDLED, /* Op was handled (through callbacks) */
180         RD_KAFKA_OP_RES_KEEP,    /* Op was handled (through callbacks)
181                                   * but must not be destroyed by op_handle().
182                                   * It is NOT PERMITTED to return RES_KEEP
183                                   * from a callback handling a ERR__DESTROY
184                                   * event. */
185         RD_KAFKA_OP_RES_YIELD    /* Callback called yield */
186 } rd_kafka_op_res_t;
187 
188 
189 /**
190  * @brief Queue serve callback call type
191  */
192 typedef enum {
193         RD_KAFKA_Q_CB_INVALID, /* dont use */
194         RD_KAFKA_Q_CB_CALLBACK,/* trigger callback based on op */
195         RD_KAFKA_Q_CB_RETURN,  /* return op rather than trigger callback
196                                 * (if possible)*/
197         RD_KAFKA_Q_CB_FORCE_RETURN, /* return op, regardless of callback. */
198         RD_KAFKA_Q_CB_EVENT    /* like _Q_CB_RETURN but return event_t:ed op */
199 } rd_kafka_q_cb_type_t;
200 
201 /**
202  * @brief Queue serve callback
203  * @remark See rd_kafka_op_res_t docs for return semantics.
204  */
205 typedef rd_kafka_op_res_t
206 (rd_kafka_q_serve_cb_t) (rd_kafka_t *rk,
207                          struct rd_kafka_q_s *rkq,
208                          struct rd_kafka_op_s *rko,
209                          rd_kafka_q_cb_type_t cb_type, void *opaque)
210         RD_WARN_UNUSED_RESULT;
211 
212 /**
213  * @brief Enumerates the assign op sub-types.
214  */
215 typedef enum {
216         RD_KAFKA_ASSIGN_METHOD_ASSIGN,        /**< Absolute assign/unassign */
217         RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN,   /**< Incremental assign */
218         RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN  /**< Incremental unassign */
219 } rd_kafka_assign_method_t;
220 
221 /**
222  * @brief Op callback type
223  */
224 typedef rd_kafka_op_res_t (rd_kafka_op_cb_t) (rd_kafka_t *rk,
225                                               rd_kafka_q_t *rkq,
226                                               struct rd_kafka_op_s *rko)
227                 RD_WARN_UNUSED_RESULT;
228 
229 /* Forward declaration */
230 struct rd_kafka_admin_worker_cbs;
231 struct rd_kafka_admin_fanout_worker_cbs;
232 
233 
234 #define RD_KAFKA_OP_TYPE_ASSERT(rko,type)                               \
235         rd_assert(((rko)->rko_type & ~RD_KAFKA_OP_FLAGMASK) == (type))
236 
237 struct rd_kafka_op_s {
238 	TAILQ_ENTRY(rd_kafka_op_s) rko_link;
239 
240 	rd_kafka_op_type_t    rko_type;   /* Internal op type */
241 	rd_kafka_event_type_t rko_evtype;
242 	int                   rko_flags;  /* See RD_KAFKA_OP_F_... above */
243 	int32_t               rko_version;
244 	rd_kafka_resp_err_t   rko_err;
245         rd_kafka_error_t     *rko_error;
246 	int32_t               rko_len;    /* Depends on type, typically the
247 					   * message length. */
248         rd_kafka_prio_t       rko_prio;   /**< In-queue priority.
249                                            *   Higher value means higher prio*/
250 
251 	rd_kafka_toppar_t    *rko_rktp;
252 
253         /*
254 	 * Generic fields
255 	 */
256 
257 	/* Indicates request: enqueue reply on rko_replyq.q with .version.
258 	 * .q is refcounted. */
259 	rd_kafka_replyq_t rko_replyq;
260 
261         /* Original queue's op serve callback and opaque, if any.
262          * Mainly used for forwarded queues to use the original queue's
263          * serve function from the forwarded position. */
264         rd_kafka_q_serve_cb_t *rko_serve;
265         void *rko_serve_opaque;
266 
267 	rd_kafka_t     *rko_rk;
268 
269 #if ENABLE_DEVEL
270         const char *rko_source;  /**< Where op was created */
271 #endif
272 
273         /* RD_KAFKA_OP_CB */
274         rd_kafka_op_cb_t *rko_op_cb;
275 
276 	union {
277 		struct {
278 			rd_kafka_buf_t *rkbuf;
279 			rd_kafka_msg_t  rkm;
280 			int evidx;
281 		} fetch;
282 
283 		struct {
284 			rd_kafka_topic_partition_list_t *partitions;
285                         /** Require stable (txn-commited) offsets */
286                         rd_bool_t require_stable;
287 			int do_free; /* free .partitions on destroy() */
288 		} offset_fetch;
289 
290 		struct {
291 			rd_kafka_topic_partition_list_t *partitions;
292 			void (*cb) (rd_kafka_t *rk,
293 				    rd_kafka_resp_err_t err,
294 				    rd_kafka_topic_partition_list_t *offsets,
295 				    void *opaque);
296 			void *opaque;
297 			int silent_empty; /**< Fail silently if there are no
298 					   *   offsets to commit. */
299                         rd_ts_t ts_timeout;
300                         char *reason;
301 		} offset_commit;
302 
303 		struct {
304 			rd_kafka_topic_partition_list_t *topics;
305 		} subscribe; /* also used for GET_SUBSCRIPTION */
306 
307 		struct {
308 			rd_kafka_topic_partition_list_t *partitions;
309                         rd_kafka_assign_method_t method;
310 		} assign; /* also used for GET_ASSIGNMENT */
311 
312                 struct {
313                         rd_kafka_topic_partition_list_t *partitions;
314                 } rebalance;
315 
316                 struct {
317                         const char *str;
318                 } rebalance_protocol;
319 
320 		struct {
321 			char *str;
322 		} name;
323 
324                 rd_kafka_consumer_group_metadata_t *cg_metadata;
325 
326 		struct {
327 			int64_t offset;
328 			char *errstr;
329 			rd_kafka_msg_t rkm;
330                         rd_kafka_topic_t *rkt;
331                         int fatal;  /**< This was a ERR__FATAL error that has
332                                      *   been translated to the fatal error
333                                      *   code. */
334 		} err;  /* used for ERR and CONSUMER_ERR */
335 
336 		struct {
337 			int throttle_time;
338 			int32_t nodeid;
339 			char *nodename;
340 		} throttle;
341 
342 		struct {
343 			char *json;
344 			size_t json_len;
345 		} stats;
346 
347 		struct {
348 			rd_kafka_buf_t *rkbuf;
349 		} xbuf; /* XMIT_BUF and RECV_BUF */
350 
351                 /* RD_KAFKA_OP_METADATA */
352                 struct {
353                         rd_kafka_metadata_t *md;
354                         int force; /* force request regardless of outstanding
355                                     * metadata requests. */
356                 } metadata;
357 
358 		struct {
359 			rd_kafka_topic_t *rkt;
360 			rd_kafka_msgq_t msgq;
361 			rd_kafka_msgq_t msgq2;
362 			int do_purge2;
363 		} dr;
364 
365 		struct {
366 			int32_t nodeid;
367 			char    nodename[RD_KAFKA_NODENAME_SIZE];
368 		} node;
369 
370 		struct {
371 			int64_t offset;
372 			char *reason;
373 		} offset_reset;
374 
375 		struct {
376 			int64_t offset;
377 			struct rd_kafka_cgrp_s *rkcg;
378 		} fetch_start; /* reused for SEEK */
379 
380 		struct {
381 			int pause;
382 			int flag;
383 		} pause;
384 
385                 struct {
386                         char fac[64];
387                         int  level;
388                         char *str;
389                         int  ctx;
390                 } log;
391 
392                 struct {
393                         rd_kafka_AdminOptions_t options; /**< Copy of user's
394                                                           * options */
395                         rd_ts_t abs_timeout;        /**< Absolute timeout
396                                                      *   for this request. */
397                         rd_kafka_timer_t tmr;       /**< Timeout timer */
398                         struct rd_kafka_enq_once_s *eonce; /**< Enqueue op
399                                                             * only once,
400                                                             * used to
401                                                             * (re)trigger
402                                                             * the request op
403                                                             * upon broker state
404                                                             * changes while
405                                                             * waiting for the
406                                                             * controller, or
407                                                             * due to .tmr
408                                                             * timeout. */
409                         rd_list_t args;/**< Type depends on request, e.g.
410                                         *   rd_kafka_NewTopic_t for CreateTopics
411                                         */
412 
413                         rd_kafka_buf_t *reply_buf; /**< Protocol reply,
414                                                     *   temporary reference not
415                                                     *   owned by this rko */
416 
417                         /**< Worker callbacks, see rdkafka_admin.c */
418                         struct rd_kafka_admin_worker_cbs *cbs;
419 
420                         /** Worker state */
421                         enum {
422                                 RD_KAFKA_ADMIN_STATE_INIT,
423                                 RD_KAFKA_ADMIN_STATE_WAIT_BROKER,
424                                 RD_KAFKA_ADMIN_STATE_WAIT_CONTROLLER,
425                                 RD_KAFKA_ADMIN_STATE_WAIT_FANOUTS,
426                                 RD_KAFKA_ADMIN_STATE_CONSTRUCT_REQUEST,
427                                 RD_KAFKA_ADMIN_STATE_WAIT_RESPONSE,
428                         } state;
429 
430                         int32_t broker_id; /**< Requested broker id to
431                                             *   communicate with.
432                                             *   Used for AlterConfigs, et.al,
433                                             *   that needs to speak to a
434                                             *   specific broker rather than
435                                             *   the controller.
436                                             *   See RD_KAFKA_ADMIN_TARGET_..
437                                             *   for special values (coordinator,
438                                             *   fanout, etc).
439                                             */
440                         /** The type of coordinator to look up */
441                         rd_kafka_coordtype_t coordtype;
442                         /** Which coordinator to look up */
443                         char *coordkey;
444 
445                         /** Application's reply queue */
446                         rd_kafka_replyq_t replyq;
447                         rd_kafka_event_type_t reply_event_type;
448 
449                         /** A collection of fanout child ops. */
450                         struct {
451                                 /** The type of request being fanned out.
452                                  *  This is used for the ADMIN_RESULT. */
453                                 rd_kafka_op_type_t reqtype;
454 
455                                 /** Worker callbacks, see rdkafka_admin.c */
456                                 struct rd_kafka_admin_fanout_worker_cbs *cbs;
457 
458                                 /** Number of outstanding requests remaining to
459                                  *  wait for. */
460                                 int outstanding;
461 
462                                 /** Incremental results from fanouts.
463                                  *  This list is pre-allocated to the number
464                                  *  of input objects and can thus be set
465                                  *  by index to retain original ordering. */
466                                 rd_list_t results;
467 
468                                 /** Reply event type */
469                                 rd_kafka_event_type_t reply_event_type;
470 
471                         } fanout;
472 
473                         /** A reference to the parent ADMIN_FANOUT op that
474                          *  spawned this op, if applicable. NULL otherwise. */
475                         struct rd_kafka_op_s *fanout_parent;
476 
477                 } admin_request;
478 
479                 struct {
480                         rd_kafka_op_type_t reqtype; /**< Request op type,
481                                                      *   used for logging. */
482 
483                         rd_list_t args; /**< Args moved from the request op
484                                          *   when the result op is created.
485                                          *
486                                          *   Type depends on request.
487                                          */
488 
489                         char *errstr;      /**< Error string, if rko_err
490                                             *   is set, else NULL. */
491 
492                         rd_list_t results; /**< Type depends on request type:
493                                             *
494                                             * (rd_kafka_topic_result_t *):
495                                             * CreateTopics, DeleteTopics,
496                                             * CreatePartitions.
497                                             *
498                                             * (rd_kafka_ConfigResource_t *):
499                                             * AlterConfigs, DescribeConfigs
500                                             */
501 
502                         void *opaque;     /**< Application's opaque as set by
503                                            *   rd_kafka_AdminOptions_set_opaque
504                                            */
505 
506                         /** A reference to the parent ADMIN_FANOUT op that
507                          *  spawned this op, if applicable. NULL otherwise. */
508                         struct rd_kafka_op_s *fanout_parent;
509                 } admin_result;
510 
511                 struct {
512                         int flags; /**< purge_flags from rd_kafka_purge() */
513                 } purge;
514 
515                 /**< Mock cluster command */
516                 struct {
517                         enum {
518                                 RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR,
519                                 RD_KAFKA_MOCK_CMD_TOPIC_CREATE,
520                                 RD_KAFKA_MOCK_CMD_PART_SET_LEADER,
521                                 RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER,
522                                 RD_KAFKA_MOCK_CMD_PART_SET_FOLLOWER_WMARKS,
523                                 RD_KAFKA_MOCK_CMD_BROKER_SET_UPDOWN,
524                                 RD_KAFKA_MOCK_CMD_BROKER_SET_RTT,
525                                 RD_KAFKA_MOCK_CMD_BROKER_SET_RACK,
526                                 RD_KAFKA_MOCK_CMD_COORD_SET,
527                                 RD_KAFKA_MOCK_CMD_APIVERSION_SET,
528                         } cmd;
529 
530                         rd_kafka_resp_err_t err; /**< Error for:
531                                                   *    TOPIC_SET_ERROR */
532                         char *name;              /**< For:
533                                                   *    TOPIC_SET_ERROR
534                                                   *    TOPIC_CREATE
535                                                   *    PART_SET_FOLLOWER
536                                                   *    PART_SET_FOLLOWER_WMARKS
537                                                   *    BROKER_SET_RACK
538                                                   *    COORD_SET (key_type) */
539                         char *str;               /**< For:
540                                                   *    COORD_SET (key) */
541                         int32_t partition;       /**< For:
542                                                   *    PART_SET_FOLLOWER
543                                                   *    PART_SET_FOLLOWER_WMARKS
544                                                   *    PART_SET_LEADER
545                                                   *    APIVERSION_SET (ApiKey)
546                                                   */
547                         int32_t broker_id;       /**< For:
548                                                   *    PART_SET_FOLLOWER
549                                                   *    PART_SET_LEADER
550                                                   *    BROKER_SET_UPDOWN
551                                                   *    BROKER_SET_RACK
552                                                   *    COORD_SET */
553                         int64_t lo;              /**< Low offset, for:
554                                                   *    TOPIC_CREATE (part cnt)
555                                                   *    PART_SET_FOLLOWER_WMARKS
556                                                   *    BROKER_SET_UPDOWN
557                                                   *    APIVERSION_SET (minver)
558                                                   *    BROKER_SET_RTT
559                                                   */
560                         int64_t hi;              /**< High offset, for:
561                                                   *    TOPIC_CREATE (repl fact)
562                                                   *    PART_SET_FOLLOWER_WMARKS
563                                                   *    APIVERSION_SET (maxver)
564                                                   */
565                 } mock;
566 
567                 struct {
568                         struct rd_kafka_broker_s *rkb; /**< Broker who's state
569                                                         *   changed. */
570                         /**< Callback to trigger on the op handler's thread. */
571                         void (*cb) (struct rd_kafka_broker_s *rkb);
572                 } broker_monitor;
573 
574                 struct {
575                         /** Consumer group metadata for send_offsets_to.. */
576                         rd_kafka_consumer_group_metadata_t *cgmetadata;
577                         /** Consumer group id for AddOffsetsTo.. */
578                         char *group_id;
579                         int   timeout_ms; /**< Operation timeout */
580                         rd_ts_t abs_timeout; /**< Absolute time */
581                         /**< Offsets to commit */
582                         rd_kafka_topic_partition_list_t *offsets;
583                 } txn;
584 
585                 struct {
586                         /* This struct serves two purposes, the fields
587                          * with "Request:" are used for the async workers state
588                          * while the "Reply:" fields is a separate reply
589                          * rko that is enqueued for the caller upon
590                          * completion or failure. */
591 
592                         /** Request: Partitions to query.
593                          *  Reply:   Queried partitions with .err field set. */
594                         rd_kafka_topic_partition_list_t *partitions;
595 
596                         /** Request: Absolute timeout */
597                         rd_ts_t ts_timeout;
598 
599                         /** Request: Metadata query timer */
600                         rd_kafka_timer_t query_tmr;
601 
602                         /** Request: Timeout timer */
603                         rd_kafka_timer_t timeout_tmr;
604 
605                         /** Request: Enqueue op only once, used to (re)trigger
606                          *  metadata cache lookups, topic refresh, timeout. */
607                         struct rd_kafka_enq_once_s *eonce;
608 
609                         /** Request: Caller's replyq */
610                         rd_kafka_replyq_t replyq;
611 
612                         /** Request: Number of metadata queries made. */
613                         int query_cnt;
614 
615                         /** Reply: Leaders (result)
616                          * (rd_kafka_partition_leader*) */
617                         rd_list_t *leaders;
618 
619                         /** Reply: Callback on completion (or failure) */
620                         rd_kafka_op_cb_t *cb;
621 
622                         /** Reply: Callback opaque */
623                         void *opaque;
624 
625                 } leaders;
626 
627         } rko_u;
628 };
629 
630 TAILQ_HEAD(rd_kafka_op_head_s, rd_kafka_op_s);
631 
632 
633 
634 
635 const char *rd_kafka_op2str (rd_kafka_op_type_t type);
636 void rd_kafka_op_destroy (rd_kafka_op_t *rko);
637 rd_kafka_op_t *rd_kafka_op_new0 (const char *source, rd_kafka_op_type_t type);
638 #if ENABLE_DEVEL
639 #define _STRINGIFYX(A) #A
640 #define _STRINGIFY(A) _STRINGIFYX(A)
641 #define rd_kafka_op_new(type)                                   \
642         rd_kafka_op_new0(__FILE__ ":" _STRINGIFY(__LINE__), type)
643 #else
644 #define rd_kafka_op_new(type) rd_kafka_op_new0(NULL, type)
645 #endif
646 rd_kafka_op_t *rd_kafka_op_new_reply (rd_kafka_op_t *rko_orig,
647                                       rd_kafka_resp_err_t err);
648 rd_kafka_op_t *rd_kafka_op_new_cb (rd_kafka_t *rk,
649                                    rd_kafka_op_type_t type,
650                                    rd_kafka_op_cb_t *cb);
651 int rd_kafka_op_reply (rd_kafka_op_t *rko,
652                        rd_kafka_resp_err_t err);
653 int rd_kafka_op_error_reply (rd_kafka_op_t *rko,
654                              rd_kafka_error_t *error);
655 
656 #define rd_kafka_op_set_prio(rko,prio) ((rko)->rko_prio = prio)
657 
658 #define rd_kafka_op_err(rk,err,...) do {				\
659 		if (!((rk)->rk_conf.enabled_events & RD_KAFKA_EVENT_ERROR)) { \
660 			rd_kafka_log(rk, LOG_ERR, "ERROR", __VA_ARGS__); \
661 			break;						\
662 		}							\
663 		rd_kafka_q_op_err((rk)->rk_rep, err, __VA_ARGS__);      \
664 	} while (0)
665 
666 void rd_kafka_q_op_err (rd_kafka_q_t *rkq, rd_kafka_resp_err_t err,
667                         const char *fmt, ...)
668         RD_FORMAT(printf, 3, 4);
669 void rd_kafka_consumer_err (rd_kafka_q_t *rkq, int32_t broker_id,
670                             rd_kafka_resp_err_t err, int32_t version,
671                             const char *topic, rd_kafka_toppar_t *rktp,
672                             int64_t offset, const char *fmt, ...)
673         RD_FORMAT(printf, 8, 9);
674 rd_kafka_op_t *rd_kafka_op_req0 (rd_kafka_q_t *destq,
675                                  rd_kafka_q_t *recvq,
676                                  rd_kafka_op_t *rko,
677                                  int timeout_ms);
678 rd_kafka_op_t *rd_kafka_op_req (rd_kafka_q_t *destq,
679                                 rd_kafka_op_t *rko,
680                                 int timeout_ms);
681 rd_kafka_op_t *rd_kafka_op_req2 (rd_kafka_q_t *destq, rd_kafka_op_type_t type);
682 rd_kafka_resp_err_t rd_kafka_op_err_destroy (rd_kafka_op_t *rko);
683 rd_kafka_error_t *rd_kafka_op_error_destroy (rd_kafka_op_t *rko);
684 
685 rd_kafka_op_res_t rd_kafka_op_call (rd_kafka_t *rk,
686                                     rd_kafka_q_t *rkq, rd_kafka_op_t *rko)
687         RD_WARN_UNUSED_RESULT;
688 
689 rd_kafka_op_t *
690 rd_kafka_op_new_fetch_msg (rd_kafka_msg_t **rkmp,
691                            rd_kafka_toppar_t *rktp,
692                            int32_t version,
693                            rd_kafka_buf_t *rkbuf,
694                            int64_t offset,
695                            size_t key_len, const void *key,
696                            size_t val_len, const void *val);
697 
698 rd_kafka_op_t *
699 rd_kafka_op_new_ctrl_msg (rd_kafka_toppar_t *rktp,
700                            int32_t version,
701                            rd_kafka_buf_t *rkbuf,
702                            int64_t offset);
703 
704 void rd_kafka_op_throttle_time (struct rd_kafka_broker_s *rkb,
705 				rd_kafka_q_t *rkq,
706 				int throttle_time);
707 
708 
709 rd_kafka_op_res_t
710 rd_kafka_op_handle (rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko,
711                     rd_kafka_q_cb_type_t cb_type, void *opaque,
712                     rd_kafka_q_serve_cb_t *callback) RD_WARN_UNUSED_RESULT;
713 
714 
715 extern rd_atomic32_t rd_kafka_op_cnt;
716 
717 void rd_kafka_op_print (FILE *fp, const char *prefix, rd_kafka_op_t *rko);
718 
719 void rd_kafka_op_offset_store (rd_kafka_t *rk, rd_kafka_op_t *rko);
720 
721 
722 #define rd_kafka_op_is_ctrl_msg(rko)                                    \
723         ((rko)->rko_type == RD_KAFKA_OP_FETCH &&                        \
724          !(rko)->rko_err &&                                             \
725          ((rko)->rko_u.fetch.rkm.rkm_flags & RD_KAFKA_MSG_F_CONTROL))
726 
727 
728 
729 /**
730  * @returns true if the rko's replyq is valid and the
731  *          rko's rktp version (if any) is not outdated.
732  */
733 #define rd_kafka_op_replyq_is_valid(RKO)                        \
734         (rd_kafka_replyq_is_valid(&(RKO)->rko_replyq) &&        \
735          !rd_kafka_op_version_outdated((RKO), 0))
736 
737 #endif /* _RDKAFKA_OP_H_ */
738