1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2020, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 /**
30  * @name Transactions example for Apache Kafka <= 2.4.0 (no KIP-447 support).
31  *
32  * This example show-cases a simple transactional consume-process-produce
33  * application that reads messages from an input topic, extracts all
34  * numbers from the message's value string, adds them up, and sends
35  * the sum to the output topic as part of a transaction.
36  * The transaction is committed every 5 seconds or 100 messages, whichever
37  * comes first. As the transaction is committed a new transaction is started.
38  *
39  * @remark This example does not yet support incremental rebalancing and thus
40  *         not the cooperative-sticky partition.assignment.strategy.
41  */
42 
43 #include <stdio.h>
44 #include <signal.h>
45 #include <unistd.h>
46 #include <string.h>
47 #include <stdlib.h>
48 #include <time.h>
49 #include <ctype.h>
50 
51 
52 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
53  * is builtin from within the librdkafka source tree and thus differs. */
54 #include "rdkafka.h"
55 
56 
57 static volatile sig_atomic_t run = 1;
58 
59 static rd_kafka_t *consumer;
60 
61 /* From command-line arguments */
62 static const char *brokers, *input_topic, *output_topic;
63 
64 
65 /**
66  * @struct This is the per input partition state, constisting of
67  * a transactional producer and the in-memory state for the current transaction.
68  * This demo simply finds all numbers (ascii string numbers) in the message
69  * payload and adds them.
70  */
71 struct state {
72         rd_kafka_t *producer; /**< Per-input partition output producer */
73         rd_kafka_topic_partition_t *rktpar; /**< Back-pointer to the
74                                              *   input partition. */
75         time_t last_commit; /**< Last transaction commit */
76         int    msgcnt;      /**< Number of messages processed in current txn */
77 };
78 /* Current assignment for the input consumer.
79  * The .opaque field of each partition points to an allocated 'struct state'.
80  */
81 static rd_kafka_topic_partition_list_t *assigned_partitions;
82 
83 
84 
85 /**
86  * @brief A fatal error has occurred, immediately exit the application.
87  */
88 #define fatal(...) do {                                 \
89                 fprintf(stderr, "FATAL ERROR: ");       \
90                 fprintf(stderr, __VA_ARGS__);           \
91                 fprintf(stderr, "\n");                  \
92                 exit(1);                                \
93         } while (0)
94 
95 /**
96  * @brief Same as fatal() but takes an rd_kafka_error_t object, prints its
97  *        error message, destroys the object and then exits fatally.
98  */
99 #define fatal_error(what,error) do {                                    \
100                 fprintf(stderr, "FATAL ERROR: %s: %s: %s\n",             \
101                         what, rd_kafka_error_name(error),               \
102                         rd_kafka_error_string(error));                  \
103                 rd_kafka_error_destroy(error);                          \
104                 exit(1);                                                \
105         } while (0)
106 
107 /**
108  * @brief Signal termination of program
109  */
stop(int sig)110 static void stop (int sig) {
111         run = 0;
112 }
113 
114 
115 /**
116  * @brief Message delivery report callback.
117  *
118  * This callback is called exactly once per message, indicating if
119  * the message was succesfully delivered
120  * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently
121  * failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR).
122  *
123  * The callback is triggered from rd_kafka_poll(), rd_kafka_flush(),
124  * rd_kafka_abort_transaction() and rd_kafka_commit_transaction() and
125  * executes on the application's thread.
126  *
127  * The current transactional will enter the abortable state if any
128  * message permanently fails delivery and the application must then
129  * call rd_kafka_abort_transaction(). But it does not need to be done from
130  * here, this state is checked by all the transactional APIs and it is better
131  * to perform this error checking when calling
132  * rd_kafka_send_offsets_to_transaction() and rd_kafka_commit_transaction().
133  * In the case of transactional producing the delivery report callback is
134  * mostly useful for logging the produce failures.
135  */
dr_msg_cb(rd_kafka_t * rk,const rd_kafka_message_t * rkmessage,void * opaque)136 static void dr_msg_cb (rd_kafka_t *rk,
137                        const rd_kafka_message_t *rkmessage, void *opaque) {
138         if (rkmessage->err)
139                 fprintf(stderr,
140                         "%% Message delivery failed: %s\n",
141                         rd_kafka_err2str(rkmessage->err));
142 
143         /* The rkmessage is destroyed automatically by librdkafka */
144 }
145 
146 
147 
148 /**
149  * @brief Create a transactional producer for the given input pratition
150  *        and begin a new transaction.
151  */
152 static rd_kafka_t *
create_transactional_producer(const rd_kafka_topic_partition_t * rktpar)153 create_transactional_producer (const rd_kafka_topic_partition_t *rktpar) {
154         rd_kafka_conf_t *conf = rd_kafka_conf_new();
155         rd_kafka_t *rk;
156         char errstr[256];
157         rd_kafka_error_t *error;
158         char transactional_id[256];
159 
160         snprintf(transactional_id, sizeof(transactional_id),
161                  "librdkafka_transactions_older_example_%s-%d",
162                  rktpar->topic, rktpar->partition);
163 
164         if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
165                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
166             rd_kafka_conf_set(conf, "transactional.id", transactional_id,
167                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
168             rd_kafka_conf_set(conf, "transaction.timeout.ms", "60000",
169                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
170                 fatal("Failed to configure producer: %s", errstr);
171 
172         /* This callback will be called once per message to indicate
173          * final delivery status. */
174         rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
175 
176         /* Create producer */
177         rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
178         if (!rk) {
179                 rd_kafka_conf_destroy(conf);
180                 fatal("Failed to create producer: %s", errstr);
181         }
182 
183         /* Initialize transactions, this is only performed once
184          * per transactional producer to acquire its producer id, et.al. */
185         error = rd_kafka_init_transactions(rk, -1);
186         if (error)
187                 fatal_error("init_transactions()", error);
188 
189 
190         /* Begin a new transaction */
191         error = rd_kafka_begin_transaction(rk);
192         if (error)
193                 fatal_error("begin_transaction()", error);
194 
195         return rk;
196 }
197 
198 
199 /**
200  * @brief Abort the current transaction and destroy the producer.
201  */
destroy_transactional_producer(rd_kafka_t * rk)202 static void destroy_transactional_producer (rd_kafka_t *rk) {
203         rd_kafka_error_t *error;
204 
205         fprintf(stdout, "%s: aborting transaction and terminating producer\n",
206                 rd_kafka_name(rk));
207 
208         /* Abort the current transaction, ignore any errors
209          * since we're terminating the producer anyway. */
210         error = rd_kafka_abort_transaction(rk, -1);
211         if (error) {
212                 fprintf(stderr,
213                         "WARNING: Ignoring abort_transaction() error since "
214                         "producer is being destroyed: %s\n",
215                         rd_kafka_error_string(error));
216                 rd_kafka_error_destroy(error);
217         }
218 
219         rd_kafka_destroy(rk);
220 }
221 
222 
223 
224 /**
225  * @brief Abort the current transaction and rewind consumer offsets to
226  *        position where the transaction last started, i.e., the committed
227  *        consumer offset.
228  */
abort_transaction_and_rewind(struct state * state)229 static void abort_transaction_and_rewind (struct state *state) {
230         rd_kafka_topic_t *rkt = rd_kafka_topic_new(consumer,
231                                                    state->rktpar->topic, NULL);
232         rd_kafka_topic_partition_list_t *offset;
233         rd_kafka_resp_err_t err;
234         rd_kafka_error_t *error;
235 
236         fprintf(stdout,
237                 "Aborting transaction and rewinding offset for %s [%d]\n",
238                 state->rktpar->topic, state->rktpar->partition);
239 
240         /* Abort the current transaction */
241         error = rd_kafka_abort_transaction(state->producer, -1);
242         if (error)
243                 fatal_error("Failed to abort transaction", error);
244 
245         /* Begin a new transaction */
246         error = rd_kafka_begin_transaction(state->producer);
247         if (error)
248                 fatal_error("Failed to begin transaction", error);
249 
250         /* Get committed offset for this partition */
251         offset = rd_kafka_topic_partition_list_new(1);
252         rd_kafka_topic_partition_list_add(offset,
253                                           state->rktpar->topic,
254                                           state->rktpar->partition);
255 
256         /* Note: Timeout must be lower than max.poll.interval.ms */
257         err = rd_kafka_committed(consumer, offset, 10*1000);
258         if (err)
259                 fatal("Failed to acquire committed offset for %s [%d]: %s",
260                       state->rktpar->topic, (int)state->rktpar->partition,
261                       rd_kafka_err2str(err));
262 
263         /* Seek to committed offset, or start of partition if no
264          * no committed offset is available. */
265         err = rd_kafka_seek(rkt, state->rktpar->partition,
266                             offset->elems[0].offset < 0 ?
267                             /* No committed offset, start from beginning */
268                             RD_KAFKA_OFFSET_BEGINNING :
269                             /* Use committed offset */
270                             offset->elems[0].offset,
271                             0);
272 
273         if (err)
274                 fatal("Failed to seek %s [%d]: %s",
275                       state->rktpar->topic, (int)state->rktpar->partition,
276                       rd_kafka_err2str(err));
277 
278         rd_kafka_topic_destroy(rkt);
279 }
280 
281 
282 /**
283  * @brief Commit the current transaction and start a new transaction.
284  */
commit_transaction_and_start_new(struct state * state)285 static void commit_transaction_and_start_new (struct state *state) {
286         rd_kafka_error_t *error;
287         rd_kafka_resp_err_t err;
288         rd_kafka_consumer_group_metadata_t *cgmd;
289         rd_kafka_topic_partition_list_t *offset;
290 
291         fprintf(stdout, "Committing transaction for %s [%d]\n",
292                 state->rktpar->topic, state->rktpar->partition);
293 
294         /* Send the input consumer's offset to transaction
295          * to commit those offsets along with the transaction itself,
296          * this is what guarantees exactly-once-semantics (EOS), that
297          * input (offsets) and output (messages) are committed atomically. */
298 
299         /* Get the consumer's current group state */
300         cgmd = rd_kafka_consumer_group_metadata(consumer);
301 
302         /* Get consumer's current position for this partition */
303         offset = rd_kafka_topic_partition_list_new(1);
304         rd_kafka_topic_partition_list_add(offset,
305                                           state->rktpar->topic,
306                                           state->rktpar->partition);
307         err = rd_kafka_position(consumer, offset);
308         if (err)
309                 fatal("Failed to get consumer position for %s [%d]: %s",
310                       state->rktpar->topic, state->rktpar->partition,
311                       rd_kafka_err2str(err));
312 
313         /* Send offsets to transaction coordinator */
314         error = rd_kafka_send_offsets_to_transaction(state->producer,
315                                                      offset, cgmd, -1);
316         rd_kafka_consumer_group_metadata_destroy(cgmd);
317         rd_kafka_topic_partition_list_destroy(offset);
318         if (error) {
319                 if (rd_kafka_error_txn_requires_abort(error)) {
320                         fprintf(stderr,
321                                 "WARNING: Failed to send offsets to "
322                                 "transaction: %s: %s: aborting transaction\n",
323                                 rd_kafka_error_name(error),
324                                 rd_kafka_error_string(error));
325                         rd_kafka_error_destroy(error);
326                         abort_transaction_and_rewind(state);
327                         return;
328                 } else {
329                         fatal_error("Failed to send offsets to transaction",
330                                     error);
331                 }
332         }
333 
334         /* Commit the transaction */
335         error = rd_kafka_commit_transaction(state->producer, -1);
336         if (error) {
337                 if (rd_kafka_error_txn_requires_abort(error)) {
338                         fprintf(stderr,
339                                 "WARNING: Failed to commit transaction: "
340                                 "%s: %s: aborting transaction\n",
341                                 rd_kafka_error_name(error),
342                                 rd_kafka_error_string(error));
343                         abort_transaction_and_rewind(state);
344                         rd_kafka_error_destroy(error);
345                         return;
346                 } else {
347                         fatal_error("Failed to commit transaction", error);
348                 }
349         }
350 
351         /* Begin new transaction */
352         error = rd_kafka_begin_transaction(state->producer);
353         if (error)
354                 fatal_error("Failed to begin new transaction", error);
355 }
356 
357 /**
358  * @brief The rebalance will be triggered (from rd_kafka_consumer_poll())
359  *        when the consumer's partition assignment is assigned or revoked.
360  *
361  * Prior to KIP-447 being supported there must be one transactional output
362  * producer for each consumed input partition, so we create and destroy
363  * these producer's from this callback.
364  */
365 static void
consumer_group_rebalance_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * partitions,void * opaque)366 consumer_group_rebalance_cb (rd_kafka_t *rk,
367                              rd_kafka_resp_err_t err,
368                              rd_kafka_topic_partition_list_t *partitions,
369                              void *opaque) {
370         int i;
371 
372         if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE"))
373                 fatal("This example has not yet been modified to work with "
374                       "cooperative incremental rebalancing "
375                       "(partition.assignment.strategy=cooperative-sticky)");
376 
377         switch (err)
378         {
379         case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
380                 assigned_partitions =
381                         rd_kafka_topic_partition_list_copy(partitions);
382 
383                 fprintf(stdout, "Consumer group rebalanced, new assignment:\n");
384 
385                 /* Create a transactional producer for each input partition */
386                 for (i = 0 ; i < assigned_partitions->cnt ; i++) {
387                         /* Store the partition-to-producer mapping
388                          * in the partition's opaque field. */
389                         rd_kafka_topic_partition_t *rktpar =
390                                 &assigned_partitions->elems[i];
391                         struct state *state = calloc(1, sizeof(*state));
392 
393                         state->producer = create_transactional_producer(rktpar);
394                         state->rktpar = rktpar;
395                         rktpar->opaque = state;
396                         state->last_commit = time(NULL);
397 
398                         fprintf(stdout,
399                                 " %s [%d] with transactional producer %s\n",
400                                 rktpar->topic, rktpar->partition,
401                                 rd_kafka_name(state->producer));
402                 }
403 
404                 /* Let the consumer know the rebalance has been handled
405                  * by calling assign.
406                  * This will also tell the consumer to start fetching messages
407                  * for the assigned partitions. */
408                 rd_kafka_assign(rk, partitions);
409                 break;
410 
411         case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
412                 fprintf(stdout,
413                         "Consumer group rebalanced, assignment revoked\n");
414 
415                 /* Abort the current transactions and destroy all producers */
416                 for (i = 0 ; i < assigned_partitions->cnt ; i++) {
417                         /* Store the partition-to-producer mapping
418                          * in the partition's opaque field. */
419                         struct state *state = (struct state *)
420                                 assigned_partitions->elems[i].opaque;
421 
422                         destroy_transactional_producer(state->producer);
423                         free(state);
424                 }
425 
426                 rd_kafka_topic_partition_list_destroy(assigned_partitions);
427                 assigned_partitions = NULL;
428 
429                 /* Let the consumer know the rebalance has been handled
430                  * and revoke the current assignment. */
431                 rd_kafka_assign(rk, NULL);
432                 break;
433 
434         default:
435                 /* NOTREACHED */
436                 fatal("Unexpected rebalance event: %s", rd_kafka_err2name(err));
437         }
438 }
439 
440 
441 /**
442  * @brief Create the input consumer.
443  */
create_input_consumer(const char * brokers,const char * input_topic)444 static rd_kafka_t *create_input_consumer (const char *brokers,
445                                           const char *input_topic) {
446         rd_kafka_conf_t *conf = rd_kafka_conf_new();
447         rd_kafka_t *rk;
448         char errstr[256];
449         rd_kafka_resp_err_t err;
450         rd_kafka_topic_partition_list_t *topics;
451 
452         if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
453                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
454             rd_kafka_conf_set(conf, "group.id",
455                               "librdkafka_transactions_older_example_group",
456                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
457             /* The input consumer's offsets are explicitly committed with the
458              * output producer's transaction using
459              * rd_kafka_send_offsets_to_transaction(), so auto commits
460              * must be disabled. */
461             rd_kafka_conf_set(conf, "enable.auto.commit", "false",
462                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
463                 fatal("Failed to configure consumer: %s", errstr);
464         }
465 
466         /* This callback will be called when the consumer group is rebalanced
467          * and the consumer's partition assignment is assigned or revoked. */
468         rd_kafka_conf_set_rebalance_cb(conf, consumer_group_rebalance_cb);
469 
470         /* Create consumer */
471         rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
472         if (!rk) {
473                 rd_kafka_conf_destroy(conf);
474                 fatal("Failed to create consumer: %s", errstr);
475         }
476 
477         /* Forward all partition messages to the main queue and
478          * rd_kafka_consumer_poll(). */
479         rd_kafka_poll_set_consumer(rk);
480 
481         /* Subscribe to the input topic */
482         topics = rd_kafka_topic_partition_list_new(1);
483         rd_kafka_topic_partition_list_add(topics, input_topic,
484                                           /* The partition is ignored in
485                                            * rd_kafka_subscribe() */
486                                           RD_KAFKA_PARTITION_UA);
487         err = rd_kafka_subscribe(rk, topics);
488         rd_kafka_topic_partition_list_destroy(topics);
489         if (err) {
490                 rd_kafka_destroy(rk);
491                 fatal("Failed to subscribe to %s: %s\n",
492                       input_topic, rd_kafka_err2str(err));
493         }
494 
495         return rk;
496 }
497 
498 
499 /**
500  * @brief Find and parse next integer string in \p start.
501  * @returns Pointer after found integer string, or NULL if not found.
502  */
find_next_int(const void * start,const void * end,int * intp)503 static const void *find_next_int (const void *start, const void *end,
504                                   int *intp) {
505         const char *p;
506         int collecting = 0;
507         int num = 0;
508 
509         for (p = (const char *)start ; p < (const char *)end ; p++) {
510                 if (isdigit((int)(*p))) {
511                         collecting = 1;
512                         num = (num * 10) + ((int)*p - ((int)'0'));
513                 } else if (collecting)
514                         break;
515         }
516 
517         if (!collecting)
518                 return NULL; /* No integer string found */
519 
520         *intp = num;
521 
522         return p;
523 }
524 
525 
526 /**
527  * @brief Process a message from the input consumer by parsing all
528  *        integer strings, adding them, and then producing the sum
529  *        the output topic using the transactional producer for the given
530  *        inut partition.
531  */
process_message(struct state * state,const rd_kafka_message_t * rkmessage)532 static void process_message (struct state *state,
533                              const rd_kafka_message_t *rkmessage) {
534         int num;
535         long unsigned sum = 0;
536         const void *p, *end;
537         rd_kafka_resp_err_t err;
538         char value[64];
539 
540         if (rkmessage->len == 0)
541                 return; /* Ignore empty messages */
542 
543         p = rkmessage->payload;
544         end = ((const char *)rkmessage->payload) + rkmessage->len;
545 
546         /* Find and sum all numbers in the message */
547         while ((p = find_next_int(p, end, &num)))
548                 sum += num;
549 
550         if (sum == 0)
551                 return; /* No integers in message, ignore it. */
552 
553         snprintf(value, sizeof(value), "%lu", sum);
554 
555         /* Emit output message on transactional producer */
556         while (1) {
557                 err = rd_kafka_producev(
558                         state->producer,
559                         RD_KAFKA_V_TOPIC(output_topic),
560                         /* Use same key as input message */
561                         RD_KAFKA_V_KEY(rkmessage->key,
562                                        rkmessage->key_len),
563                         /* Value is the current sum of this
564                          * transaction. */
565                         RD_KAFKA_V_VALUE(value, strlen(value)),
566                         /* Copy value since it is allocated on the stack */
567                         RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
568                         RD_KAFKA_V_END);
569 
570                 if (!err)
571                         break;
572                 else if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
573                         /* If output queue fills up we need to wait for
574                          * some delivery reports and then retry. */
575                         rd_kafka_poll(state->producer, 100);
576                         continue;
577                 } else {
578                         fprintf(stderr,
579                                 "WARNING: Failed to produce message to %s: "
580                                 "%s: aborting transaction\n",
581                                 output_topic, rd_kafka_err2str(err));
582                         abort_transaction_and_rewind(state);
583                         return;
584                 }
585         }
586 }
587 
588 
main(int argc,char ** argv)589 int main (int argc, char **argv) {
590         /*
591          * Argument validation
592          */
593         if (argc != 4) {
594                 fprintf(stderr,
595                         "%% Usage: %s <broker> <input-topic> <output-topic>\n",
596                         argv[0]);
597                 return 1;
598         }
599 
600         brokers = argv[1];
601         input_topic = argv[2];
602         output_topic = argv[3];
603 
604         /* Signal handler for clean shutdown */
605         signal(SIGINT, stop);
606 
607         consumer = create_input_consumer(brokers, input_topic);
608 
609         fprintf(stdout,
610                 "Expecting integers to sum on input topic %s ...\n"
611                 "To generate input messages you can use:\n"
612                 "  $ seq 1 100 | examples/producer %s %s\n",
613                 input_topic, brokers, input_topic);
614 
615         while (run) {
616                 rd_kafka_message_t *msg;
617                 struct state *state;
618                 rd_kafka_topic_partition_t *rktpar;
619 
620                 /* Wait for new mesages or error events */
621                 msg = rd_kafka_consumer_poll(consumer, 1000/*1 second*/);
622                 if (!msg)
623                         continue;
624 
625                 if (msg->err) {
626                         /* Client errors are typically just informational
627                          * since the client will automatically try to recover
628                          * from all types of errors.
629                          * It is thus sufficient for the application to log and
630                          * continue operating when an error is received. */
631                         fprintf(stderr, "WARNING: Consumer error: %s\n",
632                                 rd_kafka_message_errstr(msg));
633                         rd_kafka_message_destroy(msg);
634                         continue;
635                 }
636 
637                 /* Find output producer for this input partition */
638                 rktpar = rd_kafka_topic_partition_list_find(
639                         assigned_partitions,
640                         rd_kafka_topic_name(msg->rkt), msg->partition);
641                 if (!rktpar)
642                         fatal("BUG: No output producer for assigned "
643                               "partition %s [%d]",
644                               rd_kafka_topic_name(msg->rkt),
645                               (int)msg->partition);
646 
647                 /* Get state struct for this partition */
648                 state = (struct state *)rktpar->opaque;
649 
650                 /* Process message */
651                 process_message(state, msg);
652 
653                 rd_kafka_message_destroy(msg);
654 
655                 /* Commit transaction every 100 messages or 5 seconds */
656                 if (++state->msgcnt > 100 ||
657                     state->last_commit + 5 <= time(NULL)) {
658                         commit_transaction_and_start_new(state);
659                         state->msgcnt = 0;
660                         state->last_commit = time(NULL);
661                 }
662         }
663 
664         fprintf(stdout, "Closing consumer\n");
665         rd_kafka_consumer_close(consumer);
666 
667         rd_kafka_destroy(consumer);
668 
669         return 0;
670 }
671