1 /* 2 * librdkafka - Apache Kafka C library 3 * 4 * Copyright (c) 2020, Magnus Edenhill 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright notice, 11 * this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright notice, 13 * this list of conditions and the following disclaimer in the documentation 14 * and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 * POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 /** 30 * @name Transactions example for Apache Kafka 2.5.0 (KIP-447) and later. 31 * 32 * This example show-cases a simple transactional consume-process-produce 33 * application that reads messages from an input topic, extracts all 34 * numbers from the message's value string, adds them up, and sends 35 * the sum to the output topic as part of a transaction. 36 * The transaction is committed every 5 seconds or 100 messages, whichever 37 * comes first. As the transaction is committed a new transaction is started. 
 *
 * This example makes use of incremental rebalancing (KIP-429) and the
 * cooperative-sticky partition.assignment.strategy on the consumer, providing
 * hitless rebalances.
 */

#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <ctype.h>


/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h"


/* Main-loop run flag, cleared by the SIGINT handler for clean shutdown. */
static volatile sig_atomic_t run = 1;

/**
 * @brief A fatal error has occurred, immediately exit the application.
 */
#define fatal(...) do {                                 \
                fprintf(stderr, "FATAL ERROR: ");       \
                fprintf(stderr, __VA_ARGS__);           \
                fprintf(stderr, "\n");                  \
                exit(1);                                \
        } while (0)

/**
 * @brief Same as fatal() but takes an rd_kafka_error_t object, prints its
 *        error message, destroys the object and then exits fatally.
 */
#define fatal_error(what,error) do {                            \
                fprintf(stderr, "FATAL ERROR: %s: %s: %s\n",    \
                        what, rd_kafka_error_name(error),       \
                        rd_kafka_error_string(error));          \
                rd_kafka_error_destroy(error);                  \
                exit(1);                                        \
        } while (0)

/**
 * @brief Signal termination of program
 */
static void stop (int sig) {
        run = 0;
}


/**
 * @brief Message delivery report callback.
 *
 * This callback is called exactly once per message, indicating if
 * the message was successfully delivered
 * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently
 * failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR).
 *
 * The callback is triggered from rd_kafka_poll(), rd_kafka_flush(),
 * rd_kafka_abort_transaction() and rd_kafka_commit_transaction() and
 * executes on the application's thread.
101 * 102 * The current transactional will enter the abortable state if any 103 * message permanently fails delivery and the application must then 104 * call rd_kafka_abort_transaction(). But it does not need to be done from 105 * here, this state is checked by all the transactional APIs and it is better 106 * to perform this error checking when calling 107 * rd_kafka_send_offsets_to_transaction() and rd_kafka_commit_transaction(). 108 * In the case of transactional producing the delivery report callback is 109 * mostly useful for logging the produce failures. 110 */ 111 static void dr_msg_cb (rd_kafka_t *rk, 112 const rd_kafka_message_t *rkmessage, void *opaque) { 113 if (rkmessage->err) 114 fprintf(stderr, 115 "%% Message delivery failed: %s\n", 116 rd_kafka_err2str(rkmessage->err)); 117 118 /* The rkmessage is destroyed automatically by librdkafka */ 119 } 120 121 122 123 /** 124 * @brief Create a transactional producer. 125 */ 126 static rd_kafka_t * 127 create_transactional_producer (const char *brokers, const char *output_topic) { 128 rd_kafka_conf_t *conf = rd_kafka_conf_new(); 129 rd_kafka_t *rk; 130 char errstr[256]; 131 rd_kafka_error_t *error; 132 133 if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, 134 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || 135 rd_kafka_conf_set(conf, "transactional.id", 136 "librdkafka_transactions_example", 137 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) 138 fatal("Failed to configure producer: %s", errstr); 139 140 /* This callback will be called once per message to indicate 141 * final delivery status. */ 142 rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); 143 144 /* Create producer */ 145 rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); 146 if (!rk) { 147 rd_kafka_conf_destroy(conf); 148 fatal("Failed to create producer: %s", errstr); 149 } 150 151 /* Initialize transactions, this is only performed once 152 * per transactional producer to acquire its producer id, et.al. 
*/ 153 error = rd_kafka_init_transactions(rk, -1); 154 if (error) 155 fatal_error("init_transactions()", error); 156 157 return rk; 158 } 159 160 161 /** 162 * @brief Rewind consumer's consume position to the last committed offsets 163 * for the current assignment. 164 */ 165 static void rewind_consumer (rd_kafka_t *consumer) { 166 rd_kafka_topic_partition_list_t *offsets; 167 rd_kafka_resp_err_t err; 168 rd_kafka_error_t *error; 169 int i; 170 171 /* Get committed offsets for the current assignment, if there 172 * is a current assignment. */ 173 err = rd_kafka_assignment(consumer, &offsets); 174 if (err) { 175 fprintf(stderr, "No current assignment to rewind: %s\n", 176 rd_kafka_err2str(err)); 177 return; 178 } 179 180 if (offsets->cnt == 0) { 181 fprintf(stderr, "No current assignment to rewind\n"); 182 rd_kafka_topic_partition_list_destroy(offsets); 183 return; 184 } 185 186 /* Note: Timeout must be lower than max.poll.interval.ms */ 187 err = rd_kafka_committed(consumer, offsets, 10*1000); 188 if (err) 189 fatal("Failed to acquire committed offsets: %s", 190 rd_kafka_err2str(err)); 191 192 /* Seek to committed offset, or start of partition if no 193 * committed offset is available. */ 194 for (i = 0 ; i < offsets->cnt ; i++) { 195 /* No committed offset, start from beginning */ 196 if (offsets->elems[i].offset < 0) 197 offsets->elems[i].offset = 198 RD_KAFKA_OFFSET_BEGINNING; 199 } 200 201 /* Perform seek */ 202 error = rd_kafka_seek_partitions(consumer, offsets, -1); 203 if (error) 204 fatal_error("Failed to seek", error); 205 206 rd_kafka_topic_partition_list_destroy(offsets); 207 } 208 209 /** 210 * @brief Abort the current transaction and rewind consumer offsets to 211 * position where the transaction last started, i.e., the committed 212 * consumer offset, then begin a new transaction. 
213 */ 214 static void abort_transaction_and_rewind (rd_kafka_t *consumer, 215 rd_kafka_t *producer) { 216 rd_kafka_error_t *error; 217 218 fprintf(stdout, "Aborting transaction and rewinding offsets\n"); 219 220 /* Abort the current transaction */ 221 error = rd_kafka_abort_transaction(producer, -1); 222 if (error) 223 fatal_error("Failed to abort transaction", error); 224 225 /* Rewind consumer */ 226 rewind_consumer(consumer); 227 228 /* Begin a new transaction */ 229 error = rd_kafka_begin_transaction(producer); 230 if (error) 231 fatal_error("Failed to begin transaction", error); 232 } 233 234 235 /** 236 * @brief Commit the current transaction. 237 * 238 * @returns 1 if transaction was successfully committed, or 0 239 * if the current transaction was aborted. 240 */ 241 static int commit_transaction (rd_kafka_t *consumer, 242 rd_kafka_t *producer) { 243 rd_kafka_error_t *error; 244 rd_kafka_resp_err_t err; 245 rd_kafka_consumer_group_metadata_t *cgmd; 246 rd_kafka_topic_partition_list_t *offsets; 247 248 fprintf(stdout, "Committing transaction\n"); 249 250 /* Send the input consumer's offset to transaction 251 * to commit those offsets along with the transaction itself, 252 * this is what guarantees exactly-once-semantics (EOS), that 253 * input (offsets) and output (messages) are committed atomically. */ 254 255 /* Get the consumer's current group metadata state */ 256 cgmd = rd_kafka_consumer_group_metadata(consumer); 257 258 /* Get consumer's current assignment */ 259 err = rd_kafka_assignment(consumer, &offsets); 260 if (err || offsets->cnt == 0) { 261 /* No partition offsets to commit because consumer 262 * (most likely) lost the assignment, abort transaction. 
*/ 263 if (err) 264 fprintf(stderr, 265 "Failed to get consumer assignment to commit: " 266 "%s\n", rd_kafka_err2str(err)); 267 else 268 rd_kafka_topic_partition_list_destroy(offsets); 269 270 error = rd_kafka_abort_transaction(producer, -1); 271 if (error) 272 fatal_error("Failed to abort transaction", error); 273 274 return 0; 275 } 276 277 /* Get consumer's current position for this partition */ 278 err = rd_kafka_position(consumer, offsets); 279 if (err) 280 fatal("Failed to get consumer position: %s", 281 rd_kafka_err2str(err)); 282 283 /* Send offsets to transaction coordinator */ 284 error = rd_kafka_send_offsets_to_transaction(producer, 285 offsets, cgmd, -1); 286 rd_kafka_consumer_group_metadata_destroy(cgmd); 287 rd_kafka_topic_partition_list_destroy(offsets); 288 if (error) { 289 if (rd_kafka_error_txn_requires_abort(error)) { 290 fprintf(stderr, 291 "WARNING: Failed to send offsets to " 292 "transaction: %s: %s: aborting transaction\n", 293 rd_kafka_error_name(error), 294 rd_kafka_error_string(error)); 295 rd_kafka_error_destroy(error); 296 297 /* Abort transaction */ 298 error = rd_kafka_abort_transaction(producer, -1); 299 if (error) 300 fatal_error("Failed to abort transaction", 301 error); 302 return 0; 303 } else { 304 fatal_error("Failed to send offsets to transaction", 305 error); 306 } 307 } 308 309 /* Commit the transaction */ 310 error = rd_kafka_commit_transaction(producer, -1); 311 if (error) { 312 if (rd_kafka_error_txn_requires_abort(error)) { 313 fprintf(stderr, 314 "WARNING: Failed to commit transaction: " 315 "%s: %s: aborting transaction\n", 316 rd_kafka_error_name(error), 317 rd_kafka_error_string(error)); 318 rd_kafka_error_destroy(error); 319 320 /* Abort transaction */ 321 error = rd_kafka_abort_transaction(producer, -1); 322 if (error) 323 fatal_error("Failed to abort transaction", 324 error); 325 return 0; 326 } else { 327 fatal_error("Failed to commit transaction", error); 328 } 329 } 330 331 return 1; 332 } 333 334 /** 335 * 
@brief Commit the current transaction and start a new transaction. 336 */ 337 static void commit_transaction_and_start_new (rd_kafka_t *consumer, 338 rd_kafka_t *producer) { 339 rd_kafka_error_t *error; 340 341 /* Commit transaction. 342 * If commit failed the transaction is aborted and we need 343 * to rewind the consumer to the last committed offsets. */ 344 if (!commit_transaction(consumer, producer)) 345 rewind_consumer(consumer); 346 347 /* Begin new transaction */ 348 error = rd_kafka_begin_transaction(producer); 349 if (error) 350 fatal_error("Failed to begin new transaction", error); 351 } 352 353 /** 354 * @brief The rebalance will be triggered (from rd_kafka_consumer_poll()) 355 * when the consumer's partition assignment is assigned or revoked. 356 */ 357 static void 358 consumer_group_rebalance_cb (rd_kafka_t *consumer, 359 rd_kafka_resp_err_t err, 360 rd_kafka_topic_partition_list_t *partitions, 361 void *opaque) { 362 rd_kafka_t *producer = (rd_kafka_t *)opaque; 363 rd_kafka_error_t *error; 364 365 switch (err) 366 { 367 case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: 368 fprintf(stdout, 369 "Consumer group rebalanced: " 370 "%d new partition(s) assigned\n", 371 partitions->cnt); 372 373 /* Start fetching messages for the assigned partitions 374 * and add them to the consumer's local assignment. 
*/ 375 error = rd_kafka_incremental_assign(consumer, partitions); 376 if (error) 377 fatal_error("Incremental assign failed", error); 378 break; 379 380 case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: 381 if (rd_kafka_assignment_lost(consumer)) { 382 fprintf(stdout, 383 "Consumer group rebalanced: assignment lost: " 384 "aborting current transaction\n"); 385 386 error = rd_kafka_abort_transaction(producer, -1); 387 if (error) 388 fatal_error("Failed to abort transaction", 389 error); 390 } else { 391 fprintf(stdout, 392 "Consumer group rebalanced: %d partition(s) " 393 "revoked: committing current transaction\n", 394 partitions->cnt); 395 396 commit_transaction(consumer, producer); 397 } 398 399 /* Begin new transaction */ 400 error = rd_kafka_begin_transaction(producer); 401 if (error) 402 fatal_error("Failed to begin transaction", error); 403 404 /* Stop fetching messages for the revoekd partitions 405 * and remove them from the consumer's local assignment. */ 406 error = rd_kafka_incremental_unassign(consumer, partitions); 407 if (error) 408 fatal_error("Incremental unassign failed", error); 409 break; 410 411 default: 412 /* NOTREACHED */ 413 fatal("Unexpected rebalance event: %s", rd_kafka_err2name(err)); 414 } 415 } 416 417 418 /** 419 * @brief Create the input consumer. 
420 */ 421 static rd_kafka_t *create_input_consumer (const char *brokers, 422 const char *input_topic, 423 rd_kafka_t *producer) { 424 rd_kafka_conf_t *conf = rd_kafka_conf_new(); 425 rd_kafka_t *rk; 426 char errstr[256]; 427 rd_kafka_resp_err_t err; 428 rd_kafka_topic_partition_list_t *topics; 429 430 if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, 431 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || 432 rd_kafka_conf_set(conf, "group.id", 433 "librdkafka_transactions_example_group", 434 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || 435 rd_kafka_conf_set(conf, "partition.assignment.strategy", 436 "cooperative-sticky", 437 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || 438 rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", 439 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || 440 /* The input consumer's offsets are explicitly committed with the 441 * output producer's transaction using 442 * rd_kafka_send_offsets_to_transaction(), so auto commits 443 * must be disabled. */ 444 rd_kafka_conf_set(conf, "enable.auto.commit", "false", 445 errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { 446 fatal("Failed to configure consumer: %s", errstr); 447 } 448 449 /* This callback will be called when the consumer group is rebalanced 450 * and the consumer's partition assignment is assigned or revoked. */ 451 rd_kafka_conf_set_rebalance_cb(conf, consumer_group_rebalance_cb); 452 453 /* The producer handle is needed in the consumer's rebalance callback 454 * to be able to abort and commit transactions, so we pass the 455 * producer as the consumer's opaque. */ 456 rd_kafka_conf_set_opaque(conf, producer); 457 458 /* Create consumer */ 459 rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); 460 if (!rk) { 461 rd_kafka_conf_destroy(conf); 462 fatal("Failed to create consumer: %s", errstr); 463 } 464 465 /* Forward all partition messages to the main queue and 466 * rd_kafka_consumer_poll(). 
*/ 467 rd_kafka_poll_set_consumer(rk); 468 469 /* Subscribe to the input topic */ 470 topics = rd_kafka_topic_partition_list_new(1); 471 rd_kafka_topic_partition_list_add(topics, input_topic, 472 /* The partition is ignored in 473 * rd_kafka_subscribe() */ 474 RD_KAFKA_PARTITION_UA); 475 err = rd_kafka_subscribe(rk, topics); 476 rd_kafka_topic_partition_list_destroy(topics); 477 if (err) { 478 rd_kafka_destroy(rk); 479 fatal("Failed to subscribe to %s: %s\n", 480 input_topic, rd_kafka_err2str(err)); 481 } 482 483 return rk; 484 } 485 486 487 /** 488 * @brief Find and parse next integer string in \p start. 489 * @returns Pointer after found integer string, or NULL if not found. 490 */ 491 static const void *find_next_int (const void *start, const void *end, 492 int *intp) { 493 const char *p; 494 int collecting = 0; 495 int num = 0; 496 497 for (p = (const char *)start ; p < (const char *)end ; p++) { 498 if (isdigit((int)(*p))) { 499 collecting = 1; 500 num = (num * 10) + ((int)*p - ((int)'0')); 501 } else if (collecting) 502 break; 503 } 504 505 if (!collecting) 506 return NULL; /* No integer string found */ 507 508 *intp = num; 509 510 return p; 511 } 512 513 514 /** 515 * @brief Process a message from the input consumer by parsing all 516 * integer strings, adding them, and then producing the sum 517 * the output topic using the transactional producer for the given 518 * inut partition. 
519 */ 520 static void process_message (rd_kafka_t *consumer, 521 rd_kafka_t *producer, 522 const char *output_topic, 523 const rd_kafka_message_t *rkmessage) { 524 int num; 525 long unsigned sum = 0; 526 const void *p, *end; 527 rd_kafka_resp_err_t err; 528 char value[64]; 529 530 if (rkmessage->len == 0) 531 return; /* Ignore empty messages */ 532 533 p = rkmessage->payload; 534 end = ((const char *)rkmessage->payload) + rkmessage->len; 535 536 /* Find and sum all numbers in the message */ 537 while ((p = find_next_int(p, end, &num))) 538 sum += num; 539 540 if (sum == 0) 541 return; /* No integers in message, ignore it. */ 542 543 snprintf(value, sizeof(value), "%lu", sum); 544 545 /* Emit output message on transactional producer */ 546 while (1) { 547 err = rd_kafka_producev( 548 producer, 549 RD_KAFKA_V_TOPIC(output_topic), 550 /* Use same key as input message */ 551 RD_KAFKA_V_KEY(rkmessage->key, 552 rkmessage->key_len), 553 /* Value is the current sum of this 554 * transaction. */ 555 RD_KAFKA_V_VALUE(value, strlen(value)), 556 /* Copy value since it is allocated on the stack */ 557 RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), 558 RD_KAFKA_V_END); 559 560 if (!err) 561 break; 562 else if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) { 563 /* If output queue fills up we need to wait for 564 * some delivery reports and then retry. 
*/ 565 rd_kafka_poll(producer, 100); 566 continue; 567 } else { 568 fprintf(stderr, 569 "WARNING: Failed to produce message to %s: " 570 "%s: aborting transaction\n", 571 output_topic, rd_kafka_err2str(err)); 572 abort_transaction_and_rewind(consumer, producer); 573 return; 574 } 575 } 576 } 577 578 579 int main (int argc, char **argv) { 580 rd_kafka_t *producer, *consumer; 581 int msgcnt = 0; 582 time_t last_commit = 0; 583 const char *brokers, *input_topic, *output_topic; 584 rd_kafka_error_t *error; 585 586 /* 587 * Argument validation 588 */ 589 if (argc != 4) { 590 fprintf(stderr, 591 "%% Usage: %s <broker> <input-topic> <output-topic>\n", 592 argv[0]); 593 return 1; 594 } 595 596 brokers = argv[1]; 597 input_topic = argv[2]; 598 output_topic = argv[3]; 599 600 /* Signal handler for clean shutdown */ 601 signal(SIGINT, stop); 602 603 producer = create_transactional_producer(brokers, output_topic); 604 605 consumer = create_input_consumer(brokers, input_topic, producer); 606 607 fprintf(stdout, 608 "Expecting integers to sum on input topic %s ...\n" 609 "To generate input messages you can use:\n" 610 " $ seq 1 100 | examples/producer %s %s\n" 611 "Observe summed integers on output topic %s:\n" 612 " $ examples/consumer %s just-watching %s\n" 613 "\n", 614 input_topic, brokers, input_topic, 615 output_topic, brokers, output_topic); 616 617 /* Begin transaction and start waiting for messages */ 618 error = rd_kafka_begin_transaction(producer); 619 if (error) 620 fatal_error("Failed to begin transaction", error); 621 622 while (run) { 623 rd_kafka_message_t *msg; 624 625 /* Commit transaction every 100 messages or 5 seconds */ 626 if (msgcnt > 0 && 627 (msgcnt > 100 || last_commit + 5 <= time(NULL))) { 628 printf("msgcnt %d, elapsed %d\n", msgcnt, 629 (int)(time(NULL) - last_commit)); 630 commit_transaction_and_start_new(consumer, producer); 631 msgcnt = 0; 632 last_commit = time(NULL); 633 } 634 635 /* Wait for new mesages or error events */ 636 msg = 
rd_kafka_consumer_poll(consumer, 1000/*1 second*/); 637 if (!msg) 638 continue; /* Poll timeout */ 639 640 if (msg->err) { 641 /* Client errors are typically just informational 642 * since the client will automatically try to recover 643 * from all types of errors. 644 * It is thus sufficient for the application to log and 645 * continue operating when a consumer error is 646 * encountered. */ 647 fprintf(stderr, "WARNING: Consumer error: %s\n", 648 rd_kafka_message_errstr(msg)); 649 rd_kafka_message_destroy(msg); 650 continue; 651 } 652 653 /* Process message */ 654 process_message(consumer, producer, output_topic, msg); 655 656 rd_kafka_message_destroy(msg); 657 658 msgcnt++; 659 } 660 661 fprintf(stdout, "Closing consumer\n"); 662 rd_kafka_consumer_close(consumer); 663 rd_kafka_destroy(consumer); 664 665 fprintf(stdout, "Closing producer\n"); 666 rd_kafka_destroy(producer); 667 668 return 0; 669 } 670