1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2020, Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  *    this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  *    this list of conditions and the following disclaimer in the documentation
14  *    and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 /**
30  * @name Transactions example for Apache Kafka 2.5.0 (KIP-447) and later.
31  *
32  * This example show-cases a simple transactional consume-process-produce
33  * application that reads messages from an input topic, extracts all
34  * numbers from the message's value string, adds them up, and sends
35  * the sum to the output topic as part of a transaction.
36  * The transaction is committed every 5 seconds or 100 messages, whichever
37  * comes first. As the transaction is committed a new transaction is started.
38  *
39  * This example makes use of incremental rebalancing (KIP-429) and the
40  * cooperative-sticky partition.assignment.strategy on the consumer, providing
41  * hitless rebalances.
42  */
43 
44 #include <stdio.h>
45 #include <signal.h>
46 #include <unistd.h>
47 #include <string.h>
48 #include <stdlib.h>
49 #include <time.h>
50 #include <ctype.h>
51 
52 
53 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
54  * is builtin from within the librdkafka source tree and thus differs. */
55 #include "rdkafka.h"
56 
57 
/* Main-loop run flag; cleared by the SIGINT handler to trigger shutdown. */
static volatile sig_atomic_t run = 1;
59 
60 /**
61  * @brief A fatal error has occurred, immediately exit the application.
62  */
63 #define fatal(...) do {                                 \
64                 fprintf(stderr, "FATAL ERROR: ");       \
65                 fprintf(stderr, __VA_ARGS__);           \
66                 fprintf(stderr, "\n");                  \
67                 exit(1);                                \
68         } while (0)
69 
70 /**
71  * @brief Same as fatal() but takes an rd_kafka_error_t object, prints its
72  *        error message, destroys the object and then exits fatally.
73  */
74 #define fatal_error(what,error) do {                                    \
75                 fprintf(stderr, "FATAL ERROR: %s: %s: %s\n",             \
76                         what, rd_kafka_error_name(error),               \
77                         rd_kafka_error_string(error));                  \
78                 rd_kafka_error_destroy(error);                          \
79                 exit(1);                                                \
80         } while (0)
81 
82 /**
83  * @brief Signal termination of program
84  */
85 static void stop (int sig) {
86         run = 0;
87 }
88 
89 
90 /**
91  * @brief Message delivery report callback.
92  *
93  * This callback is called exactly once per message, indicating if
94  * the message was succesfully delivered
95  * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently
96  * failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR).
97  *
98  * The callback is triggered from rd_kafka_poll(), rd_kafka_flush(),
99  * rd_kafka_abort_transaction() and rd_kafka_commit_transaction() and
100  * executes on the application's thread.
101  *
102  * The current transactional will enter the abortable state if any
103  * message permanently fails delivery and the application must then
104  * call rd_kafka_abort_transaction(). But it does not need to be done from
105  * here, this state is checked by all the transactional APIs and it is better
106  * to perform this error checking when calling
107  * rd_kafka_send_offsets_to_transaction() and rd_kafka_commit_transaction().
108  * In the case of transactional producing the delivery report callback is
109  * mostly useful for logging the produce failures.
110  */
111 static void dr_msg_cb (rd_kafka_t *rk,
112                        const rd_kafka_message_t *rkmessage, void *opaque) {
113         if (rkmessage->err)
114                 fprintf(stderr,
115                         "%% Message delivery failed: %s\n",
116                         rd_kafka_err2str(rkmessage->err));
117 
118         /* The rkmessage is destroyed automatically by librdkafka */
119 }
120 
121 
122 
123 /**
124  * @brief Create a transactional producer.
125  */
126 static rd_kafka_t *
127 create_transactional_producer (const char *brokers, const char *output_topic) {
128         rd_kafka_conf_t *conf = rd_kafka_conf_new();
129         rd_kafka_t *rk;
130         char errstr[256];
131         rd_kafka_error_t *error;
132 
133         if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
134                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
135             rd_kafka_conf_set(conf, "transactional.id",
136                               "librdkafka_transactions_example",
137                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
138                 fatal("Failed to configure producer: %s", errstr);
139 
140         /* This callback will be called once per message to indicate
141          * final delivery status. */
142         rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
143 
144         /* Create producer */
145         rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
146         if (!rk) {
147                 rd_kafka_conf_destroy(conf);
148                 fatal("Failed to create producer: %s", errstr);
149         }
150 
151         /* Initialize transactions, this is only performed once
152          * per transactional producer to acquire its producer id, et.al. */
153         error = rd_kafka_init_transactions(rk, -1);
154         if (error)
155                 fatal_error("init_transactions()", error);
156 
157         return rk;
158 }
159 
160 
161 /**
162  * @brief Rewind consumer's consume position to the last committed offsets
163  *        for the current assignment.
164  */
165 static void rewind_consumer (rd_kafka_t *consumer) {
166         rd_kafka_topic_partition_list_t *offsets;
167         rd_kafka_resp_err_t err;
168         rd_kafka_error_t *error;
169         int i;
170 
171         /* Get committed offsets for the current assignment, if there
172          * is a current assignment. */
173         err = rd_kafka_assignment(consumer, &offsets);
174         if (err) {
175                 fprintf(stderr, "No current assignment to rewind: %s\n",
176                         rd_kafka_err2str(err));
177                 return;
178         }
179 
180         if (offsets->cnt == 0) {
181                 fprintf(stderr, "No current assignment to rewind\n");
182                 rd_kafka_topic_partition_list_destroy(offsets);
183                 return;
184         }
185 
186         /* Note: Timeout must be lower than max.poll.interval.ms */
187         err = rd_kafka_committed(consumer, offsets, 10*1000);
188         if (err)
189                 fatal("Failed to acquire committed offsets: %s",
190                       rd_kafka_err2str(err));
191 
192         /* Seek to committed offset, or start of partition if no
193          * committed offset is available. */
194         for (i = 0 ; i < offsets->cnt ; i++) {
195                 /* No committed offset, start from beginning */
196                 if (offsets->elems[i].offset < 0)
197                         offsets->elems[i].offset =
198                                 RD_KAFKA_OFFSET_BEGINNING;
199         }
200 
201         /* Perform seek */
202         error = rd_kafka_seek_partitions(consumer, offsets, -1);
203         if (error)
204                 fatal_error("Failed to seek", error);
205 
206         rd_kafka_topic_partition_list_destroy(offsets);
207 }
208 
209 /**
210  * @brief Abort the current transaction and rewind consumer offsets to
211  *        position where the transaction last started, i.e., the committed
212  *        consumer offset, then begin a new transaction.
213  */
214 static void abort_transaction_and_rewind (rd_kafka_t *consumer,
215                                           rd_kafka_t *producer) {
216         rd_kafka_error_t *error;
217 
218         fprintf(stdout, "Aborting transaction and rewinding offsets\n");
219 
220         /* Abort the current transaction */
221         error = rd_kafka_abort_transaction(producer, -1);
222         if (error)
223                 fatal_error("Failed to abort transaction", error);
224 
225         /* Rewind consumer */
226         rewind_consumer(consumer);
227 
228         /* Begin a new transaction */
229         error = rd_kafka_begin_transaction(producer);
230         if (error)
231                 fatal_error("Failed to begin transaction", error);
232 }
233 
234 
235 /**
236  * @brief Commit the current transaction.
237  *
238  * @returns 1 if transaction was successfully committed, or 0
239  *          if the current transaction was aborted.
240  */
241 static int commit_transaction (rd_kafka_t *consumer,
242                                rd_kafka_t *producer) {
243         rd_kafka_error_t *error;
244         rd_kafka_resp_err_t err;
245         rd_kafka_consumer_group_metadata_t *cgmd;
246         rd_kafka_topic_partition_list_t *offsets;
247 
248         fprintf(stdout, "Committing transaction\n");
249 
250         /* Send the input consumer's offset to transaction
251          * to commit those offsets along with the transaction itself,
252          * this is what guarantees exactly-once-semantics (EOS), that
253          * input (offsets) and output (messages) are committed atomically. */
254 
255         /* Get the consumer's current group metadata state */
256         cgmd = rd_kafka_consumer_group_metadata(consumer);
257 
258         /* Get consumer's current assignment */
259         err = rd_kafka_assignment(consumer, &offsets);
260         if (err || offsets->cnt == 0) {
261                 /* No partition offsets to commit because consumer
262                  * (most likely) lost the assignment, abort transaction. */
263                 if (err)
264                         fprintf(stderr,
265                                 "Failed to get consumer assignment to commit: "
266                                 "%s\n", rd_kafka_err2str(err));
267                 else
268                         rd_kafka_topic_partition_list_destroy(offsets);
269 
270                 error = rd_kafka_abort_transaction(producer, -1);
271                 if (error)
272                         fatal_error("Failed to abort transaction", error);
273 
274                 return 0;
275         }
276 
277         /* Get consumer's current position for this partition */
278         err = rd_kafka_position(consumer, offsets);
279         if (err)
280                 fatal("Failed to get consumer position: %s",
281                       rd_kafka_err2str(err));
282 
283         /* Send offsets to transaction coordinator */
284         error = rd_kafka_send_offsets_to_transaction(producer,
285                                                      offsets, cgmd, -1);
286         rd_kafka_consumer_group_metadata_destroy(cgmd);
287         rd_kafka_topic_partition_list_destroy(offsets);
288         if (error) {
289                 if (rd_kafka_error_txn_requires_abort(error)) {
290                         fprintf(stderr,
291                                 "WARNING: Failed to send offsets to "
292                                 "transaction: %s: %s: aborting transaction\n",
293                                 rd_kafka_error_name(error),
294                                 rd_kafka_error_string(error));
295                         rd_kafka_error_destroy(error);
296 
297                         /* Abort transaction */
298                         error = rd_kafka_abort_transaction(producer, -1);
299                         if (error)
300                                 fatal_error("Failed to abort transaction",
301                                             error);
302                         return 0;
303                 } else {
304                         fatal_error("Failed to send offsets to transaction",
305                                     error);
306                 }
307         }
308 
309         /* Commit the transaction */
310         error = rd_kafka_commit_transaction(producer, -1);
311         if (error) {
312                 if (rd_kafka_error_txn_requires_abort(error)) {
313                         fprintf(stderr,
314                                 "WARNING: Failed to commit transaction: "
315                                 "%s: %s: aborting transaction\n",
316                                 rd_kafka_error_name(error),
317                                 rd_kafka_error_string(error));
318                         rd_kafka_error_destroy(error);
319 
320                         /* Abort transaction */
321                         error = rd_kafka_abort_transaction(producer, -1);
322                         if (error)
323                                 fatal_error("Failed to abort transaction",
324                                             error);
325                         return 0;
326                 } else {
327                         fatal_error("Failed to commit transaction", error);
328                 }
329         }
330 
331         return 1;
332 }
333 
334 /**
335  * @brief Commit the current transaction and start a new transaction.
336  */
337 static void commit_transaction_and_start_new (rd_kafka_t *consumer,
338                                               rd_kafka_t *producer) {
339         rd_kafka_error_t *error;
340 
341         /* Commit transaction.
342          * If commit failed the transaction is aborted and we need
343          * to rewind the consumer to the last committed offsets. */
344         if (!commit_transaction(consumer, producer))
345                 rewind_consumer(consumer);
346 
347         /* Begin new transaction */
348         error = rd_kafka_begin_transaction(producer);
349         if (error)
350                 fatal_error("Failed to begin new transaction", error);
351 }
352 
353 /**
354  * @brief The rebalance will be triggered (from rd_kafka_consumer_poll())
355  *        when the consumer's partition assignment is assigned or revoked.
356  */
357 static void
358 consumer_group_rebalance_cb (rd_kafka_t *consumer,
359                              rd_kafka_resp_err_t err,
360                              rd_kafka_topic_partition_list_t *partitions,
361                              void *opaque) {
362         rd_kafka_t *producer = (rd_kafka_t *)opaque;
363         rd_kafka_error_t *error;
364 
365         switch (err)
366         {
367         case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
368                 fprintf(stdout,
369                         "Consumer group rebalanced: "
370                         "%d new partition(s) assigned\n",
371                         partitions->cnt);
372 
373                 /* Start fetching messages for the assigned partitions
374                  * and add them to the consumer's local assignment. */
375                 error = rd_kafka_incremental_assign(consumer, partitions);
376                 if (error)
377                         fatal_error("Incremental assign failed", error);
378                 break;
379 
380         case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
381                 if (rd_kafka_assignment_lost(consumer)) {
382                         fprintf(stdout,
383                                 "Consumer group rebalanced: assignment lost: "
384                                 "aborting current transaction\n");
385 
386                         error = rd_kafka_abort_transaction(producer, -1);
387                         if (error)
388                                 fatal_error("Failed to abort transaction",
389                                             error);
390                 } else {
391                         fprintf(stdout,
392                                 "Consumer group rebalanced: %d partition(s) "
393                                 "revoked: committing current transaction\n",
394                                 partitions->cnt);
395 
396                         commit_transaction(consumer, producer);
397                 }
398 
399                 /* Begin new transaction */
400                 error = rd_kafka_begin_transaction(producer);
401                 if (error)
402                         fatal_error("Failed to begin transaction", error);
403 
404                 /* Stop fetching messages for the revoekd partitions
405                  * and remove them from the consumer's local assignment. */
406                 error = rd_kafka_incremental_unassign(consumer, partitions);
407                 if (error)
408                         fatal_error("Incremental unassign failed", error);
409                 break;
410 
411         default:
412                 /* NOTREACHED */
413                 fatal("Unexpected rebalance event: %s", rd_kafka_err2name(err));
414         }
415 }
416 
417 
418 /**
419  * @brief Create the input consumer.
420  */
421 static rd_kafka_t *create_input_consumer (const char *brokers,
422                                           const char *input_topic,
423                                           rd_kafka_t *producer) {
424         rd_kafka_conf_t *conf = rd_kafka_conf_new();
425         rd_kafka_t *rk;
426         char errstr[256];
427         rd_kafka_resp_err_t err;
428         rd_kafka_topic_partition_list_t *topics;
429 
430         if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
431                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
432             rd_kafka_conf_set(conf, "group.id",
433                               "librdkafka_transactions_example_group",
434                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
435             rd_kafka_conf_set(conf, "partition.assignment.strategy",
436                               "cooperative-sticky",
437                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
438             rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
439                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
440             /* The input consumer's offsets are explicitly committed with the
441              * output producer's transaction using
442              * rd_kafka_send_offsets_to_transaction(), so auto commits
443              * must be disabled. */
444             rd_kafka_conf_set(conf, "enable.auto.commit", "false",
445                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
446                 fatal("Failed to configure consumer: %s", errstr);
447         }
448 
449         /* This callback will be called when the consumer group is rebalanced
450          * and the consumer's partition assignment is assigned or revoked. */
451         rd_kafka_conf_set_rebalance_cb(conf, consumer_group_rebalance_cb);
452 
453         /* The producer handle is needed in the consumer's rebalance callback
454          * to be able to abort and commit transactions, so we pass the
455          * producer as the consumer's opaque. */
456         rd_kafka_conf_set_opaque(conf, producer);
457 
458         /* Create consumer */
459         rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
460         if (!rk) {
461                 rd_kafka_conf_destroy(conf);
462                 fatal("Failed to create consumer: %s", errstr);
463         }
464 
465         /* Forward all partition messages to the main queue and
466          * rd_kafka_consumer_poll(). */
467         rd_kafka_poll_set_consumer(rk);
468 
469         /* Subscribe to the input topic */
470         topics = rd_kafka_topic_partition_list_new(1);
471         rd_kafka_topic_partition_list_add(topics, input_topic,
472                                           /* The partition is ignored in
473                                            * rd_kafka_subscribe() */
474                                           RD_KAFKA_PARTITION_UA);
475         err = rd_kafka_subscribe(rk, topics);
476         rd_kafka_topic_partition_list_destroy(topics);
477         if (err) {
478                 rd_kafka_destroy(rk);
479                 fatal("Failed to subscribe to %s: %s\n",
480                       input_topic, rd_kafka_err2str(err));
481         }
482 
483         return rk;
484 }
485 
486 
487 /**
488  * @brief Find and parse next integer string in \p start.
489  * @returns Pointer after found integer string, or NULL if not found.
490  */
491 static const void *find_next_int (const void *start, const void *end,
492                                   int *intp) {
493         const char *p;
494         int collecting = 0;
495         int num = 0;
496 
497         for (p = (const char *)start ; p < (const char *)end ; p++) {
498                 if (isdigit((int)(*p))) {
499                         collecting = 1;
500                         num = (num * 10) + ((int)*p - ((int)'0'));
501                 } else if (collecting)
502                         break;
503         }
504 
505         if (!collecting)
506                 return NULL; /* No integer string found */
507 
508         *intp = num;
509 
510         return p;
511 }
512 
513 
514 /**
515  * @brief Process a message from the input consumer by parsing all
516  *        integer strings, adding them, and then producing the sum
517  *        the output topic using the transactional producer for the given
518  *        inut partition.
519  */
520 static void process_message (rd_kafka_t *consumer,
521                              rd_kafka_t *producer,
522                              const char *output_topic,
523                              const rd_kafka_message_t *rkmessage) {
524         int num;
525         long unsigned sum = 0;
526         const void *p, *end;
527         rd_kafka_resp_err_t err;
528         char value[64];
529 
530         if (rkmessage->len == 0)
531                 return; /* Ignore empty messages */
532 
533         p = rkmessage->payload;
534         end = ((const char *)rkmessage->payload) + rkmessage->len;
535 
536         /* Find and sum all numbers in the message */
537         while ((p = find_next_int(p, end, &num)))
538                 sum += num;
539 
540         if (sum == 0)
541                 return; /* No integers in message, ignore it. */
542 
543         snprintf(value, sizeof(value), "%lu", sum);
544 
545         /* Emit output message on transactional producer */
546         while (1) {
547                 err = rd_kafka_producev(
548                         producer,
549                         RD_KAFKA_V_TOPIC(output_topic),
550                         /* Use same key as input message */
551                         RD_KAFKA_V_KEY(rkmessage->key,
552                                        rkmessage->key_len),
553                         /* Value is the current sum of this
554                          * transaction. */
555                         RD_KAFKA_V_VALUE(value, strlen(value)),
556                         /* Copy value since it is allocated on the stack */
557                         RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
558                         RD_KAFKA_V_END);
559 
560                 if (!err)
561                         break;
562                 else if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
563                         /* If output queue fills up we need to wait for
564                          * some delivery reports and then retry. */
565                         rd_kafka_poll(producer, 100);
566                         continue;
567                 } else {
568                         fprintf(stderr,
569                                 "WARNING: Failed to produce message to %s: "
570                                 "%s: aborting transaction\n",
571                                 output_topic, rd_kafka_err2str(err));
572                         abort_transaction_and_rewind(consumer, producer);
573                         return;
574                 }
575         }
576 }
577 
578 
579 int main (int argc, char **argv) {
580         rd_kafka_t *producer, *consumer;
581         int msgcnt = 0;
582         time_t last_commit = 0;
583         const char *brokers, *input_topic, *output_topic;
584         rd_kafka_error_t *error;
585 
586         /*
587          * Argument validation
588          */
589         if (argc != 4) {
590                 fprintf(stderr,
591                         "%% Usage: %s <broker> <input-topic> <output-topic>\n",
592                         argv[0]);
593                 return 1;
594         }
595 
596         brokers = argv[1];
597         input_topic = argv[2];
598         output_topic = argv[3];
599 
600         /* Signal handler for clean shutdown */
601         signal(SIGINT, stop);
602 
603         producer = create_transactional_producer(brokers, output_topic);
604 
605         consumer = create_input_consumer(brokers, input_topic, producer);
606 
607         fprintf(stdout,
608                 "Expecting integers to sum on input topic %s ...\n"
609                 "To generate input messages you can use:\n"
610                 "  $ seq 1 100 | examples/producer %s %s\n"
611                 "Observe summed integers on output topic %s:\n"
612                 "  $ examples/consumer %s just-watching %s\n"
613                 "\n",
614                 input_topic, brokers, input_topic,
615                 output_topic, brokers, output_topic);
616 
617         /* Begin transaction and start waiting for messages */
618         error = rd_kafka_begin_transaction(producer);
619         if (error)
620                 fatal_error("Failed to begin transaction", error);
621 
622         while (run) {
623                 rd_kafka_message_t *msg;
624 
625                 /* Commit transaction every 100 messages or 5 seconds */
626                 if (msgcnt > 0 &&
627                     (msgcnt > 100 || last_commit + 5 <= time(NULL))) {
628                         printf("msgcnt %d, elapsed %d\n", msgcnt,
629                                (int)(time(NULL) - last_commit));
630                         commit_transaction_and_start_new(consumer, producer);
631                         msgcnt = 0;
632                         last_commit = time(NULL);
633                 }
634 
635                 /* Wait for new mesages or error events */
636                 msg = rd_kafka_consumer_poll(consumer, 1000/*1 second*/);
637                 if (!msg)
638                         continue; /* Poll timeout */
639 
640                 if (msg->err) {
641                         /* Client errors are typically just informational
642                          * since the client will automatically try to recover
643                          * from all types of errors.
644                          * It is thus sufficient for the application to log and
645                          * continue operating when a consumer error is
646                          * encountered. */
647                         fprintf(stderr, "WARNING: Consumer error: %s\n",
648                                 rd_kafka_message_errstr(msg));
649                         rd_kafka_message_destroy(msg);
650                         continue;
651                 }
652 
653                 /* Process message */
654                 process_message(consumer, producer, output_topic, msg);
655 
656                 rd_kafka_message_destroy(msg);
657 
658                 msgcnt++;
659         }
660 
661         fprintf(stdout, "Closing consumer\n");
662         rd_kafka_consumer_close(consumer);
663         rd_kafka_destroy(consumer);
664 
665         fprintf(stdout, "Closing producer\n");
666         rd_kafka_destroy(producer);
667 
668         return 0;
669 }
670