1 /*
2 * librdkafka - Apache Kafka C library
3 *
4 * Copyright (c) 2020, Magnus Edenhill
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright notice,
13 * this list of conditions and the following disclaimer in the documentation
14 * and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 /**
30 * @name Transactions example for Apache Kafka <= 2.4.0 (no KIP-447 support).
31 *
32 * This example show-cases a simple transactional consume-process-produce
33 * application that reads messages from an input topic, extracts all
34 * numbers from the message's value string, adds them up, and sends
35 * the sum to the output topic as part of a transaction.
36 * The transaction is committed every 5 seconds or 100 messages, whichever
37 * comes first. As the transaction is committed a new transaction is started.
38 *
39 * @remark This example does not yet support incremental rebalancing and thus
40 * not the cooperative-sticky partition.assignment.strategy.
41 */
42
43 #include <stdio.h>
44 #include <signal.h>
45 #include <unistd.h>
46 #include <string.h>
47 #include <stdlib.h>
48 #include <time.h>
49 #include <ctype.h>
50
51
52 /* Typical include path would be <librdkafka/rdkafka.h>, but this program
53 * is builtin from within the librdkafka source tree and thus differs. */
54 #include "rdkafka.h"
55
56
57 static volatile sig_atomic_t run = 1;
58
59 static rd_kafka_t *consumer;
60
61 /* From command-line arguments */
62 static const char *brokers, *input_topic, *output_topic;
63
64
65 /**
66 * @struct This is the per input partition state, constisting of
67 * a transactional producer and the in-memory state for the current transaction.
68 * This demo simply finds all numbers (ascii string numbers) in the message
69 * payload and adds them.
70 */
71 struct state {
72 rd_kafka_t *producer; /**< Per-input partition output producer */
73 rd_kafka_topic_partition_t *rktpar; /**< Back-pointer to the
74 * input partition. */
75 time_t last_commit; /**< Last transaction commit */
76 int msgcnt; /**< Number of messages processed in current txn */
77 };
78 /* Current assignment for the input consumer.
79 * The .opaque field of each partition points to an allocated 'struct state'.
80 */
81 static rd_kafka_topic_partition_list_t *assigned_partitions;
82
83
84
85 /**
86 * @brief A fatal error has occurred, immediately exit the application.
87 */
88 #define fatal(...) do { \
89 fprintf(stderr, "FATAL ERROR: "); \
90 fprintf(stderr, __VA_ARGS__); \
91 fprintf(stderr, "\n"); \
92 exit(1); \
93 } while (0)
94
95 /**
96 * @brief Same as fatal() but takes an rd_kafka_error_t object, prints its
97 * error message, destroys the object and then exits fatally.
98 */
99 #define fatal_error(what,error) do { \
100 fprintf(stderr, "FATAL ERROR: %s: %s: %s\n", \
101 what, rd_kafka_error_name(error), \
102 rd_kafka_error_string(error)); \
103 rd_kafka_error_destroy(error); \
104 exit(1); \
105 } while (0)
106
107 /**
108 * @brief Signal termination of program
109 */
stop(int sig)110 static void stop (int sig) {
111 run = 0;
112 }
113
114
115 /**
116 * @brief Message delivery report callback.
117 *
118 * This callback is called exactly once per message, indicating if
119 * the message was succesfully delivered
120 * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently
121 * failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR).
122 *
123 * The callback is triggered from rd_kafka_poll(), rd_kafka_flush(),
124 * rd_kafka_abort_transaction() and rd_kafka_commit_transaction() and
125 * executes on the application's thread.
126 *
127 * The current transactional will enter the abortable state if any
128 * message permanently fails delivery and the application must then
129 * call rd_kafka_abort_transaction(). But it does not need to be done from
130 * here, this state is checked by all the transactional APIs and it is better
131 * to perform this error checking when calling
132 * rd_kafka_send_offsets_to_transaction() and rd_kafka_commit_transaction().
133 * In the case of transactional producing the delivery report callback is
134 * mostly useful for logging the produce failures.
135 */
dr_msg_cb(rd_kafka_t * rk,const rd_kafka_message_t * rkmessage,void * opaque)136 static void dr_msg_cb (rd_kafka_t *rk,
137 const rd_kafka_message_t *rkmessage, void *opaque) {
138 if (rkmessage->err)
139 fprintf(stderr,
140 "%% Message delivery failed: %s\n",
141 rd_kafka_err2str(rkmessage->err));
142
143 /* The rkmessage is destroyed automatically by librdkafka */
144 }
145
146
147
148 /**
149 * @brief Create a transactional producer for the given input pratition
150 * and begin a new transaction.
151 */
152 static rd_kafka_t *
create_transactional_producer(const rd_kafka_topic_partition_t * rktpar)153 create_transactional_producer (const rd_kafka_topic_partition_t *rktpar) {
154 rd_kafka_conf_t *conf = rd_kafka_conf_new();
155 rd_kafka_t *rk;
156 char errstr[256];
157 rd_kafka_error_t *error;
158 char transactional_id[256];
159
160 snprintf(transactional_id, sizeof(transactional_id),
161 "librdkafka_transactions_older_example_%s-%d",
162 rktpar->topic, rktpar->partition);
163
164 if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
165 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
166 rd_kafka_conf_set(conf, "transactional.id", transactional_id,
167 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
168 rd_kafka_conf_set(conf, "transaction.timeout.ms", "60000",
169 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
170 fatal("Failed to configure producer: %s", errstr);
171
172 /* This callback will be called once per message to indicate
173 * final delivery status. */
174 rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
175
176 /* Create producer */
177 rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
178 if (!rk) {
179 rd_kafka_conf_destroy(conf);
180 fatal("Failed to create producer: %s", errstr);
181 }
182
183 /* Initialize transactions, this is only performed once
184 * per transactional producer to acquire its producer id, et.al. */
185 error = rd_kafka_init_transactions(rk, -1);
186 if (error)
187 fatal_error("init_transactions()", error);
188
189
190 /* Begin a new transaction */
191 error = rd_kafka_begin_transaction(rk);
192 if (error)
193 fatal_error("begin_transaction()", error);
194
195 return rk;
196 }
197
198
199 /**
200 * @brief Abort the current transaction and destroy the producer.
201 */
destroy_transactional_producer(rd_kafka_t * rk)202 static void destroy_transactional_producer (rd_kafka_t *rk) {
203 rd_kafka_error_t *error;
204
205 fprintf(stdout, "%s: aborting transaction and terminating producer\n",
206 rd_kafka_name(rk));
207
208 /* Abort the current transaction, ignore any errors
209 * since we're terminating the producer anyway. */
210 error = rd_kafka_abort_transaction(rk, -1);
211 if (error) {
212 fprintf(stderr,
213 "WARNING: Ignoring abort_transaction() error since "
214 "producer is being destroyed: %s\n",
215 rd_kafka_error_string(error));
216 rd_kafka_error_destroy(error);
217 }
218
219 rd_kafka_destroy(rk);
220 }
221
222
223
224 /**
225 * @brief Abort the current transaction and rewind consumer offsets to
226 * position where the transaction last started, i.e., the committed
227 * consumer offset.
228 */
abort_transaction_and_rewind(struct state * state)229 static void abort_transaction_and_rewind (struct state *state) {
230 rd_kafka_topic_t *rkt = rd_kafka_topic_new(consumer,
231 state->rktpar->topic, NULL);
232 rd_kafka_topic_partition_list_t *offset;
233 rd_kafka_resp_err_t err;
234 rd_kafka_error_t *error;
235
236 fprintf(stdout,
237 "Aborting transaction and rewinding offset for %s [%d]\n",
238 state->rktpar->topic, state->rktpar->partition);
239
240 /* Abort the current transaction */
241 error = rd_kafka_abort_transaction(state->producer, -1);
242 if (error)
243 fatal_error("Failed to abort transaction", error);
244
245 /* Begin a new transaction */
246 error = rd_kafka_begin_transaction(state->producer);
247 if (error)
248 fatal_error("Failed to begin transaction", error);
249
250 /* Get committed offset for this partition */
251 offset = rd_kafka_topic_partition_list_new(1);
252 rd_kafka_topic_partition_list_add(offset,
253 state->rktpar->topic,
254 state->rktpar->partition);
255
256 /* Note: Timeout must be lower than max.poll.interval.ms */
257 err = rd_kafka_committed(consumer, offset, 10*1000);
258 if (err)
259 fatal("Failed to acquire committed offset for %s [%d]: %s",
260 state->rktpar->topic, (int)state->rktpar->partition,
261 rd_kafka_err2str(err));
262
263 /* Seek to committed offset, or start of partition if no
264 * no committed offset is available. */
265 err = rd_kafka_seek(rkt, state->rktpar->partition,
266 offset->elems[0].offset < 0 ?
267 /* No committed offset, start from beginning */
268 RD_KAFKA_OFFSET_BEGINNING :
269 /* Use committed offset */
270 offset->elems[0].offset,
271 0);
272
273 if (err)
274 fatal("Failed to seek %s [%d]: %s",
275 state->rktpar->topic, (int)state->rktpar->partition,
276 rd_kafka_err2str(err));
277
278 rd_kafka_topic_destroy(rkt);
279 }
280
281
282 /**
283 * @brief Commit the current transaction and start a new transaction.
284 */
commit_transaction_and_start_new(struct state * state)285 static void commit_transaction_and_start_new (struct state *state) {
286 rd_kafka_error_t *error;
287 rd_kafka_resp_err_t err;
288 rd_kafka_consumer_group_metadata_t *cgmd;
289 rd_kafka_topic_partition_list_t *offset;
290
291 fprintf(stdout, "Committing transaction for %s [%d]\n",
292 state->rktpar->topic, state->rktpar->partition);
293
294 /* Send the input consumer's offset to transaction
295 * to commit those offsets along with the transaction itself,
296 * this is what guarantees exactly-once-semantics (EOS), that
297 * input (offsets) and output (messages) are committed atomically. */
298
299 /* Get the consumer's current group state */
300 cgmd = rd_kafka_consumer_group_metadata(consumer);
301
302 /* Get consumer's current position for this partition */
303 offset = rd_kafka_topic_partition_list_new(1);
304 rd_kafka_topic_partition_list_add(offset,
305 state->rktpar->topic,
306 state->rktpar->partition);
307 err = rd_kafka_position(consumer, offset);
308 if (err)
309 fatal("Failed to get consumer position for %s [%d]: %s",
310 state->rktpar->topic, state->rktpar->partition,
311 rd_kafka_err2str(err));
312
313 /* Send offsets to transaction coordinator */
314 error = rd_kafka_send_offsets_to_transaction(state->producer,
315 offset, cgmd, -1);
316 rd_kafka_consumer_group_metadata_destroy(cgmd);
317 rd_kafka_topic_partition_list_destroy(offset);
318 if (error) {
319 if (rd_kafka_error_txn_requires_abort(error)) {
320 fprintf(stderr,
321 "WARNING: Failed to send offsets to "
322 "transaction: %s: %s: aborting transaction\n",
323 rd_kafka_error_name(error),
324 rd_kafka_error_string(error));
325 rd_kafka_error_destroy(error);
326 abort_transaction_and_rewind(state);
327 return;
328 } else {
329 fatal_error("Failed to send offsets to transaction",
330 error);
331 }
332 }
333
334 /* Commit the transaction */
335 error = rd_kafka_commit_transaction(state->producer, -1);
336 if (error) {
337 if (rd_kafka_error_txn_requires_abort(error)) {
338 fprintf(stderr,
339 "WARNING: Failed to commit transaction: "
340 "%s: %s: aborting transaction\n",
341 rd_kafka_error_name(error),
342 rd_kafka_error_string(error));
343 abort_transaction_and_rewind(state);
344 rd_kafka_error_destroy(error);
345 return;
346 } else {
347 fatal_error("Failed to commit transaction", error);
348 }
349 }
350
351 /* Begin new transaction */
352 error = rd_kafka_begin_transaction(state->producer);
353 if (error)
354 fatal_error("Failed to begin new transaction", error);
355 }
356
357 /**
358 * @brief The rebalance will be triggered (from rd_kafka_consumer_poll())
359 * when the consumer's partition assignment is assigned or revoked.
360 *
361 * Prior to KIP-447 being supported there must be one transactional output
362 * producer for each consumed input partition, so we create and destroy
363 * these producer's from this callback.
364 */
365 static void
consumer_group_rebalance_cb(rd_kafka_t * rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t * partitions,void * opaque)366 consumer_group_rebalance_cb (rd_kafka_t *rk,
367 rd_kafka_resp_err_t err,
368 rd_kafka_topic_partition_list_t *partitions,
369 void *opaque) {
370 int i;
371
372 if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE"))
373 fatal("This example has not yet been modified to work with "
374 "cooperative incremental rebalancing "
375 "(partition.assignment.strategy=cooperative-sticky)");
376
377 switch (err)
378 {
379 case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
380 assigned_partitions =
381 rd_kafka_topic_partition_list_copy(partitions);
382
383 fprintf(stdout, "Consumer group rebalanced, new assignment:\n");
384
385 /* Create a transactional producer for each input partition */
386 for (i = 0 ; i < assigned_partitions->cnt ; i++) {
387 /* Store the partition-to-producer mapping
388 * in the partition's opaque field. */
389 rd_kafka_topic_partition_t *rktpar =
390 &assigned_partitions->elems[i];
391 struct state *state = calloc(1, sizeof(*state));
392
393 state->producer = create_transactional_producer(rktpar);
394 state->rktpar = rktpar;
395 rktpar->opaque = state;
396 state->last_commit = time(NULL);
397
398 fprintf(stdout,
399 " %s [%d] with transactional producer %s\n",
400 rktpar->topic, rktpar->partition,
401 rd_kafka_name(state->producer));
402 }
403
404 /* Let the consumer know the rebalance has been handled
405 * by calling assign.
406 * This will also tell the consumer to start fetching messages
407 * for the assigned partitions. */
408 rd_kafka_assign(rk, partitions);
409 break;
410
411 case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
412 fprintf(stdout,
413 "Consumer group rebalanced, assignment revoked\n");
414
415 /* Abort the current transactions and destroy all producers */
416 for (i = 0 ; i < assigned_partitions->cnt ; i++) {
417 /* Store the partition-to-producer mapping
418 * in the partition's opaque field. */
419 struct state *state = (struct state *)
420 assigned_partitions->elems[i].opaque;
421
422 destroy_transactional_producer(state->producer);
423 free(state);
424 }
425
426 rd_kafka_topic_partition_list_destroy(assigned_partitions);
427 assigned_partitions = NULL;
428
429 /* Let the consumer know the rebalance has been handled
430 * and revoke the current assignment. */
431 rd_kafka_assign(rk, NULL);
432 break;
433
434 default:
435 /* NOTREACHED */
436 fatal("Unexpected rebalance event: %s", rd_kafka_err2name(err));
437 }
438 }
439
440
441 /**
442 * @brief Create the input consumer.
443 */
create_input_consumer(const char * brokers,const char * input_topic)444 static rd_kafka_t *create_input_consumer (const char *brokers,
445 const char *input_topic) {
446 rd_kafka_conf_t *conf = rd_kafka_conf_new();
447 rd_kafka_t *rk;
448 char errstr[256];
449 rd_kafka_resp_err_t err;
450 rd_kafka_topic_partition_list_t *topics;
451
452 if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
453 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
454 rd_kafka_conf_set(conf, "group.id",
455 "librdkafka_transactions_older_example_group",
456 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
457 /* The input consumer's offsets are explicitly committed with the
458 * output producer's transaction using
459 * rd_kafka_send_offsets_to_transaction(), so auto commits
460 * must be disabled. */
461 rd_kafka_conf_set(conf, "enable.auto.commit", "false",
462 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
463 fatal("Failed to configure consumer: %s", errstr);
464 }
465
466 /* This callback will be called when the consumer group is rebalanced
467 * and the consumer's partition assignment is assigned or revoked. */
468 rd_kafka_conf_set_rebalance_cb(conf, consumer_group_rebalance_cb);
469
470 /* Create consumer */
471 rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
472 if (!rk) {
473 rd_kafka_conf_destroy(conf);
474 fatal("Failed to create consumer: %s", errstr);
475 }
476
477 /* Forward all partition messages to the main queue and
478 * rd_kafka_consumer_poll(). */
479 rd_kafka_poll_set_consumer(rk);
480
481 /* Subscribe to the input topic */
482 topics = rd_kafka_topic_partition_list_new(1);
483 rd_kafka_topic_partition_list_add(topics, input_topic,
484 /* The partition is ignored in
485 * rd_kafka_subscribe() */
486 RD_KAFKA_PARTITION_UA);
487 err = rd_kafka_subscribe(rk, topics);
488 rd_kafka_topic_partition_list_destroy(topics);
489 if (err) {
490 rd_kafka_destroy(rk);
491 fatal("Failed to subscribe to %s: %s\n",
492 input_topic, rd_kafka_err2str(err));
493 }
494
495 return rk;
496 }
497
498
499 /**
500 * @brief Find and parse next integer string in \p start.
501 * @returns Pointer after found integer string, or NULL if not found.
502 */
find_next_int(const void * start,const void * end,int * intp)503 static const void *find_next_int (const void *start, const void *end,
504 int *intp) {
505 const char *p;
506 int collecting = 0;
507 int num = 0;
508
509 for (p = (const char *)start ; p < (const char *)end ; p++) {
510 if (isdigit((int)(*p))) {
511 collecting = 1;
512 num = (num * 10) + ((int)*p - ((int)'0'));
513 } else if (collecting)
514 break;
515 }
516
517 if (!collecting)
518 return NULL; /* No integer string found */
519
520 *intp = num;
521
522 return p;
523 }
524
525
526 /**
527 * @brief Process a message from the input consumer by parsing all
528 * integer strings, adding them, and then producing the sum
529 * the output topic using the transactional producer for the given
530 * inut partition.
531 */
process_message(struct state * state,const rd_kafka_message_t * rkmessage)532 static void process_message (struct state *state,
533 const rd_kafka_message_t *rkmessage) {
534 int num;
535 long unsigned sum = 0;
536 const void *p, *end;
537 rd_kafka_resp_err_t err;
538 char value[64];
539
540 if (rkmessage->len == 0)
541 return; /* Ignore empty messages */
542
543 p = rkmessage->payload;
544 end = ((const char *)rkmessage->payload) + rkmessage->len;
545
546 /* Find and sum all numbers in the message */
547 while ((p = find_next_int(p, end, &num)))
548 sum += num;
549
550 if (sum == 0)
551 return; /* No integers in message, ignore it. */
552
553 snprintf(value, sizeof(value), "%lu", sum);
554
555 /* Emit output message on transactional producer */
556 while (1) {
557 err = rd_kafka_producev(
558 state->producer,
559 RD_KAFKA_V_TOPIC(output_topic),
560 /* Use same key as input message */
561 RD_KAFKA_V_KEY(rkmessage->key,
562 rkmessage->key_len),
563 /* Value is the current sum of this
564 * transaction. */
565 RD_KAFKA_V_VALUE(value, strlen(value)),
566 /* Copy value since it is allocated on the stack */
567 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
568 RD_KAFKA_V_END);
569
570 if (!err)
571 break;
572 else if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
573 /* If output queue fills up we need to wait for
574 * some delivery reports and then retry. */
575 rd_kafka_poll(state->producer, 100);
576 continue;
577 } else {
578 fprintf(stderr,
579 "WARNING: Failed to produce message to %s: "
580 "%s: aborting transaction\n",
581 output_topic, rd_kafka_err2str(err));
582 abort_transaction_and_rewind(state);
583 return;
584 }
585 }
586 }
587
588
main(int argc,char ** argv)589 int main (int argc, char **argv) {
590 /*
591 * Argument validation
592 */
593 if (argc != 4) {
594 fprintf(stderr,
595 "%% Usage: %s <broker> <input-topic> <output-topic>\n",
596 argv[0]);
597 return 1;
598 }
599
600 brokers = argv[1];
601 input_topic = argv[2];
602 output_topic = argv[3];
603
604 /* Signal handler for clean shutdown */
605 signal(SIGINT, stop);
606
607 consumer = create_input_consumer(brokers, input_topic);
608
609 fprintf(stdout,
610 "Expecting integers to sum on input topic %s ...\n"
611 "To generate input messages you can use:\n"
612 " $ seq 1 100 | examples/producer %s %s\n",
613 input_topic, brokers, input_topic);
614
615 while (run) {
616 rd_kafka_message_t *msg;
617 struct state *state;
618 rd_kafka_topic_partition_t *rktpar;
619
620 /* Wait for new mesages or error events */
621 msg = rd_kafka_consumer_poll(consumer, 1000/*1 second*/);
622 if (!msg)
623 continue;
624
625 if (msg->err) {
626 /* Client errors are typically just informational
627 * since the client will automatically try to recover
628 * from all types of errors.
629 * It is thus sufficient for the application to log and
630 * continue operating when an error is received. */
631 fprintf(stderr, "WARNING: Consumer error: %s\n",
632 rd_kafka_message_errstr(msg));
633 rd_kafka_message_destroy(msg);
634 continue;
635 }
636
637 /* Find output producer for this input partition */
638 rktpar = rd_kafka_topic_partition_list_find(
639 assigned_partitions,
640 rd_kafka_topic_name(msg->rkt), msg->partition);
641 if (!rktpar)
642 fatal("BUG: No output producer for assigned "
643 "partition %s [%d]",
644 rd_kafka_topic_name(msg->rkt),
645 (int)msg->partition);
646
647 /* Get state struct for this partition */
648 state = (struct state *)rktpar->opaque;
649
650 /* Process message */
651 process_message(state, msg);
652
653 rd_kafka_message_destroy(msg);
654
655 /* Commit transaction every 100 messages or 5 seconds */
656 if (++state->msgcnt > 100 ||
657 state->last_commit + 5 <= time(NULL)) {
658 commit_transaction_and_start_new(state);
659 state->msgcnt = 0;
660 state->last_commit = time(NULL);
661 }
662 }
663
664 fprintf(stdout, "Closing consumer\n");
665 rd_kafka_consumer_close(consumer);
666
667 rd_kafka_destroy(consumer);
668
669 return 0;
670 }
671