package sarama

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
	// Consume joins a cluster of consumers for a given list of topics and
	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
	//
	// The life-cycle of a session is represented by the following steps:
	//
	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
	//    and are assigned their "fair share" of partitions, aka 'claims'.
	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
	//    of the claims and allow any necessary preparation or alteration of state.
	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
	//    from concurrent reads/writes.
	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
	//    parent context is cancelled or when a server-side rebalance cycle is initiated.
	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
	//    to allow the user to perform any final tasks before a rebalance.
	// 6. Finally, marked offsets are committed one last time before claims are released.
	//
	// Please note, that once a rebalance is triggered, sessions must be completed within
	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
	// commit failures.
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error
}

// consumerGroup is the ConsumerGroup implementation returned by
// NewConsumerGroup and NewConsumerGroupFromClient.
type consumerGroup struct {
	client Client
	// ownClient is set by NewConsumerGroup; when true, Close also closes client.
	ownClient bool

	config   *Config
	consumer Consumer
	groupID  string
	// memberID is taken from the join response in newSession and cleared again by leave.
	memberID string
	errors   chan error

	// lock is held by Consume for its whole duration and by Close while shutting down.
	lock sync.Mutex
	// closed is closed by Close to signal shutdown to everything selecting on it.
	closed chan none
	// closeOnce ensures the shutdown sequence in Close runs only once.
	closeOnce sync.Once
}

// NewConsumerGroup creates a new consumer group using the given broker addresses and configuration.
// The client it creates internally is owned by the group and is closed together with it.
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	c, err := NewConsumerGroupFromClient(groupID, client)
	if err != nil {
		// the group was never created, so release the client we just opened
		_ = client.Close()
		return nil, err
	}

	c.(*consumerGroup).ownClient = true
	return c, nil
}

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
89func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) { 90 config := client.Config() 91 if !config.Version.IsAtLeast(V0_10_2_0) { 92 return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0") 93 } 94 95 consumer, err := NewConsumerFromClient(client) 96 if err != nil { 97 return nil, err 98 } 99 100 return &consumerGroup{ 101 client: client, 102 consumer: consumer, 103 config: config, 104 groupID: groupID, 105 errors: make(chan error, config.ChannelBufferSize), 106 closed: make(chan none), 107 }, nil 108} 109 110// Errors implements ConsumerGroup. 111func (c *consumerGroup) Errors() <-chan error { return c.errors } 112 113// Close implements ConsumerGroup. 114func (c *consumerGroup) Close() (err error) { 115 c.closeOnce.Do(func() { 116 close(c.closed) 117 118 c.lock.Lock() 119 defer c.lock.Unlock() 120 121 // leave group 122 if e := c.leave(); e != nil { 123 err = e 124 } 125 126 // drain errors 127 go func() { 128 close(c.errors) 129 }() 130 for e := range c.errors { 131 err = e 132 } 133 134 if c.ownClient { 135 if e := c.client.Close(); e != nil { 136 err = e 137 } 138 } 139 }) 140 return 141} 142 143// Consume implements ConsumerGroup. 
144func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error { 145 // Ensure group is not closed 146 select { 147 case <-c.closed: 148 return ErrClosedConsumerGroup 149 default: 150 } 151 152 c.lock.Lock() 153 defer c.lock.Unlock() 154 155 // Quick exit when no topics are provided 156 if len(topics) == 0 { 157 return fmt.Errorf("no topics provided") 158 } 159 160 // Refresh metadata for requested topics 161 if err := c.client.RefreshMetadata(topics...); err != nil { 162 return err 163 } 164 165 // Get coordinator 166 coordinator, err := c.client.Coordinator(c.groupID) 167 if err != nil { 168 return err 169 } 170 171 // Init session 172 sess, err := c.newSession(ctx, coordinator, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max) 173 if err == ErrClosedClient { 174 return ErrClosedConsumerGroup 175 } else if err != nil { 176 return err 177 } 178 179 // Wait for session exit signal 180 <-sess.ctx.Done() 181 182 // Gracefully release session claims 183 return sess.release(true) 184} 185 186func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) { 187 // Join consumer group 188 join, err := c.joinGroupRequest(coordinator, topics) 189 if err != nil { 190 _ = coordinator.Close() 191 return nil, err 192 } 193 switch join.Err { 194 case ErrNoError: 195 c.memberID = join.MemberId 196 case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately 197 c.memberID = "" 198 return c.newSession(ctx, coordinator, topics, handler, retries) 199 case ErrRebalanceInProgress: // retry after backoff 200 if retries <= 0 { 201 return nil, join.Err 202 } 203 204 select { 205 case <-c.closed: 206 return nil, ErrClosedConsumerGroup 207 case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff): 208 } 209 210 return c.newSession(ctx, coordinator, topics, handler, retries-1) 211 default: 212 
return nil, join.Err 213 } 214 215 // Prepare distribution plan if we joined as the leader 216 var plan BalanceStrategyPlan 217 if join.LeaderId == join.MemberId { 218 members, err := join.GetMembers() 219 if err != nil { 220 return nil, err 221 } 222 223 plan, err = c.balance(members) 224 if err != nil { 225 return nil, err 226 } 227 } 228 229 // Sync consumer group 230 sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationId) 231 if err != nil { 232 _ = coordinator.Close() 233 return nil, err 234 } 235 switch sync.Err { 236 case ErrNoError: 237 case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately 238 c.memberID = "" 239 return c.newSession(ctx, coordinator, topics, handler, retries) 240 case ErrRebalanceInProgress: // retry after backoff 241 if retries <= 0 { 242 return nil, sync.Err 243 } 244 245 select { 246 case <-c.closed: 247 return nil, ErrClosedConsumerGroup 248 case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff): 249 } 250 251 return c.newSession(ctx, coordinator, topics, handler, retries-1) 252 default: 253 return nil, sync.Err 254 } 255 256 // Retrieve and sort claims 257 var claims map[string][]int32 258 if len(sync.MemberAssignment) > 0 { 259 members, err := sync.GetMemberAssignment() 260 if err != nil { 261 return nil, err 262 } 263 claims = members.Topics 264 265 for _, partitions := range claims { 266 sort.Sort(int32Slice(partitions)) 267 } 268 } 269 270 return newConsumerGroupSession(c, ctx, claims, join.MemberId, join.GenerationId, handler) 271} 272 273func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) { 274 req := &JoinGroupRequest{ 275 GroupId: c.groupID, 276 MemberId: c.memberID, 277 SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond), 278 ProtocolType: "consumer", 279 } 280 if c.config.Version.IsAtLeast(V0_10_1_0) { 281 req.Version = 1 282 req.RebalanceTimeout = 
int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond) 283 } 284 285 meta := &ConsumerGroupMemberMetadata{ 286 Topics: topics, 287 UserData: c.config.Consumer.Group.Member.UserData, 288 } 289 strategy := c.config.Consumer.Group.Rebalance.Strategy 290 if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil { 291 return nil, err 292 } 293 294 return coordinator.JoinGroup(req) 295} 296 297func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) { 298 req := &SyncGroupRequest{ 299 GroupId: c.groupID, 300 MemberId: c.memberID, 301 GenerationId: generationID, 302 } 303 for memberID, topics := range plan { 304 err := req.AddGroupAssignmentMember(memberID, &ConsumerGroupMemberAssignment{ 305 Topics: topics, 306 }) 307 if err != nil { 308 return nil, err 309 } 310 } 311 return coordinator.SyncGroup(req) 312} 313 314func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) { 315 req := &HeartbeatRequest{ 316 GroupId: c.groupID, 317 MemberId: memberID, 318 GenerationId: generationID, 319 } 320 321 return coordinator.Heartbeat(req) 322} 323 324func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) { 325 topics := make(map[string][]int32) 326 for _, meta := range members { 327 for _, topic := range meta.Topics { 328 topics[topic] = nil 329 } 330 } 331 332 for topic := range topics { 333 partitions, err := c.client.Partitions(topic) 334 if err != nil { 335 return nil, err 336 } 337 topics[topic] = partitions 338 } 339 340 strategy := c.config.Consumer.Group.Rebalance.Strategy 341 return strategy.Plan(members, topics) 342} 343 344// Leaves the cluster, called by Close, protected by lock. 
345func (c *consumerGroup) leave() error { 346 if c.memberID == "" { 347 return nil 348 } 349 350 coordinator, err := c.client.Coordinator(c.groupID) 351 if err != nil { 352 return err 353 } 354 355 resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{ 356 GroupId: c.groupID, 357 MemberId: c.memberID, 358 }) 359 if err != nil { 360 _ = coordinator.Close() 361 return err 362 } 363 364 // Unset memberID 365 c.memberID = "" 366 367 // Check response 368 switch resp.Err { 369 case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError: 370 return nil 371 default: 372 return resp.Err 373 } 374} 375 376func (c *consumerGroup) handleError(err error, topic string, partition int32) { 377 select { 378 case <-c.closed: 379 return 380 default: 381 } 382 383 if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 { 384 err = &ConsumerError{ 385 Topic: topic, 386 Partition: partition, 387 Err: err, 388 } 389 } 390 391 if c.config.Consumer.Return.Errors { 392 select { 393 case c.errors <- err: 394 default: 395 } 396 } else { 397 Logger.Println(err) 398 } 399} 400 401// -------------------------------------------------------------------- 402 403// ConsumerGroupSession represents a consumer group member session. 404type ConsumerGroupSession interface { 405 // Claims returns information about the claimed partitions by topic. 406 Claims() map[string][]int32 407 408 // MemberID returns the cluster member ID. 409 MemberID() string 410 411 // GenerationID returns the current generation ID. 412 GenerationID() int32 413 414 // MarkOffset marks the provided offset, alongside a metadata string 415 // that represents the state of the partition consumer at that point in time. The 416 // metadata string can be used by another consumer to restore that state, so it 417 // can resume consumption. 418 // 419 // To follow upstream conventions, you are expected to mark the offset of the 420 // next message to read, not the last message read. 
Thus, when calling `MarkOffset` 421 // you should typically add one to the offset of the last consumed message. 422 // 423 // Note: calling MarkOffset does not necessarily commit the offset to the backend 424 // store immediately for efficiency reasons, and it may never be committed if 425 // your application crashes. This means that you may end up processing the same 426 // message twice, and your processing should ideally be idempotent. 427 MarkOffset(topic string, partition int32, offset int64, metadata string) 428 429 // ResetOffset resets to the provided offset, alongside a metadata string that 430 // represents the state of the partition consumer at that point in time. Reset 431 // acts as a counterpart to MarkOffset, the difference being that it allows to 432 // reset an offset to an earlier or smaller value, where MarkOffset only 433 // allows incrementing the offset. cf MarkOffset for more details. 434 ResetOffset(topic string, partition int32, offset int64, metadata string) 435 436 // MarkMessage marks a message as consumed. 437 MarkMessage(msg *ConsumerMessage, metadata string) 438 439 // Context returns the session context. 
440 Context() context.Context 441} 442 443type consumerGroupSession struct { 444 parent *consumerGroup 445 memberID string 446 generationID int32 447 handler ConsumerGroupHandler 448 449 claims map[string][]int32 450 offsets *offsetManager 451 ctx context.Context 452 cancel func() 453 454 waitGroup sync.WaitGroup 455 releaseOnce sync.Once 456 hbDying, hbDead chan none 457} 458 459func newConsumerGroupSession(parent *consumerGroup, ctx context.Context, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) { 460 // init offset manager 461 offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client) 462 if err != nil { 463 return nil, err 464 } 465 466 // init context 467 ctx, cancel := context.WithCancel(ctx) 468 469 // init session 470 sess := &consumerGroupSession{ 471 parent: parent, 472 memberID: memberID, 473 generationID: generationID, 474 handler: handler, 475 offsets: offsets, 476 claims: claims, 477 ctx: ctx, 478 cancel: cancel, 479 hbDying: make(chan none), 480 hbDead: make(chan none), 481 } 482 483 // start heartbeat loop 484 go sess.heartbeatLoop() 485 486 // create a POM for each claim 487 for topic, partitions := range claims { 488 for _, partition := range partitions { 489 pom, err := offsets.ManagePartition(topic, partition) 490 if err != nil { 491 _ = sess.release(false) 492 return nil, err 493 } 494 495 // handle POM errors 496 go func(topic string, partition int32) { 497 for err := range pom.Errors() { 498 sess.parent.handleError(err, topic, partition) 499 } 500 }(topic, partition) 501 } 502 } 503 504 // perform setup 505 if err := handler.Setup(sess); err != nil { 506 _ = sess.release(true) 507 return nil, err 508 } 509 510 // start consuming 511 for topic, partitions := range claims { 512 for _, partition := range partitions { 513 sess.waitGroup.Add(1) 514 515 go func(topic string, partition int32) { 516 defer sess.waitGroup.Done() 517 518 
// cancel the as session as soon as the first 519 // goroutine exits 520 defer sess.cancel() 521 522 // consume a single topic/partition, blocking 523 sess.consume(topic, partition) 524 }(topic, partition) 525 } 526 } 527 return sess, nil 528} 529 530func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims } 531func (s *consumerGroupSession) MemberID() string { return s.memberID } 532func (s *consumerGroupSession) GenerationID() int32 { return s.generationID } 533 534func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) { 535 if pom := s.offsets.findPOM(topic, partition); pom != nil { 536 pom.MarkOffset(offset, metadata) 537 } 538} 539 540func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) { 541 if pom := s.offsets.findPOM(topic, partition); pom != nil { 542 pom.ResetOffset(offset, metadata) 543 } 544} 545 546func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) { 547 s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata) 548} 549 550func (s *consumerGroupSession) Context() context.Context { 551 return s.ctx 552} 553 554func (s *consumerGroupSession) consume(topic string, partition int32) { 555 // quick exit if rebalance is due 556 select { 557 case <-s.ctx.Done(): 558 return 559 case <-s.parent.closed: 560 return 561 default: 562 } 563 564 // get next offset 565 offset := s.parent.config.Consumer.Offsets.Initial 566 if pom := s.offsets.findPOM(topic, partition); pom != nil { 567 offset, _ = pom.NextOffset() 568 } 569 570 // create new claim 571 claim, err := newConsumerGroupClaim(s, topic, partition, offset) 572 if err != nil { 573 s.parent.handleError(err, topic, partition) 574 return 575 } 576 577 // handle errors 578 go func() { 579 for err := range claim.Errors() { 580 s.parent.handleError(err, topic, partition) 581 } 582 }() 583 584 // trigger close when session is done 585 go func() { 586 select { 587 
case <-s.ctx.Done(): 588 case <-s.parent.closed: 589 } 590 claim.AsyncClose() 591 }() 592 593 // start processing 594 if err := s.handler.ConsumeClaim(s, claim); err != nil { 595 s.parent.handleError(err, topic, partition) 596 } 597 598 // ensure consumer is clased & drained 599 claim.AsyncClose() 600 for _, err := range claim.waitClosed() { 601 s.parent.handleError(err, topic, partition) 602 } 603} 604 605func (s *consumerGroupSession) release(withCleanup bool) (err error) { 606 // signal release, stop heartbeat 607 s.cancel() 608 609 // wait for consumers to exit 610 s.waitGroup.Wait() 611 612 // perform release 613 s.releaseOnce.Do(func() { 614 if withCleanup { 615 if e := s.handler.Cleanup(s); e != nil { 616 s.parent.handleError(err, "", -1) 617 err = e 618 } 619 } 620 621 if e := s.offsets.Close(); e != nil { 622 err = e 623 } 624 625 close(s.hbDying) 626 <-s.hbDead 627 }) 628 629 return 630} 631 632func (s *consumerGroupSession) heartbeatLoop() { 633 defer close(s.hbDead) 634 defer s.cancel() // trigger the end of the session on exit 635 636 pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval) 637 defer pause.Stop() 638 639 retries := s.parent.config.Metadata.Retry.Max 640 for { 641 coordinator, err := s.parent.client.Coordinator(s.parent.groupID) 642 if err != nil { 643 if retries <= 0 { 644 s.parent.handleError(err, "", -1) 645 return 646 } 647 648 select { 649 case <-s.hbDying: 650 return 651 case <-time.After(s.parent.config.Metadata.Retry.Backoff): 652 retries-- 653 } 654 continue 655 } 656 657 resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID) 658 if err != nil { 659 _ = coordinator.Close() 660 retries-- 661 continue 662 } 663 664 switch resp.Err { 665 case ErrNoError: 666 retries = s.parent.config.Metadata.Retry.Max 667 case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration: 668 return 669 default: 670 s.parent.handleError(err, "", -1) 671 return 672 } 673 674 select { 675 case 
<-pause.C: 676 case <-s.hbDying: 677 return 678 } 679 } 680} 681 682// -------------------------------------------------------------------- 683 684// ConsumerGroupHandler instances are used to handle individual topic/partition claims. 685// It also provides hooks for your consumer group session life-cycle and allow you to 686// trigger logic before or after the consume loop(s). 687// 688// PLEASE NOTE that handlers are likely be called from several goroutines concurrently, 689// ensure that all state is safely protected against race conditions. 690type ConsumerGroupHandler interface { 691 // Setup is run at the beginning of a new session, before ConsumeClaim. 692 Setup(ConsumerGroupSession) error 693 694 // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exites 695 // but before the offsets are committed for the very last time. 696 Cleanup(ConsumerGroupSession) error 697 698 // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). 699 // Once the Messages() channel is closed, the Handler must finish its processing 700 // loop and exit. 701 ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error 702} 703 704// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group. 705type ConsumerGroupClaim interface { 706 // Topic returns the consumed topic name. 707 Topic() string 708 709 // Partition returns the consumed partition. 710 Partition() int32 711 712 // InitialOffset returns the initial offset that was used as a starting point for this claim. 713 InitialOffset() int64 714 715 // HighWaterMarkOffset returns the high water mark offset of the partition, 716 // i.e. the offset that will be used for the next message that will be produced. 717 // You can use this to determine how far behind the processing is. 718 HighWaterMarkOffset() int64 719 720 // Messages returns the read channel for the messages that are returned by 721 // the broker. 
The messages channel will be closed when a new rebalance cycle 722 // is due. You must finish processing and mark offsets within 723 // Config.Consumer.Group.Session.Timeout before the topic/partition is eventually 724 // re-assigned to another group member. 725 Messages() <-chan *ConsumerMessage 726} 727 728type consumerGroupClaim struct { 729 topic string 730 partition int32 731 offset int64 732 PartitionConsumer 733} 734 735func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) { 736 pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset) 737 if err == ErrOffsetOutOfRange { 738 offset = sess.parent.config.Consumer.Offsets.Initial 739 pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset) 740 } 741 if err != nil { 742 return nil, err 743 } 744 745 go func() { 746 for err := range pcm.Errors() { 747 sess.parent.handleError(err, topic, partition) 748 } 749 }() 750 751 return &consumerGroupClaim{ 752 topic: topic, 753 partition: partition, 754 offset: offset, 755 PartitionConsumer: pcm, 756 }, nil 757} 758 759func (c *consumerGroupClaim) Topic() string { return c.topic } 760func (c *consumerGroupClaim) Partition() int32 { return c.partition } 761func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset } 762 763// Drains messages and errors, ensures the claim is fully closed. 764func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) { 765 go func() { 766 for range c.Messages() { 767 } 768 }() 769 770 for err := range c.Errors() { 771 errs = append(errs, err) 772 } 773 return 774} 775