package sarama

import (
	"errors"
	"fmt"
	"math"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rcrowley/go-metrics"
)

// ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMessage struct {
	Headers        []*RecordHeader // only set if Kafka is version 0.11+
	Timestamp      time.Time       // only set if Kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time       // only set if Kafka is version 0.10+, outer (compressed) block timestamp

	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
}

// ConsumerError is what is provided to the user when an error occurs.
// It wraps an error and includes the topic and partition.
type ConsumerError struct {
	Topic     string
	Partition int32
	Err       error
}

func (ce ConsumerError) Error() string {
	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
}

// ConsumerErrors is a type that wraps a batch of errors and implements the Error interface.
// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
// when stopping.
type ConsumerErrors []*ConsumerError

func (ce ConsumerErrors) Error() string {
	return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
}
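
// A brief sketch of inspecting the individual failures wrapped by ConsumerErrors,
// assuming `pc` is an open PartitionConsumer (see PartitionConsumer.Close below):
//
//	if err := pc.Close(); err != nil {
//		if errs, ok := err.(ConsumerErrors); ok {
//			for _, e := range errs {
//				Logger.Println(e)
//			}
//		}
//	}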

// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
// on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of
// scope.
type Consumer interface {
	// Topics returns the set of available topics as retrieved from the cluster
	// metadata. This method is the same as Client.Topics(), and is provided for
	// convenience.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	// This method is the same as Client.Partitions(), and is provided for convenience.
	Partitions(topic string) ([]int32, error)

	// ConsumePartition creates a PartitionConsumer on the given topic/partition with
	// the given offset. It will return an error if this Consumer is already consuming
	// on the given topic/partition. Offset can be a literal offset, or OffsetNewest
	// or OffsetOldest.
	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)

	// HighWaterMarks returns the current high water marks for each topic and partition.
	// Consistency between partitions is not guaranteed since high water marks are updated separately.
	HighWaterMarks() map[string]map[int32]int64

	// Close shuts down the consumer. It must be called after all child
	// PartitionConsumers have already been closed.
	Close() error
}
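
// Creating a consumer and starting a PartitionConsumer typically looks like the
// sketch below (a minimal example, assuming a broker reachable at localhost:9092
// and a topic named "my_topic"):
//
//	consumer, err := NewConsumer([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		panic(err)
//	}
//	defer func() { _ = consumer.Close() }()
//
//	pc, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
//	if err != nil {
//		panic(err)
//	}
//	defer func() { _ = pc.Close() }()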

type consumer struct {
	conf            *Config
	children        map[string]map[int32]*partitionConsumer
	brokerConsumers map[*Broker]*brokerConsumer
	client          Client
	lock            sync.Mutex
}

// NewConsumer creates a new consumer using the given broker addresses and configuration.
func NewConsumer(addrs []string, config *Config) (Consumer, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}
	return newConsumer(client)
}

// NewConsumerFromClient creates a new consumer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
func NewConsumerFromClient(client Client) (Consumer, error) {
	// For clients passed in by the caller, ensure we don't
	// call Close() on them.
	cli := &nopCloserClient{client}
	return newConsumer(cli)
}
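
// Sharing one Client between several components is a common reason to use
// NewConsumerFromClient. A brief sketch, assuming a broker at localhost:9092:
//
//	client, err := NewClient([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		panic(err)
//	}
//	defer func() { _ = client.Close() }() // the caller still owns the client
//
//	consumer, err := NewConsumerFromClient(client)
//	if err != nil {
//		panic(err)
//	}
//	defer func() { _ = consumer.Close() }() // does not close the shared client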

func newConsumer(client Client) (Consumer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	c := &consumer{
		client:          client,
		conf:            client.Config(),
		children:        make(map[string]map[int32]*partitionConsumer),
		brokerConsumers: make(map[*Broker]*brokerConsumer),
	}

	return c, nil
}

func (c *consumer) Close() error {
	return c.client.Close()
}

func (c *consumer) Topics() ([]string, error) {
	return c.client.Topics()
}

func (c *consumer) Partitions(topic string) ([]int32, error) {
	return c.client.Partitions(topic)
}

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
	child := &partitionConsumer{
		consumer:  c,
		conf:      c.conf,
		topic:     topic,
		partition: partition,
		messages:  make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
		errors:    make(chan *ConsumerError, c.conf.ChannelBufferSize),
		feeder:    make(chan *FetchResponse, 1),
		trigger:   make(chan none, 1),
		dying:     make(chan none),
		fetchSize: c.conf.Consumer.Fetch.Default,
	}

	if err := child.chooseStartingOffset(offset); err != nil {
		return nil, err
	}

	var leader *Broker
	var err error
	if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
		return nil, err
	}

	if err := c.addChild(child); err != nil {
		return nil, err
	}

	go withRecover(child.dispatcher)
	go withRecover(child.responseFeeder)

	child.broker = c.refBrokerConsumer(leader)
	child.broker.input <- child

	return child, nil
}

func (c *consumer) HighWaterMarks() map[string]map[int32]int64 {
	c.lock.Lock()
	defer c.lock.Unlock()

	hwms := make(map[string]map[int32]int64)
	for topic, p := range c.children {
		hwm := make(map[int32]int64, len(p))
		for partition, pc := range p {
			hwm[partition] = pc.HighWaterMarkOffset()
		}
		hwms[topic] = hwm
	}

	return hwms
}

func (c *consumer) addChild(child *partitionConsumer) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	topicChildren := c.children[child.topic]
	if topicChildren == nil {
		topicChildren = make(map[int32]*partitionConsumer)
		c.children[child.topic] = topicChildren
	}

	if topicChildren[child.partition] != nil {
		return ConfigurationError("That topic/partition is already being consumed")
	}

	topicChildren[child.partition] = child
	return nil
}

func (c *consumer) removeChild(child *partitionConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.children[child.topic], child.partition)
}

func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
	c.lock.Lock()
	defer c.lock.Unlock()

	bc := c.brokerConsumers[broker]
	if bc == nil {
		bc = c.newBrokerConsumer(broker)
		c.brokerConsumers[broker] = bc
	}

	bc.refs++

	return bc
}

func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	brokerWorker.refs--

	if brokerWorker.refs == 0 {
		close(brokerWorker.input)
		if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
			delete(c.brokerConsumers, brokerWorker.broker)
		}
	}
}

func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.brokerConsumers, brokerWorker.broker)
}

// PartitionConsumer

// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or
// AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out
// of scope.
//
// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported
// as out of range by the brokers. In this case you should decide what you want to do (try a different offset,
// notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying.
// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set
// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement
// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
//
// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
type PartitionConsumer interface {
	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
	// function (or Close) before a consumer object passes out of scope, as it will otherwise leak memory. You must call
	// this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
	// the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
	// the Messages channel when this function is called, you will be competing with Close for messages; consider
	// calling AsyncClose instead. It is required to call this function (or AsyncClose) before a consumer object passes
	// out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
	Close() error

	// Messages returns the read channel for the messages that are returned by
	// the broker.
	Messages() <-chan *ConsumerMessage

	// Errors returns a read channel of errors that occurred during consuming, if
	// enabled. By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64
}
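
// A consumption-loop sketch, assuming Consumer.Return.Errors has been enabled in
// the Config and `pc` was obtained from ConsumePartition:
//
//	go func() {
//		for err := range pc.Errors() {
//			Logger.Println(err)
//		}
//	}()
//
//	for msg := range pc.Messages() {
//		lag := pc.HighWaterMarkOffset() - msg.Offset - 1
//		Logger.Printf("%s/%d offset %d (lag %d): %s",
//			msg.Topic, msg.Partition, msg.Offset, lag, string(msg.Value))
//	}
//
// Calling pc.AsyncClose() from another goroutine eventually closes both channels,
// which ends the loops above.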

type partitionConsumer struct {
	highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG

	consumer *consumer
	conf     *Config
	broker   *brokerConsumer
	messages chan *ConsumerMessage
	errors   chan *ConsumerError
	feeder   chan *FetchResponse

	trigger, dying chan none
	closeOnce      sync.Once
	topic          string
	partition      int32
	responseResult error
	fetchSize      int32
	offset         int64
	retries        int32
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing

func (child *partitionConsumer) sendError(err error) {
	cErr := &ConsumerError{
		Topic:     child.topic,
		Partition: child.partition,
		Err:       err,
	}

	if child.conf.Consumer.Return.Errors {
		child.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (child *partitionConsumer) computeBackoff() time.Duration {
	if child.conf.Consumer.Retry.BackoffFunc != nil {
		retries := atomic.AddInt32(&child.retries, 1)
		return child.conf.Consumer.Retry.BackoffFunc(int(retries))
	}
	return child.conf.Consumer.Retry.Backoff
}
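
// computeBackoff only consults Consumer.Retry.BackoffFunc when one is set; otherwise the
// fixed Consumer.Retry.Backoff applies. A sketch of a capped exponential backoff
// (an illustrative configuration, not a library default):
//
//	config := NewConfig()
//	config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
//		if retries > 6 {
//			retries = 6 // cap the exponent so the backoff tops out at 6.4s
//		}
//		return time.Duration(1<<uint(retries)) * 100 * time.Millisecond
//	}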

func (child *partitionConsumer) dispatcher() {
	for range child.trigger {
		select {
		case <-child.dying:
			close(child.trigger)
		case <-time.After(child.computeBackoff()):
			if child.broker != nil {
				child.consumer.unrefBrokerConsumer(child.broker)
				child.broker = nil
			}

			Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
			if err := child.dispatch(); err != nil {
				child.sendError(err)
				child.trigger <- none{}
			}
		}
	}

	if child.broker != nil {
		child.consumer.unrefBrokerConsumer(child.broker)
	}
	child.consumer.removeChild(child)
	close(child.feeder)
}

func (child *partitionConsumer) dispatch() error {
	if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
		return err
	}

	var leader *Broker
	var err error
	if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
		return err
	}

	child.broker = child.consumer.refBrokerConsumer(leader)

	child.broker.input <- child

	return nil
}

func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
	newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
	if err != nil {
		return err
	}
	oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
	if err != nil {
		return err
	}

	switch {
	case offset == OffsetNewest:
		child.offset = newestOffset
	case offset == OffsetOldest:
		child.offset = oldestOffset
	case offset >= oldestOffset && offset <= newestOffset:
		child.offset = offset
	default:
		return ErrOffsetOutOfRange
	}

	return nil
}

func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
	return child.messages
}

func (child *partitionConsumer) Errors() <-chan *ConsumerError {
	return child.errors
}

func (child *partitionConsumer) AsyncClose() {
	// this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
	// 'errors' channels (alternatively, if the child is already at the dispatcher for some reason, that will
	// also just close itself)
	child.closeOnce.Do(func() {
		close(child.dying)
	})
}

func (child *partitionConsumer) Close() error {
	child.AsyncClose()

	var errors ConsumerErrors
	for err := range child.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (child *partitionConsumer) HighWaterMarkOffset() int64 {
	return atomic.LoadInt64(&child.highWaterMarkOffset)
}

func (child *partitionConsumer) responseFeeder() {
	var msgs []*ConsumerMessage
	expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
	firstAttempt := true

feederLoop:
	for response := range child.feeder {
		msgs, child.responseResult = child.parseResponse(response)

		if child.responseResult == nil {
			atomic.StoreInt32(&child.retries, 0)
		}

		for i, msg := range msgs {
		messageSelect:
			select {
			case <-child.dying:
				child.broker.acks.Done()
				continue feederLoop
			case child.messages <- msg:
				firstAttempt = true
			case <-expiryTicker.C:
				if !firstAttempt {
					child.responseResult = errTimedOut
					child.broker.acks.Done()
				remainingLoop:
					for _, msg = range msgs[i:] {
						select {
						case child.messages <- msg:
						case <-child.dying:
							break remainingLoop
						}
					}
					child.broker.input <- child
					continue feederLoop
				} else {
					// current message has not been sent, return to select
					// statement
					firstAttempt = false
					goto messageSelect
				}
			}
		}

		child.broker.acks.Done()
	}

	expiryTicker.Stop()
	close(child.messages)
	close(child.errors)
}

func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
	var messages []*ConsumerMessage
	for _, msgBlock := range msgSet.Messages {
		for _, msg := range msgBlock.Messages() {
			offset := msg.Offset
			timestamp := msg.Msg.Timestamp
			if msg.Msg.Version >= 1 {
				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
				offset += baseOffset
				if msg.Msg.LogAppendTime {
					timestamp = msgBlock.Msg.Timestamp
				}
			}
			if offset < child.offset {
				continue
			}
			messages = append(messages, &ConsumerMessage{
				Topic:          child.topic,
				Partition:      child.partition,
				Key:            msg.Msg.Key,
				Value:          msg.Msg.Value,
				Offset:         offset,
				Timestamp:      timestamp,
				BlockTimestamp: msgBlock.Msg.Timestamp,
			})
			child.offset = offset + 1
		}
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}

func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
	messages := make([]*ConsumerMessage, 0, len(batch.Records))

	for _, rec := range batch.Records {
		offset := batch.FirstOffset + rec.OffsetDelta
		if offset < child.offset {
			continue
		}
		timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
		if batch.LogAppendTime {
			timestamp = batch.MaxTimestamp
		}
		messages = append(messages, &ConsumerMessage{
			Topic:     child.topic,
			Partition: child.partition,
			Key:       rec.Key,
			Value:     rec.Value,
			Offset:    offset,
			Timestamp: timestamp,
			Headers:   rec.Headers,
		})
		child.offset = offset + 1
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}

func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
	var (
		metricRegistry          = child.conf.MetricRegistry
		consumerBatchSizeMetric metrics.Histogram
	)

	if metricRegistry != nil {
		consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
	}

	// If the request was throttled and the response is empty, log it and return without error
	if response.ThrottleTime != time.Duration(0) && len(response.Blocks) == 0 {
		Logger.Printf(
			"consumer/broker/%d FetchResponse throttled %v\n",
			child.broker.broker.ID(), response.ThrottleTime)
		return nil, nil
	}

	block := response.GetBlock(child.topic, child.partition)
	if block == nil {
		return nil, ErrIncompleteResponse
	}

	if block.Err != ErrNoError {
		return nil, block.Err
	}

	nRecs, err := block.numRecords()
	if err != nil {
		return nil, err
	}

	if consumerBatchSizeMetric != nil {
		consumerBatchSizeMetric.Update(int64(nRecs))
	}

	if nRecs == 0 {
		partialTrailingMessage, err := block.isPartial()
		if err != nil {
			return nil, err
		}
		// We got no messages. If we got a partial trailing message then we need to ask for more data.
		// Otherwise we just poll again and wait for one to be produced...
		if partialTrailingMessage {
			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
				// we can't ask for more data, we've hit the configured limit
				child.sendError(ErrMessageTooLarge)
				child.offset++ // skip this one so we can keep processing future messages
			} else {
				child.fetchSize *= 2
				// check for int32 overflow
				if child.fetchSize < 0 {
					child.fetchSize = math.MaxInt32
				}
				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
					child.fetchSize = child.conf.Consumer.Fetch.Max
				}
			}
		}

		return nil, nil
	}

	// we got messages, reset our fetch size in case it was increased for a previous request
	child.fetchSize = child.conf.Consumer.Fetch.Default
	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

	// abortedProducerIDs contains the producer IDs whose messages should be ignored as uncommitted:
	// - a producer ID is added when the partitionConsumer iterates over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
	// - a producer ID is removed when the partitionConsumer iterates over an abort control record, meaning the aborted transaction for that producer is over
	abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
	abortedTransactions := block.getAbortedTransactions()

	messages := []*ConsumerMessage{}
	for _, records := range block.RecordsSet {
		switch records.recordsType {
		case legacyRecords:
			messageSetMessages, err := child.parseMessages(records.MsgSet)
			if err != nil {
				return nil, err
			}

			messages = append(messages, messageSetMessages...)
		case defaultRecords:
			// Consume remaining abortedTransactions up to the last offset of the current batch
			for _, txn := range abortedTransactions {
				if txn.FirstOffset > records.RecordBatch.LastOffset() {
					break
				}
				abortedProducerIDs[txn.ProducerID] = struct{}{}
				// Pop the aborted transaction so that we never add it again
				abortedTransactions = abortedTransactions[1:]
			}

			recordBatchMessages, err := child.parseRecords(records.RecordBatch)
			if err != nil {
				return nil, err
			}

			// Parse and commit the offset, but do not expose messages that are:
			// - control records
			// - part of an aborted transaction when set to `ReadCommitted`

			// control record
			isControl, err := records.isControl()
			if err != nil {
				// If we can't tell whether this batch holds a control record, the safe bet is to
				// skip it when ReadUncommitted and to fail hard on the error when ReadCommitted.
				if child.conf.Consumer.IsolationLevel == ReadCommitted {
					return nil, err
				}
				continue
			}
			if isControl {
				controlRecord, err := records.getControlRecord()
				if err != nil {
					return nil, err
				}

				if controlRecord.Type == ControlRecordAbort {
					delete(abortedProducerIDs, records.RecordBatch.ProducerID)
				}
				continue
			}

			// filter aborted transactions
			if child.conf.Consumer.IsolationLevel == ReadCommitted {
				_, isAborted := abortedProducerIDs[records.RecordBatch.ProducerID]
				if records.RecordBatch.IsTransactional && isAborted {
					continue
				}
			}

			messages = append(messages, recordBatchMessages...)
		default:
			return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
		}
	}

	return messages, nil
}

type brokerConsumer struct {
	consumer         *consumer
	broker           *Broker
	input            chan *partitionConsumer
	newSubscriptions chan []*partitionConsumer
	subscriptions    map[*partitionConsumer]none
	wait             chan none
	acks             sync.WaitGroup
	refs             int
}

func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
	bc := &brokerConsumer{
		consumer:         c,
		broker:           broker,
		input:            make(chan *partitionConsumer),
		newSubscriptions: make(chan []*partitionConsumer),
		wait:             make(chan none),
		subscriptions:    make(map[*partitionConsumer]none),
		refs:             0,
	}

	go withRecover(bc.subscriptionManager)
	go withRecover(bc.subscriptionConsumer)

	return bc
}

// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
// goroutine is in the middle of a network request) and batches them up. The main worker goroutine picks
// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions are available,
// so the main goroutine can block waiting for work if it has none.
func (bc *brokerConsumer) subscriptionManager() {
	var buffer []*partitionConsumer

	for {
		if len(buffer) > 0 {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- buffer:
				buffer = nil
			case bc.wait <- none{}:
			}
		} else {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- nil:
			}
		}
	}

done:
	close(bc.wait)
	if len(buffer) > 0 {
		bc.newSubscriptions <- buffer
	}
	close(bc.newSubscriptions)
}

// subscriptionConsumer is the main worker loop; it will get nil right away from
// `newSubscriptions` if no new subscriptions are available.
func (bc *brokerConsumer) subscriptionConsumer() {
	<-bc.wait // wait for our first piece of work

	for newSubscriptions := range bc.newSubscriptions {
		bc.updateSubscriptions(newSubscriptions)

		if len(bc.subscriptions) == 0 {
			// We're about to be shut down or we're about to receive more subscriptions.
			// Either way, the signal just hasn't propagated to our goroutine yet.
			<-bc.wait
			continue
		}

		response, err := bc.fetchNewMessages()

		if err != nil {
			Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
			bc.abort(err)
			return
		}

		bc.acks.Add(len(bc.subscriptions))
		for child := range bc.subscriptions {
			child.feeder <- response
		}
		bc.acks.Wait()
		bc.handleResponses()
	}
}

func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
	for _, child := range newSubscriptions {
		bc.subscriptions[child] = none{}
		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
	}

	for child := range bc.subscriptions {
		select {
		case <-child.dying:
			Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
			close(child.trigger)
			delete(bc.subscriptions, child)
		default:
			// no-op
		}
	}
}

// handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed
func (bc *brokerConsumer) handleResponses() {
	for child := range bc.subscriptions {
		result := child.responseResult
		child.responseResult = nil

		switch result {
		case nil:
			// no-op
		case errTimedOut:
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
				bc.broker.ID(), child.topic, child.partition)
			delete(bc.subscriptions, child)
		case ErrOffsetOutOfRange:
			// there's no point in retrying this; it will just fail the same way again
			// shut it down and force the user to choose what to do
			child.sendError(result)
			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
			close(child.trigger)
			delete(bc.subscriptions, child)
		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable:
			// not an error, but does need redispatching
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		default:
			// unknown error; tell the user and try redispatching
			child.sendError(result)
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		}
	}
}

func (bc *brokerConsumer) abort(err error) {
	bc.consumer.abandonBrokerConsumer(bc)
	_ = bc.broker.Close() // we don't care about the error this might return, we already have one

	for child := range bc.subscriptions {
		child.sendError(err)
		child.trigger <- none{}
	}

	for newSubscriptions := range bc.newSubscriptions {
		if len(newSubscriptions) == 0 {
			<-bc.wait
			continue
		}
		for _, child := range newSubscriptions {
			child.sendError(err)
			child.trigger <- none{}
		}
	}
}

func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
	request := &FetchRequest{
		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
		request.Version = 1
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
		request.Version = 2
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 3
		request.MaxBytes = MaxResponseSize
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
		request.Version = 4
		request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
	}
	if bc.consumer.conf.Version.IsAtLeast(V1_1_0_0) {
		request.Version = 7
		// We do not currently implement KIP-227 FetchSessions. Setting the id to 0
		// and the epoch to -1 tells the broker not to generate a session ID we're
		// just going to ignore anyway.
		request.SessionID = 0
		request.SessionEpoch = -1
	}
	if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
		request.Version = 10
	}
	if bc.consumer.conf.Version.IsAtLeast(V2_3_0_0) {
		request.Version = 11
		request.RackID = bc.consumer.conf.RackID
	}

	for child := range bc.subscriptions {
		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
	}

	return bc.broker.Fetch(request)
}
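
// The fetch request version above is chosen purely from Config.Version. A brief
// sketch of opting in to the newer protocol features (the broker version and rack
// name here are assumptions about the deployment, not defaults):
//
//	config := NewConfig()
//	config.Version = V2_3_0_0    // enables fetch v11
//	config.RackID = "us-east-1a" // forwarded to the broker for follower fetching (KIP-392)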