package sarama

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
	// Consume joins a cluster of consumers for a given list of topics and
	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
	//
	// The life-cycle of a session is represented by the following steps:
	//
	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
	//    and are assigned their "fair share" of partitions, aka 'claims'.
	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
	//    of the claims and allow any necessary preparation or alteration of state.
	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
	//    from concurrent reads/writes.
	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
	//    parent context is cancelled or when a server-side rebalance cycle is initiated.
	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
	//    to allow the user to perform any final tasks before a rebalance.
	// 6. Finally, marked offsets are committed one last time before claims are released.
	//
	// Please note that once a rebalance is triggered, sessions must be completed within
	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
	// commit failures.
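	//
	// Because a server-side rebalance ends the session, Consume is typically
	// called in a loop so the member re-joins the group until the parent
	// context is cancelled. A minimal sketch (the broker address, group ID,
	// topic and handler value are illustrative assumptions):
	//
	//	group, err := NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
	//	if err != nil {
	//		panic(err)
	//	}
	//	defer group.Close()
	//
	//	for {
	//		if err := group.Consume(ctx, []string{"my-topic"}, handler); err != nil {
	//			break
	//		}
	//		if ctx.Err() != nil { // parent context cancelled: stop re-joining
	//			break
	//		}
	//	}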
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
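	//
	// A sketch of draining the channel in its own goroutine (assuming
	// Consumer.Return.Errors was set to true before the group was created):
	//
	//	go func() {
	//		for err := range group.Errors() {
	//			log.Println("consumer group error:", err)
	//		}
	//	}()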
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error
}

type consumerGroup struct {
	client    Client
	ownClient bool

	config   *Config
	consumer Consumer
	groupID  string
	memberID string
	errors   chan error

	lock      sync.Mutex
	closed    chan none
	closeOnce sync.Once
}
// NewConsumerGroup creates a new consumer group for the given broker addresses and configuration.
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	c, err := NewConsumerGroupFromClient(groupID, client)
	if err != nil {
		_ = client.Close()
		return nil, err
	}

	c.(*consumerGroup).ownClient = true
	return c, nil
}

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
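//
// A minimal sketch of reusing an existing client (the broker address and
// group ID are illustrative assumptions):
//
//	client, err := NewClient([]string{"localhost:9092"}, config)
//	if err != nil {
//		panic(err)
//	}
//	defer client.Close() // the caller remains responsible for the client
//
//	group, err := NewConsumerGroupFromClient("my-group", client)
//	if err != nil {
//		panic(err)
//	}
//	defer group.Close()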
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
	config := client.Config()
	if !config.Version.IsAtLeast(V0_10_2_0) {
		return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
	}

	consumer, err := NewConsumerFromClient(client)
	if err != nil {
		return nil, err
	}

	return &consumerGroup{
		client:   client,
		consumer: consumer,
		config:   config,
		groupID:  groupID,
		errors:   make(chan error, config.ChannelBufferSize),
		closed:   make(chan none),
	}, nil
}

// Errors implements ConsumerGroup.
func (c *consumerGroup) Errors() <-chan error { return c.errors }

// Close implements ConsumerGroup.
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
		close(c.closed)

		c.lock.Lock()
		defer c.lock.Unlock()

		// leave group
		if e := c.leave(); e != nil {
			err = e
		}

		// drain errors
		go func() {
			close(c.errors)
		}()
		for e := range c.errors {
			err = e
		}

		if c.ownClient {
			if e := c.client.Close(); e != nil {
				err = e
			}
		}
	})
	return
}

// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
	// Ensure group is not closed
	select {
	case <-c.closed:
		return ErrClosedConsumerGroup
	default:
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	// Quick exit when no topics are provided
	if len(topics) == 0 {
		return fmt.Errorf("no topics provided")
	}

	// Refresh metadata for requested topics
	if err := c.client.RefreshMetadata(topics...); err != nil {
		return err
	}

	// Get coordinator
	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		return err
	}

	// Init session
	sess, err := c.newSession(ctx, coordinator, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
	if err == ErrClosedClient {
		return ErrClosedConsumerGroup
	} else if err != nil {
		return err
	}

	// Wait for session exit signal
	<-sess.ctx.Done()

	// Gracefully release session claims
	return sess.release(true)
}

func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	// Join consumer group
	join, err := c.joinGroupRequest(coordinator, topics)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch join.Err {
	case ErrNoError:
		c.memberID = join.MemberId
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, coordinator, topics, handler, retries)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, join.Err
		}

		select {
		case <-c.closed:
			return nil, ErrClosedConsumerGroup
		case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
		}

		return c.newSession(ctx, coordinator, topics, handler, retries-1)
	default:
		return nil, join.Err
	}

	// Prepare distribution plan if we joined as the leader
	var plan BalanceStrategyPlan
	if join.LeaderId == join.MemberId {
		members, err := join.GetMembers()
		if err != nil {
			return nil, err
		}

		plan, err = c.balance(members)
		if err != nil {
			return nil, err
		}
	}

	// Sync consumer group
	sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch sync.Err {
	case ErrNoError:
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, coordinator, topics, handler, retries)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, sync.Err
		}

		select {
		case <-c.closed:
			return nil, ErrClosedConsumerGroup
		case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
		}

		return c.newSession(ctx, coordinator, topics, handler, retries-1)
	default:
		return nil, sync.Err
	}

	// Retrieve and sort claims
	var claims map[string][]int32
	if len(sync.MemberAssignment) > 0 {
		members, err := sync.GetMemberAssignment()
		if err != nil {
			return nil, err
		}
		claims = members.Topics

		for _, partitions := range claims {
			sort.Sort(int32Slice(partitions))
		}
	}

	return newConsumerGroupSession(c, ctx, claims, join.MemberId, join.GenerationId, handler)
}

func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
	req := &JoinGroupRequest{
		GroupId:        c.groupID,
		MemberId:       c.memberID,
		SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond),
		ProtocolType:   "consumer",
	}
	if c.config.Version.IsAtLeast(V0_10_1_0) {
		req.Version = 1
		req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
	}

	meta := &ConsumerGroupMemberMetadata{
		Topics:   topics,
		UserData: c.config.Consumer.Group.Member.UserData,
	}
	strategy := c.config.Consumer.Group.Rebalance.Strategy
	if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
		return nil, err
	}

	return coordinator.JoinGroup(req)
}

func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
	req := &SyncGroupRequest{
		GroupId:      c.groupID,
		MemberId:     c.memberID,
		GenerationId: generationID,
	}
	for memberID, topics := range plan {
		err := req.AddGroupAssignmentMember(memberID, &ConsumerGroupMemberAssignment{
			Topics: topics,
		})
		if err != nil {
			return nil, err
		}
	}
	return coordinator.SyncGroup(req)
}

func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
	req := &HeartbeatRequest{
		GroupId:      c.groupID,
		MemberId:     memberID,
		GenerationId: generationID,
	}

	return coordinator.Heartbeat(req)
}

func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
	topics := make(map[string][]int32)
	for _, meta := range members {
		for _, topic := range meta.Topics {
			topics[topic] = nil
		}
	}

	for topic := range topics {
		partitions, err := c.client.Partitions(topic)
		if err != nil {
			return nil, err
		}
		topics[topic] = partitions
	}

	strategy := c.config.Consumer.Group.Rebalance.Strategy
	return strategy.Plan(members, topics)
}

// Leaves the cluster, called by Close, protected by lock.
func (c *consumerGroup) leave() error {
	if c.memberID == "" {
		return nil
	}

	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		return err
	}

	resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
		GroupId:  c.groupID,
		MemberId: c.memberID,
	})
	if err != nil {
		_ = coordinator.Close()
		return err
	}

	// Unset memberID
	c.memberID = ""

	// Check response
	switch resp.Err {
	case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
		return nil
	default:
		return resp.Err
	}
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
	select {
	case <-c.closed:
		return
	default:
	}

	if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
		err = &ConsumerError{
			Topic:     topic,
			Partition: partition,
			Err:       err,
		}
	}

	if c.config.Consumer.Return.Errors {
		select {
		case c.errors <- err:
		default:
		}
	} else {
		Logger.Println(err)
	}
}

// --------------------------------------------------------------------

// ConsumerGroupSession represents a consumer group member session.
type ConsumerGroupSession interface {
	// Claims returns information about the claimed partitions by topic.
	Claims() map[string][]int32

	// MemberID returns the cluster member ID.
	MemberID() string

	// GenerationID returns the current generation ID.
	GenerationID() int32

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
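	//
	// For example (a sketch; msg is an illustrative *ConsumerMessage), mark
	// the offset of the next message to read after processing msg:
	//
	//	sess.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, "")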
	MarkOffset(topic string, partition int32, offset int64, metadata string)

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, whereas MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(topic string, partition int32, offset int64, metadata string)

	// MarkMessage marks a message as consumed.
	MarkMessage(msg *ConsumerMessage, metadata string)

	// Context returns the session context.
	Context() context.Context
}

type consumerGroupSession struct {
	parent       *consumerGroup
	memberID     string
	generationID int32
	handler      ConsumerGroupHandler

	claims  map[string][]int32
	offsets *offsetManager
	ctx     context.Context
	cancel  func()

	waitGroup       sync.WaitGroup
	releaseOnce     sync.Once
	hbDying, hbDead chan none
}

func newConsumerGroupSession(parent *consumerGroup, ctx context.Context, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
	// init offset manager
	offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
	if err != nil {
		return nil, err
	}

	// init context
	ctx, cancel := context.WithCancel(ctx)

	// init session
	sess := &consumerGroupSession{
		parent:       parent,
		memberID:     memberID,
		generationID: generationID,
		handler:      handler,
		offsets:      offsets,
		claims:       claims,
		ctx:          ctx,
		cancel:       cancel,
		hbDying:      make(chan none),
		hbDead:       make(chan none),
	}

	// start heartbeat loop
	go sess.heartbeatLoop()

	// create a POM for each claim
	for topic, partitions := range claims {
		for _, partition := range partitions {
			pom, err := offsets.ManagePartition(topic, partition)
			if err != nil {
				_ = sess.release(false)
				return nil, err
			}

			// handle POM errors
			go func(topic string, partition int32) {
				for err := range pom.Errors() {
					sess.parent.handleError(err, topic, partition)
				}
			}(topic, partition)
		}
	}

	// perform setup
	if err := handler.Setup(sess); err != nil {
		_ = sess.release(true)
		return nil, err
	}

	// start consuming
	for topic, partitions := range claims {
		for _, partition := range partitions {
			sess.waitGroup.Add(1)

			go func(topic string, partition int32) {
				defer sess.waitGroup.Done()

				// cancel the session as soon as the first
				// goroutine exits
				defer sess.cancel()

				// consume a single topic/partition, blocking
				sess.consume(topic, partition)
			}(topic, partition)
		}
	}
	return sess, nil
}

func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }
func (s *consumerGroupSession) MemberID() string           { return s.memberID }
func (s *consumerGroupSession) GenerationID() int32        { return s.generationID }

func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.MarkOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.ResetOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
	s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}

func (s *consumerGroupSession) Context() context.Context {
	return s.ctx
}

func (s *consumerGroupSession) consume(topic string, partition int32) {
	// quick exit if rebalance is due
	select {
	case <-s.ctx.Done():
		return
	case <-s.parent.closed:
		return
	default:
	}

	// get next offset
	offset := s.parent.config.Consumer.Offsets.Initial
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		offset, _ = pom.NextOffset()
	}

	// create new claim
	claim, err := newConsumerGroupClaim(s, topic, partition, offset)
	if err != nil {
		s.parent.handleError(err, topic, partition)
		return
	}

	// handle errors
	go func() {
		for err := range claim.Errors() {
			s.parent.handleError(err, topic, partition)
		}
	}()

	// trigger close when session is done
	go func() {
		select {
		case <-s.ctx.Done():
		case <-s.parent.closed:
		}
		claim.AsyncClose()
	}()

	// start processing
	if err := s.handler.ConsumeClaim(s, claim); err != nil {
		s.parent.handleError(err, topic, partition)
	}

	// ensure consumer is closed & drained
	claim.AsyncClose()
	for _, err := range claim.waitClosed() {
		s.parent.handleError(err, topic, partition)
	}
}

func (s *consumerGroupSession) release(withCleanup bool) (err error) {
	// signal release, stop heartbeat
	s.cancel()

	// wait for consumers to exit
	s.waitGroup.Wait()

	// perform release
	s.releaseOnce.Do(func() {
		if withCleanup {
			if e := s.handler.Cleanup(s); e != nil {
				s.parent.handleError(e, "", -1)
				err = e
			}
		}

		if e := s.offsets.Close(); e != nil {
			err = e
		}

		close(s.hbDying)
		<-s.hbDead
	})

	return
}

func (s *consumerGroupSession) heartbeatLoop() {
	defer close(s.hbDead)
	defer s.cancel() // trigger the end of the session on exit

	pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
	defer pause.Stop()

	retries := s.parent.config.Metadata.Retry.Max
	for {
		coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
		if err != nil {
			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			select {
			case <-s.hbDying:
				return
			case <-time.After(s.parent.config.Metadata.Retry.Backoff):
				retries--
			}
			continue
		}

		resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
		if err != nil {
			_ = coordinator.Close()
			retries--
			continue
		}

		switch resp.Err {
		case ErrNoError:
			retries = s.parent.config.Metadata.Retry.Max
		case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
			return
		default:
			s.parent.handleError(resp.Err, "", -1)
			return
		}

		select {
		case <-pause.C:
		case <-s.hbDying:
			return
		}
	}
}

// --------------------------------------------------------------------

// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// The interface also provides hooks for your consumer group session life-cycle and
// allows you to trigger logic before or after the consume loop(s).
//
// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently,
// ensure that all state is safely protected against race conditions.
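//
// A minimal sketch of an implementation (the exampleHandler type is an
// illustrative assumption, not part of this package):
//
//	type exampleHandler struct{}
//
//	func (exampleHandler) Setup(_ ConsumerGroupSession) error   { return nil }
//	func (exampleHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
//
//	func (exampleHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
//		for msg := range claim.Messages() {
//			// process msg, then mark it as consumed
//			sess.MarkMessage(msg, "")
//		}
//		return nil
//	}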
type ConsumerGroupHandler interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupClaim interface {
	// Topic returns the consumed topic name.
	Topic() string

	// Partition returns the consumed partition.
	Partition() int32

	// InitialOffset returns the initial offset that was used as a starting point for this claim.
	InitialOffset() int64

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
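	//
	// For example (a sketch; msg is assumed to be the last consumed
	// *ConsumerMessage), the approximate remaining lag for the partition is:
	//
	//	lag := claim.HighWaterMarkOffset() - msg.Offset - 1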
	HighWaterMarkOffset() int64

	// Messages returns the read channel for the messages that are returned by
	// the broker. The messages channel will be closed when a new rebalance cycle
	// is due. You must finish processing and mark offsets within
	// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
	// re-assigned to another group member.
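	//
	// A sketch of a ConsumeClaim body that also honours session cancellation
	// (sess and claim are the handler's arguments):
	//
	//	for {
	//		select {
	//		case msg, ok := <-claim.Messages():
	//			if !ok {
	//				return nil
	//			}
	//			sess.MarkMessage(msg, "")
	//		case <-sess.Context().Done():
	//			return nil
	//		}
	//	}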
	Messages() <-chan *ConsumerMessage
}

type consumerGroupClaim struct {
	topic     string
	partition int32
	offset    int64
	PartitionConsumer
}

func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
	pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
	if err == ErrOffsetOutOfRange {
		offset = sess.parent.config.Consumer.Offsets.Initial
		pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
	}
	if err != nil {
		return nil, err
	}

	go func() {
		for err := range pcm.Errors() {
			sess.parent.handleError(err, topic, partition)
		}
	}()

	return &consumerGroupClaim{
		topic:             topic,
		partition:         partition,
		offset:            offset,
		PartitionConsumer: pcm,
	}, nil
}

func (c *consumerGroupClaim) Topic() string        { return c.topic }
func (c *consumerGroupClaim) Partition() int32     { return c.partition }
func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset }

// Drains messages and errors, ensures the claim is fully closed.
func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) {
	go func() {
		for range c.Messages() {
		}
	}()

	for err := range c.Errors() {
		errs = append(errs, err)
	}
	return
}