package sarama

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
	// Consume joins a cluster of consumers for a given list of topics and
	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
	//
	// The life-cycle of a session is represented by the following steps:
	//
	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
	//    and are assigned their "fair share" of partitions, aka 'claims'.
	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
	//    of the claims and allow any necessary preparation or alteration of state.
	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
	//    from concurrent reads/writes.
	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
	//    parent context is cancelled or when a server-side rebalance cycle is initiated.
	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
	//    to allow the user to perform any final tasks before a rebalance.
	// 6. Finally, marked offsets are committed one last time before claims are released.
	//
	// Please note that once a rebalance is triggered, sessions must be completed within
	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
	// commit failures.
	// This method should be called inside an infinite loop: when a
	// server-side rebalance happens, the consumer session will need to be
	// recreated to get the new claims.
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error
}

// consumerGroup is the ConsumerGroup implementation backed by a Client.
type consumerGroup struct {
	// client is used for metadata, coordinator and partition lookups and is
	// closed by Close.
	client Client

	// config is the configuration obtained from client.Config().
	config *Config
	// consumer is created from client and used to consume the claimed partitions.
	consumer Consumer
	groupID  string
	// memberID is the member ID from the last successful join; it is empty
	// while the group is not joined and is reset when leaving.
	memberID string
	// errors is the channel handed out by Errors().
	errors chan error

	// lock is held for the whole duration of Consume and while leaving the group.
	lock sync.Mutex
	// closed is closed exactly once by Close to signal shutdown.
	closed    chan none
	closeOnce sync.Once

	// userData is the user data received with the last sync assignment; it is
	// sent with the next join request when no static user data is configured.
	userData []byte
}

// NewConsumerGroup creates a new consumer group using the given broker addresses and configuration.
// The client created here is owned by the group: it is closed again if the
// group cannot be constructed.
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	c, err := newConsumerGroup(groupID, client)
	if err != nil {
		// best effort: the construction error is the one reported to the caller
		_ = client.Close()
	}
	return c, err
}

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
90func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) { 91 // For clients passed in by the client, ensure we don't 92 // call Close() on it. 93 cli := &nopCloserClient{client} 94 return newConsumerGroup(groupID, cli) 95} 96 97func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) { 98 config := client.Config() 99 if !config.Version.IsAtLeast(V0_10_2_0) { 100 return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0") 101 } 102 103 consumer, err := NewConsumerFromClient(client) 104 if err != nil { 105 return nil, err 106 } 107 108 return &consumerGroup{ 109 client: client, 110 consumer: consumer, 111 config: config, 112 groupID: groupID, 113 errors: make(chan error, config.ChannelBufferSize), 114 closed: make(chan none), 115 }, nil 116} 117 118// Errors implements ConsumerGroup. 119func (c *consumerGroup) Errors() <-chan error { return c.errors } 120 121// Close implements ConsumerGroup. 122func (c *consumerGroup) Close() (err error) { 123 c.closeOnce.Do(func() { 124 close(c.closed) 125 126 // leave group 127 if e := c.leave(); e != nil { 128 err = e 129 } 130 131 // drain errors 132 go func() { 133 close(c.errors) 134 }() 135 for e := range c.errors { 136 err = e 137 } 138 139 if e := c.client.Close(); e != nil { 140 err = e 141 } 142 }) 143 return 144} 145 146// Consume implements ConsumerGroup. 
147func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error { 148 // Ensure group is not closed 149 select { 150 case <-c.closed: 151 return ErrClosedConsumerGroup 152 default: 153 } 154 155 c.lock.Lock() 156 defer c.lock.Unlock() 157 158 // Quick exit when no topics are provided 159 if len(topics) == 0 { 160 return fmt.Errorf("no topics provided") 161 } 162 163 // Refresh metadata for requested topics 164 if err := c.client.RefreshMetadata(topics...); err != nil { 165 return err 166 } 167 168 // Init session 169 sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max) 170 if err == ErrClosedClient { 171 return ErrClosedConsumerGroup 172 } else if err != nil { 173 return err 174 } 175 176 // loop check topic partition numbers changed 177 // will trigger rebalance when any topic partitions number had changed 178 // avoid Consume function called again that will generate more than loopCheckPartitionNumbers coroutine 179 go c.loopCheckPartitionNumbers(topics, sess) 180 181 // Wait for session exit signal 182 <-sess.ctx.Done() 183 184 // Gracefully release session claims 185 return sess.release(true) 186} 187 188func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) { 189 select { 190 case <-c.closed: 191 return nil, ErrClosedConsumerGroup 192 case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff): 193 } 194 195 if refreshCoordinator { 196 err := c.client.RefreshCoordinator(c.groupID) 197 if err != nil { 198 return c.retryNewSession(ctx, topics, handler, retries, true) 199 } 200 } 201 202 return c.newSession(ctx, topics, handler, retries-1) 203} 204 205func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) { 206 coordinator, err := c.client.Coordinator(c.groupID) 
207 if err != nil { 208 if retries <= 0 { 209 return nil, err 210 } 211 212 return c.retryNewSession(ctx, topics, handler, retries, true) 213 } 214 215 // Join consumer group 216 join, err := c.joinGroupRequest(coordinator, topics) 217 if err != nil { 218 _ = coordinator.Close() 219 return nil, err 220 } 221 switch join.Err { 222 case ErrNoError: 223 c.memberID = join.MemberId 224 case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately 225 c.memberID = "" 226 return c.newSession(ctx, topics, handler, retries) 227 case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh 228 if retries <= 0 { 229 return nil, join.Err 230 } 231 232 return c.retryNewSession(ctx, topics, handler, retries, true) 233 case ErrRebalanceInProgress: // retry after backoff 234 if retries <= 0 { 235 return nil, join.Err 236 } 237 238 return c.retryNewSession(ctx, topics, handler, retries, false) 239 default: 240 return nil, join.Err 241 } 242 243 // Prepare distribution plan if we joined as the leader 244 var plan BalanceStrategyPlan 245 if join.LeaderId == join.MemberId { 246 members, err := join.GetMembers() 247 if err != nil { 248 return nil, err 249 } 250 251 plan, err = c.balance(members) 252 if err != nil { 253 return nil, err 254 } 255 } 256 257 // Sync consumer group 258 groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId) 259 if err != nil { 260 _ = coordinator.Close() 261 return nil, err 262 } 263 switch groupRequest.Err { 264 case ErrNoError: 265 case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately 266 c.memberID = "" 267 return c.newSession(ctx, topics, handler, retries) 268 case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh 269 if retries <= 0 { 270 return nil, groupRequest.Err 271 } 272 273 return c.retryNewSession(ctx, topics, handler, retries, true) 274 case ErrRebalanceInProgress: // retry after backoff 275 if retries <= 0 { 276 
return nil, groupRequest.Err 277 } 278 279 return c.retryNewSession(ctx, topics, handler, retries, false) 280 default: 281 return nil, groupRequest.Err 282 } 283 284 // Retrieve and sort claims 285 var claims map[string][]int32 286 if len(groupRequest.MemberAssignment) > 0 { 287 members, err := groupRequest.GetMemberAssignment() 288 if err != nil { 289 return nil, err 290 } 291 claims = members.Topics 292 c.userData = members.UserData 293 294 for _, partitions := range claims { 295 sort.Sort(int32Slice(partitions)) 296 } 297 } 298 299 return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler) 300} 301 302func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) { 303 req := &JoinGroupRequest{ 304 GroupId: c.groupID, 305 MemberId: c.memberID, 306 SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond), 307 ProtocolType: "consumer", 308 } 309 if c.config.Version.IsAtLeast(V0_10_1_0) { 310 req.Version = 1 311 req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond) 312 } 313 314 // use static user-data if configured, otherwise use consumer-group userdata from the last sync 315 userData := c.config.Consumer.Group.Member.UserData 316 if len(userData) == 0 { 317 userData = c.userData 318 } 319 meta := &ConsumerGroupMemberMetadata{ 320 Topics: topics, 321 UserData: userData, 322 } 323 strategy := c.config.Consumer.Group.Rebalance.Strategy 324 if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil { 325 return nil, err 326 } 327 328 return coordinator.JoinGroup(req) 329} 330 331func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) { 332 req := &SyncGroupRequest{ 333 GroupId: c.groupID, 334 MemberId: c.memberID, 335 GenerationId: generationID, 336 } 337 strategy := c.config.Consumer.Group.Rebalance.Strategy 338 for memberID, topics := 
range plan { 339 assignment := &ConsumerGroupMemberAssignment{Topics: topics} 340 userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID) 341 if err != nil { 342 return nil, err 343 } 344 assignment.UserData = userDataBytes 345 if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil { 346 return nil, err 347 } 348 } 349 return coordinator.SyncGroup(req) 350} 351 352func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) { 353 req := &HeartbeatRequest{ 354 GroupId: c.groupID, 355 MemberId: memberID, 356 GenerationId: generationID, 357 } 358 359 return coordinator.Heartbeat(req) 360} 361 362func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) { 363 topics := make(map[string][]int32) 364 for _, meta := range members { 365 for _, topic := range meta.Topics { 366 topics[topic] = nil 367 } 368 } 369 370 for topic := range topics { 371 partitions, err := c.client.Partitions(topic) 372 if err != nil { 373 return nil, err 374 } 375 topics[topic] = partitions 376 } 377 378 strategy := c.config.Consumer.Group.Rebalance.Strategy 379 return strategy.Plan(members, topics) 380} 381 382// Leaves the cluster, called by Close. 
383func (c *consumerGroup) leave() error { 384 c.lock.Lock() 385 defer c.lock.Unlock() 386 if c.memberID == "" { 387 return nil 388 } 389 390 coordinator, err := c.client.Coordinator(c.groupID) 391 if err != nil { 392 return err 393 } 394 395 resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{ 396 GroupId: c.groupID, 397 MemberId: c.memberID, 398 }) 399 if err != nil { 400 _ = coordinator.Close() 401 return err 402 } 403 404 // Unset memberID 405 c.memberID = "" 406 407 // Check response 408 switch resp.Err { 409 case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError: 410 return nil 411 default: 412 return resp.Err 413 } 414} 415 416func (c *consumerGroup) handleError(err error, topic string, partition int32) { 417 if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 { 418 err = &ConsumerError{ 419 Topic: topic, 420 Partition: partition, 421 Err: err, 422 } 423 } 424 425 if !c.config.Consumer.Return.Errors { 426 Logger.Println(err) 427 return 428 } 429 430 select { 431 case <-c.closed: 432 // consumer is closed 433 return 434 default: 435 } 436 437 select { 438 case c.errors <- err: 439 default: 440 // no error listener 441 } 442} 443 444func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) { 445 pause := time.NewTicker(c.config.Metadata.RefreshFrequency) 446 defer session.cancel() 447 defer pause.Stop() 448 var oldTopicToPartitionNum map[string]int 449 var err error 450 if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil { 451 return 452 } 453 for { 454 if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil { 455 return 456 } else { 457 for topic, num := range oldTopicToPartitionNum { 458 if newTopicToPartitionNum[topic] != num { 459 return // trigger the end of the session on exit 460 } 461 } 462 } 463 select { 464 case <-pause.C: 465 case <-session.ctx.Done(): 466 Logger.Printf("loop check partition number coroutine will exit, topics %s", 
topics) 467 // if session closed by other, should be exited 468 return 469 case <-c.closed: 470 return 471 } 472 } 473} 474 475func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) { 476 topicToPartitionNum := make(map[string]int, len(topics)) 477 for _, topic := range topics { 478 if partitionNum, err := c.client.Partitions(topic); err != nil { 479 Logger.Printf("Consumer Group topic %s get partition number failed %v", topic, err) 480 return nil, err 481 } else { 482 topicToPartitionNum[topic] = len(partitionNum) 483 } 484 } 485 return topicToPartitionNum, nil 486} 487 488// -------------------------------------------------------------------- 489 490// ConsumerGroupSession represents a consumer group member session. 491type ConsumerGroupSession interface { 492 // Claims returns information about the claimed partitions by topic. 493 Claims() map[string][]int32 494 495 // MemberID returns the cluster member ID. 496 MemberID() string 497 498 // GenerationID returns the current generation ID. 499 GenerationID() int32 500 501 // MarkOffset marks the provided offset, alongside a metadata string 502 // that represents the state of the partition consumer at that point in time. The 503 // metadata string can be used by another consumer to restore that state, so it 504 // can resume consumption. 505 // 506 // To follow upstream conventions, you are expected to mark the offset of the 507 // next message to read, not the last message read. Thus, when calling `MarkOffset` 508 // you should typically add one to the offset of the last consumed message. 509 // 510 // Note: calling MarkOffset does not necessarily commit the offset to the backend 511 // store immediately for efficiency reasons, and it may never be committed if 512 // your application crashes. This means that you may end up processing the same 513 // message twice, and your processing should ideally be idempotent. 
514 MarkOffset(topic string, partition int32, offset int64, metadata string) 515 516 // Commit the offset to the backend 517 // 518 // Note: calling Commit performs a blocking synchronous operation. 519 Commit() 520 521 // ResetOffset resets to the provided offset, alongside a metadata string that 522 // represents the state of the partition consumer at that point in time. Reset 523 // acts as a counterpart to MarkOffset, the difference being that it allows to 524 // reset an offset to an earlier or smaller value, where MarkOffset only 525 // allows incrementing the offset. cf MarkOffset for more details. 526 ResetOffset(topic string, partition int32, offset int64, metadata string) 527 528 // MarkMessage marks a message as consumed. 529 MarkMessage(msg *ConsumerMessage, metadata string) 530 531 // Context returns the session context. 532 Context() context.Context 533} 534 535type consumerGroupSession struct { 536 parent *consumerGroup 537 memberID string 538 generationID int32 539 handler ConsumerGroupHandler 540 541 claims map[string][]int32 542 offsets *offsetManager 543 ctx context.Context 544 cancel func() 545 546 waitGroup sync.WaitGroup 547 releaseOnce sync.Once 548 hbDying, hbDead chan none 549} 550 551func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) { 552 // init offset manager 553 offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client) 554 if err != nil { 555 return nil, err 556 } 557 558 // init context 559 ctx, cancel := context.WithCancel(ctx) 560 561 // init session 562 sess := &consumerGroupSession{ 563 parent: parent, 564 memberID: memberID, 565 generationID: generationID, 566 handler: handler, 567 offsets: offsets, 568 claims: claims, 569 ctx: ctx, 570 cancel: cancel, 571 hbDying: make(chan none), 572 hbDead: make(chan none), 573 } 574 575 // start heartbeat 
loop 576 go sess.heartbeatLoop() 577 578 // create a POM for each claim 579 for topic, partitions := range claims { 580 for _, partition := range partitions { 581 pom, err := offsets.ManagePartition(topic, partition) 582 if err != nil { 583 _ = sess.release(false) 584 return nil, err 585 } 586 587 // handle POM errors 588 go func(topic string, partition int32) { 589 for err := range pom.Errors() { 590 sess.parent.handleError(err, topic, partition) 591 } 592 }(topic, partition) 593 } 594 } 595 596 // perform setup 597 if err := handler.Setup(sess); err != nil { 598 _ = sess.release(true) 599 return nil, err 600 } 601 602 // start consuming 603 for topic, partitions := range claims { 604 for _, partition := range partitions { 605 sess.waitGroup.Add(1) 606 607 go func(topic string, partition int32) { 608 defer sess.waitGroup.Done() 609 610 // cancel the as session as soon as the first 611 // goroutine exits 612 defer sess.cancel() 613 614 // consume a single topic/partition, blocking 615 sess.consume(topic, partition) 616 }(topic, partition) 617 } 618 } 619 return sess, nil 620} 621 622func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims } 623func (s *consumerGroupSession) MemberID() string { return s.memberID } 624func (s *consumerGroupSession) GenerationID() int32 { return s.generationID } 625 626func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) { 627 if pom := s.offsets.findPOM(topic, partition); pom != nil { 628 pom.MarkOffset(offset, metadata) 629 } 630} 631 632func (s *consumerGroupSession) Commit() { 633 s.offsets.Commit() 634} 635 636func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) { 637 if pom := s.offsets.findPOM(topic, partition); pom != nil { 638 pom.ResetOffset(offset, metadata) 639 } 640} 641 642func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) { 643 s.MarkOffset(msg.Topic, msg.Partition, 
msg.Offset+1, metadata) 644} 645 646func (s *consumerGroupSession) Context() context.Context { 647 return s.ctx 648} 649 650func (s *consumerGroupSession) consume(topic string, partition int32) { 651 // quick exit if rebalance is due 652 select { 653 case <-s.ctx.Done(): 654 return 655 case <-s.parent.closed: 656 return 657 default: 658 } 659 660 // get next offset 661 offset := s.parent.config.Consumer.Offsets.Initial 662 if pom := s.offsets.findPOM(topic, partition); pom != nil { 663 offset, _ = pom.NextOffset() 664 } 665 666 // create new claim 667 claim, err := newConsumerGroupClaim(s, topic, partition, offset) 668 if err != nil { 669 s.parent.handleError(err, topic, partition) 670 return 671 } 672 673 // handle errors 674 go func() { 675 for err := range claim.Errors() { 676 s.parent.handleError(err, topic, partition) 677 } 678 }() 679 680 // trigger close when session is done 681 go func() { 682 select { 683 case <-s.ctx.Done(): 684 case <-s.parent.closed: 685 } 686 claim.AsyncClose() 687 }() 688 689 // start processing 690 if err := s.handler.ConsumeClaim(s, claim); err != nil { 691 s.parent.handleError(err, topic, partition) 692 } 693 694 // ensure consumer is closed & drained 695 claim.AsyncClose() 696 for _, err := range claim.waitClosed() { 697 s.parent.handleError(err, topic, partition) 698 } 699} 700 701func (s *consumerGroupSession) release(withCleanup bool) (err error) { 702 // signal release, stop heartbeat 703 s.cancel() 704 705 // wait for consumers to exit 706 s.waitGroup.Wait() 707 708 // perform release 709 s.releaseOnce.Do(func() { 710 if withCleanup { 711 if e := s.handler.Cleanup(s); e != nil { 712 s.parent.handleError(e, "", -1) 713 err = e 714 } 715 } 716 717 if e := s.offsets.Close(); e != nil { 718 err = e 719 } 720 721 close(s.hbDying) 722 <-s.hbDead 723 }) 724 725 return 726} 727 728func (s *consumerGroupSession) heartbeatLoop() { 729 defer close(s.hbDead) 730 defer s.cancel() // trigger the end of the session on exit 731 732 pause := 
time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval) 733 defer pause.Stop() 734 735 retryBackoff := time.NewTimer(s.parent.config.Metadata.Retry.Backoff) 736 defer retryBackoff.Stop() 737 738 retries := s.parent.config.Metadata.Retry.Max 739 for { 740 coordinator, err := s.parent.client.Coordinator(s.parent.groupID) 741 if err != nil { 742 if retries <= 0 { 743 s.parent.handleError(err, "", -1) 744 return 745 } 746 retryBackoff.Reset(s.parent.config.Metadata.Retry.Backoff) 747 select { 748 case <-s.hbDying: 749 return 750 case <-retryBackoff.C: 751 retries-- 752 } 753 continue 754 } 755 756 resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID) 757 if err != nil { 758 _ = coordinator.Close() 759 760 if retries <= 0 { 761 s.parent.handleError(err, "", -1) 762 return 763 } 764 765 retries-- 766 continue 767 } 768 769 switch resp.Err { 770 case ErrNoError: 771 retries = s.parent.config.Metadata.Retry.Max 772 case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration: 773 return 774 default: 775 s.parent.handleError(resp.Err, "", -1) 776 return 777 } 778 779 select { 780 case <-pause.C: 781 case <-s.hbDying: 782 return 783 } 784 } 785} 786 787// -------------------------------------------------------------------- 788 789// ConsumerGroupHandler instances are used to handle individual topic/partition claims. 790// It also provides hooks for your consumer group session life-cycle and allow you to 791// trigger logic before or after the consume loop(s). 792// 793// PLEASE NOTE that handlers are likely be called from several goroutines concurrently, 794// ensure that all state is safely protected against race conditions. 795type ConsumerGroupHandler interface { 796 // Setup is run at the beginning of a new session, before ConsumeClaim. 
797 Setup(ConsumerGroupSession) error 798 799 // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited 800 // but before the offsets are committed for the very last time. 801 Cleanup(ConsumerGroupSession) error 802 803 // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). 804 // Once the Messages() channel is closed, the Handler must finish its processing 805 // loop and exit. 806 ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error 807} 808 809// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group. 810type ConsumerGroupClaim interface { 811 // Topic returns the consumed topic name. 812 Topic() string 813 814 // Partition returns the consumed partition. 815 Partition() int32 816 817 // InitialOffset returns the initial offset that was used as a starting point for this claim. 818 InitialOffset() int64 819 820 // HighWaterMarkOffset returns the high water mark offset of the partition, 821 // i.e. the offset that will be used for the next message that will be produced. 822 // You can use this to determine how far behind the processing is. 823 HighWaterMarkOffset() int64 824 825 // Messages returns the read channel for the messages that are returned by 826 // the broker. The messages channel will be closed when a new rebalance cycle 827 // is due. You must finish processing and mark offsets within 828 // Config.Consumer.Group.Session.Timeout before the topic/partition is eventually 829 // re-assigned to another group member. 
830 Messages() <-chan *ConsumerMessage 831} 832 833type consumerGroupClaim struct { 834 topic string 835 partition int32 836 offset int64 837 PartitionConsumer 838} 839 840func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) { 841 pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset) 842 if err == ErrOffsetOutOfRange { 843 offset = sess.parent.config.Consumer.Offsets.Initial 844 pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset) 845 } 846 if err != nil { 847 return nil, err 848 } 849 850 go func() { 851 for err := range pcm.Errors() { 852 sess.parent.handleError(err, topic, partition) 853 } 854 }() 855 856 return &consumerGroupClaim{ 857 topic: topic, 858 partition: partition, 859 offset: offset, 860 PartitionConsumer: pcm, 861 }, nil 862} 863 864func (c *consumerGroupClaim) Topic() string { return c.topic } 865func (c *consumerGroupClaim) Partition() int32 { return c.partition } 866func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset } 867 868// Drains messages and errors, ensures the claim is fully closed. 869func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) { 870 go func() { 871 for range c.Messages() { 872 } 873 }() 874 875 for err := range c.Errors() { 876 errs = append(errs, err) 877 } 878 return 879} 880