1package sarama
2
3import (
4	"errors"
5	"fmt"
6	"sync"
7	"sync/atomic"
8	"time"
9)
10
11// ConsumerMessage encapsulates a Kafka message returned by the consumer.
12type ConsumerMessage struct {
13	Key, Value     []byte
14	Topic          string
15	Partition      int32
16	Offset         int64
17	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
18	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp
19	Headers        []*RecordHeader // only set if kafka is version 0.11+
20}
21
22// ConsumerError is what is provided to the user when an error occurs.
23// It wraps an error and includes the topic and partition.
24type ConsumerError struct {
25	Topic     string
26	Partition int32
27	Err       error
28}
29
30func (ce ConsumerError) Error() string {
31	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
32}
33
34// ConsumerErrors is a type that wraps a batch of errors and implements the Error interface.
35// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
36// when stopping.
37type ConsumerErrors []*ConsumerError
38
39func (ce ConsumerErrors) Error() string {
40	return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
41}
42
43// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
44// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
45// scope.
46//
47// Sarama's Consumer type does not currently support automatic consumer-group rebalancing and offset tracking.
48// For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka library
49// builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9 and later), the
50// https://github.com/bsm/sarama-cluster library builds on Sarama to add this support.
51type Consumer interface {
52
53	// Topics returns the set of available topics as retrieved from the cluster
54	// metadata. This method is the same as Client.Topics(), and is provided for
55	// convenience.
56	Topics() ([]string, error)
57
58	// Partitions returns the sorted list of all partition IDs for the given topic.
59	// This method is the same as Client.Partitions(), and is provided for convenience.
60	Partitions(topic string) ([]int32, error)
61
62	// ConsumePartition creates a PartitionConsumer on the given topic/partition with
63	// the given offset. It will return an error if this Consumer is already consuming
64	// on the given topic/partition. Offset can be a literal offset, or OffsetNewest
65	// or OffsetOldest
66	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)
67
68	// HighWaterMarks returns the current high water marks for each topic and partition.
69	// Consistency between partitions is not guaranteed since high water marks are updated separately.
70	HighWaterMarks() map[string]map[int32]int64
71
72	// Close shuts down the consumer. It must be called after all child
73	// PartitionConsumers have already been closed.
74	Close() error
75}
76
77type consumer struct {
78	client    Client
79	conf      *Config
80	ownClient bool
81
82	lock            sync.Mutex
83	children        map[string]map[int32]*partitionConsumer
84	brokerConsumers map[*Broker]*brokerConsumer
85}
86
87// NewConsumer creates a new consumer using the given broker addresses and configuration.
88func NewConsumer(addrs []string, config *Config) (Consumer, error) {
89	client, err := NewClient(addrs, config)
90	if err != nil {
91		return nil, err
92	}
93
94	c, err := NewConsumerFromClient(client)
95	if err != nil {
96		return nil, err
97	}
98	c.(*consumer).ownClient = true
99	return c, nil
100}
101
102// NewConsumerFromClient creates a new consumer using the given client. It is still
103// necessary to call Close() on the underlying client when shutting down this consumer.
104func NewConsumerFromClient(client Client) (Consumer, error) {
105	// Check that we are not dealing with a closed Client before processing any other arguments
106	if client.Closed() {
107		return nil, ErrClosedClient
108	}
109
110	c := &consumer{
111		client:          client,
112		conf:            client.Config(),
113		children:        make(map[string]map[int32]*partitionConsumer),
114		brokerConsumers: make(map[*Broker]*brokerConsumer),
115	}
116
117	return c, nil
118}
119
120func (c *consumer) Close() error {
121	if c.ownClient {
122		return c.client.Close()
123	}
124	return nil
125}
126
127func (c *consumer) Topics() ([]string, error) {
128	return c.client.Topics()
129}
130
131func (c *consumer) Partitions(topic string) ([]int32, error) {
132	return c.client.Partitions(topic)
133}
134
135func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
136	child := &partitionConsumer{
137		consumer:  c,
138		conf:      c.conf,
139		topic:     topic,
140		partition: partition,
141		messages:  make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
142		errors:    make(chan *ConsumerError, c.conf.ChannelBufferSize),
143		feeder:    make(chan *FetchResponse, 1),
144		trigger:   make(chan none, 1),
145		dying:     make(chan none),
146		fetchSize: c.conf.Consumer.Fetch.Default,
147	}
148
149	if err := child.chooseStartingOffset(offset); err != nil {
150		return nil, err
151	}
152
153	var leader *Broker
154	var err error
155	if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
156		return nil, err
157	}
158
159	if err := c.addChild(child); err != nil {
160		return nil, err
161	}
162
163	go withRecover(child.dispatcher)
164	go withRecover(child.responseFeeder)
165
166	child.broker = c.refBrokerConsumer(leader)
167	child.broker.input <- child
168
169	return child, nil
170}
171
172func (c *consumer) HighWaterMarks() map[string]map[int32]int64 {
173	c.lock.Lock()
174	defer c.lock.Unlock()
175
176	hwms := make(map[string]map[int32]int64)
177	for topic, p := range c.children {
178		hwm := make(map[int32]int64, len(p))
179		for partition, pc := range p {
180			hwm[partition] = pc.HighWaterMarkOffset()
181		}
182		hwms[topic] = hwm
183	}
184
185	return hwms
186}
187
188func (c *consumer) addChild(child *partitionConsumer) error {
189	c.lock.Lock()
190	defer c.lock.Unlock()
191
192	topicChildren := c.children[child.topic]
193	if topicChildren == nil {
194		topicChildren = make(map[int32]*partitionConsumer)
195		c.children[child.topic] = topicChildren
196	}
197
198	if topicChildren[child.partition] != nil {
199		return ConfigurationError("That topic/partition is already being consumed")
200	}
201
202	topicChildren[child.partition] = child
203	return nil
204}
205
206func (c *consumer) removeChild(child *partitionConsumer) {
207	c.lock.Lock()
208	defer c.lock.Unlock()
209
210	delete(c.children[child.topic], child.partition)
211}
212
213func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
214	c.lock.Lock()
215	defer c.lock.Unlock()
216
217	bc := c.brokerConsumers[broker]
218	if bc == nil {
219		bc = c.newBrokerConsumer(broker)
220		c.brokerConsumers[broker] = bc
221	}
222
223	bc.refs++
224
225	return bc
226}
227
228func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
229	c.lock.Lock()
230	defer c.lock.Unlock()
231
232	brokerWorker.refs--
233
234	if brokerWorker.refs == 0 {
235		close(brokerWorker.input)
236		if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
237			delete(c.brokerConsumers, brokerWorker.broker)
238		}
239	}
240}
241
242func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
243	c.lock.Lock()
244	defer c.lock.Unlock()
245
246	delete(c.brokerConsumers, brokerWorker.broker)
247}
248
249// PartitionConsumer
250
251// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or
252// AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out
253// of scope.
254//
255// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
256// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported
257// as out of range by the brokers. In this case you should decide what you want to do (try a different offset,
258// notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying.
259// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set
260// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement
261// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
262//
263// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
264// consumer tear-down & return imediately. Continue to loop, servicing the Messages channel until the teardown process
265// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
266// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
267// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
268type PartitionConsumer interface {
269
270	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
271	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
272	// function, or Close before a consumer object passes out of scope, as it will otherwise leak memory. You must call
273	// this before calling Close on the underlying client.
274	AsyncClose()
275
276	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
277	// the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
278	// the Messages channel when this function is called, you will be competing with Close for messages; consider
279	// calling AsyncClose, instead. It is required to call this function (or AsyncClose) before a consumer object passes
280	// out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
281	Close() error
282
283	// Messages returns the read channel for the messages that are returned by
284	// the broker.
285	Messages() <-chan *ConsumerMessage
286
287	// Errors returns a read channel of errors that occurred during consuming, if
288	// enabled. By default, errors are logged and not returned over this channel.
289	// If you want to implement any custom error handling, set your config's
290	// Consumer.Return.Errors setting to true, and read from this channel.
291	Errors() <-chan *ConsumerError
292
293	// HighWaterMarkOffset returns the high water mark offset of the partition,
294	// i.e. the offset that will be used for the next message that will be produced.
295	// You can use this to determine how far behind the processing is.
296	HighWaterMarkOffset() int64
297}
298
299type partitionConsumer struct {
300	highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
301	consumer            *consumer
302	conf                *Config
303	topic               string
304	partition           int32
305
306	broker   *brokerConsumer
307	messages chan *ConsumerMessage
308	errors   chan *ConsumerError
309	feeder   chan *FetchResponse
310
311	trigger, dying chan none
312	responseResult error
313	closeOnce      sync.Once
314
315	fetchSize int32
316	offset    int64
317}
318
319var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
320
321func (child *partitionConsumer) sendError(err error) {
322	cErr := &ConsumerError{
323		Topic:     child.topic,
324		Partition: child.partition,
325		Err:       err,
326	}
327
328	if child.conf.Consumer.Return.Errors {
329		child.errors <- cErr
330	} else {
331		Logger.Println(cErr)
332	}
333}
334
335func (child *partitionConsumer) dispatcher() {
336	for range child.trigger {
337		select {
338		case <-child.dying:
339			close(child.trigger)
340		case <-time.After(child.conf.Consumer.Retry.Backoff):
341			if child.broker != nil {
342				child.consumer.unrefBrokerConsumer(child.broker)
343				child.broker = nil
344			}
345
346			Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
347			if err := child.dispatch(); err != nil {
348				child.sendError(err)
349				child.trigger <- none{}
350			}
351		}
352	}
353
354	if child.broker != nil {
355		child.consumer.unrefBrokerConsumer(child.broker)
356	}
357	child.consumer.removeChild(child)
358	close(child.feeder)
359}
360
361func (child *partitionConsumer) dispatch() error {
362	if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
363		return err
364	}
365
366	var leader *Broker
367	var err error
368	if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
369		return err
370	}
371
372	child.broker = child.consumer.refBrokerConsumer(leader)
373
374	child.broker.input <- child
375
376	return nil
377}
378
379func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
380	newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
381	if err != nil {
382		return err
383	}
384	oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
385	if err != nil {
386		return err
387	}
388
389	switch {
390	case offset == OffsetNewest:
391		child.offset = newestOffset
392	case offset == OffsetOldest:
393		child.offset = oldestOffset
394	case offset >= oldestOffset && offset <= newestOffset:
395		child.offset = offset
396	default:
397		return ErrOffsetOutOfRange
398	}
399
400	return nil
401}
402
403func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
404	return child.messages
405}
406
407func (child *partitionConsumer) Errors() <-chan *ConsumerError {
408	return child.errors
409}
410
411func (child *partitionConsumer) AsyncClose() {
412	// this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
413	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
414	// 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will
415	// also just close itself)
416	child.closeOnce.Do(func() {
417		close(child.dying)
418	})
419}
420
421func (child *partitionConsumer) Close() error {
422	child.AsyncClose()
423
424	go withRecover(func() {
425		for range child.messages {
426			// drain
427		}
428	})
429
430	var errors ConsumerErrors
431	for err := range child.errors {
432		errors = append(errors, err)
433	}
434
435	if len(errors) > 0 {
436		return errors
437	}
438	return nil
439}
440
441func (child *partitionConsumer) HighWaterMarkOffset() int64 {
442	return atomic.LoadInt64(&child.highWaterMarkOffset)
443}
444
445func (child *partitionConsumer) responseFeeder() {
446	var msgs []*ConsumerMessage
447	expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
448	firstAttempt := true
449
450feederLoop:
451	for response := range child.feeder {
452		msgs, child.responseResult = child.parseResponse(response)
453
454		for i, msg := range msgs {
455		messageSelect:
456			select {
457			case child.messages <- msg:
458				firstAttempt = true
459			case <-expiryTicker.C:
460				if !firstAttempt {
461					child.responseResult = errTimedOut
462					child.broker.acks.Done()
463					for _, msg = range msgs[i:] {
464						child.messages <- msg
465					}
466					child.broker.input <- child
467					continue feederLoop
468				} else {
469					// current message has not been sent, return to select
470					// statement
471					firstAttempt = false
472					goto messageSelect
473				}
474			}
475		}
476
477		child.broker.acks.Done()
478	}
479
480	expiryTicker.Stop()
481	close(child.messages)
482	close(child.errors)
483}
484
485func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
486	var messages []*ConsumerMessage
487	for _, msgBlock := range msgSet.Messages {
488		for _, msg := range msgBlock.Messages() {
489			offset := msg.Offset
490			if msg.Msg.Version >= 1 {
491				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
492				offset += baseOffset
493			}
494			if offset < child.offset {
495				continue
496			}
497			messages = append(messages, &ConsumerMessage{
498				Topic:          child.topic,
499				Partition:      child.partition,
500				Key:            msg.Msg.Key,
501				Value:          msg.Msg.Value,
502				Offset:         offset,
503				Timestamp:      msg.Msg.Timestamp,
504				BlockTimestamp: msgBlock.Msg.Timestamp,
505			})
506			child.offset = offset + 1
507		}
508	}
509	if len(messages) == 0 {
510		return nil, ErrIncompleteResponse
511	}
512	return messages, nil
513}
514
515func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
516	var messages []*ConsumerMessage
517	for _, rec := range batch.Records {
518		offset := batch.FirstOffset + rec.OffsetDelta
519		if offset < child.offset {
520			continue
521		}
522		messages = append(messages, &ConsumerMessage{
523			Topic:     child.topic,
524			Partition: child.partition,
525			Key:       rec.Key,
526			Value:     rec.Value,
527			Offset:    offset,
528			Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
529			Headers:   rec.Headers,
530		})
531		child.offset = offset + 1
532	}
533	if len(messages) == 0 {
534		child.offset += 1
535	}
536	return messages, nil
537}
538
539func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
540	block := response.GetBlock(child.topic, child.partition)
541	if block == nil {
542		return nil, ErrIncompleteResponse
543	}
544
545	if block.Err != ErrNoError {
546		return nil, block.Err
547	}
548
549	nRecs, err := block.numRecords()
550	if err != nil {
551		return nil, err
552	}
553	if nRecs == 0 {
554		partialTrailingMessage, err := block.isPartial()
555		if err != nil {
556			return nil, err
557		}
558		// We got no messages. If we got a trailing one then we need to ask for more data.
559		// Otherwise we just poll again and wait for one to be produced...
560		if partialTrailingMessage {
561			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
562				// we can't ask for more data, we've hit the configured limit
563				child.sendError(ErrMessageTooLarge)
564				child.offset++ // skip this one so we can keep processing future messages
565			} else {
566				child.fetchSize *= 2
567				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
568					child.fetchSize = child.conf.Consumer.Fetch.Max
569				}
570			}
571		}
572
573		return nil, nil
574	}
575
576	// we got messages, reset our fetch size in case it was increased for a previous request
577	child.fetchSize = child.conf.Consumer.Fetch.Default
578	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
579
580	messages := []*ConsumerMessage{}
581	for _, records := range block.RecordsSet {
582		switch records.recordsType {
583		case legacyRecords:
584			messageSetMessages, err := child.parseMessages(records.MsgSet)
585			if err != nil {
586				return nil, err
587			}
588
589			messages = append(messages, messageSetMessages...)
590		case defaultRecords:
591			recordBatchMessages, err := child.parseRecords(records.RecordBatch)
592			if err != nil {
593				return nil, err
594			}
595			if control, err := records.isControl(); err != nil || control {
596				continue
597			}
598
599			messages = append(messages, recordBatchMessages...)
600		default:
601			return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
602		}
603	}
604
605	return messages, nil
606}
607
608// brokerConsumer
609
610type brokerConsumer struct {
611	consumer         *consumer
612	broker           *Broker
613	input            chan *partitionConsumer
614	newSubscriptions chan []*partitionConsumer
615	wait             chan none
616	subscriptions    map[*partitionConsumer]none
617	acks             sync.WaitGroup
618	refs             int
619}
620
621func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
622	bc := &brokerConsumer{
623		consumer:         c,
624		broker:           broker,
625		input:            make(chan *partitionConsumer),
626		newSubscriptions: make(chan []*partitionConsumer),
627		wait:             make(chan none),
628		subscriptions:    make(map[*partitionConsumer]none),
629		refs:             0,
630	}
631
632	go withRecover(bc.subscriptionManager)
633	go withRecover(bc.subscriptionConsumer)
634
635	return bc
636}
637
638func (bc *brokerConsumer) subscriptionManager() {
639	var buffer []*partitionConsumer
640
641	// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
642	// goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
643	// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
644	// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
645	// so the main goroutine can block waiting for work if it has none.
646	for {
647		if len(buffer) > 0 {
648			select {
649			case event, ok := <-bc.input:
650				if !ok {
651					goto done
652				}
653				buffer = append(buffer, event)
654			case bc.newSubscriptions <- buffer:
655				buffer = nil
656			case bc.wait <- none{}:
657			}
658		} else {
659			select {
660			case event, ok := <-bc.input:
661				if !ok {
662					goto done
663				}
664				buffer = append(buffer, event)
665			case bc.newSubscriptions <- nil:
666			}
667		}
668	}
669
670done:
671	close(bc.wait)
672	if len(buffer) > 0 {
673		bc.newSubscriptions <- buffer
674	}
675	close(bc.newSubscriptions)
676}
677
678func (bc *brokerConsumer) subscriptionConsumer() {
679	<-bc.wait // wait for our first piece of work
680
681	// the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
682	for newSubscriptions := range bc.newSubscriptions {
683		bc.updateSubscriptions(newSubscriptions)
684
685		if len(bc.subscriptions) == 0 {
686			// We're about to be shut down or we're about to receive more subscriptions.
687			// Either way, the signal just hasn't propagated to our goroutine yet.
688			<-bc.wait
689			continue
690		}
691
692		response, err := bc.fetchNewMessages()
693
694		if err != nil {
695			Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
696			bc.abort(err)
697			return
698		}
699
700		bc.acks.Add(len(bc.subscriptions))
701		for child := range bc.subscriptions {
702			child.feeder <- response
703		}
704		bc.acks.Wait()
705		bc.handleResponses()
706	}
707}
708
709func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
710	for _, child := range newSubscriptions {
711		bc.subscriptions[child] = none{}
712		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
713	}
714
715	for child := range bc.subscriptions {
716		select {
717		case <-child.dying:
718			Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
719			close(child.trigger)
720			delete(bc.subscriptions, child)
721		default:
722			break
723		}
724	}
725}
726
727func (bc *brokerConsumer) handleResponses() {
728	// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
729	for child := range bc.subscriptions {
730		result := child.responseResult
731		child.responseResult = nil
732
733		switch result {
734		case nil:
735			break
736		case errTimedOut:
737			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
738				bc.broker.ID(), child.topic, child.partition)
739			delete(bc.subscriptions, child)
740		case ErrOffsetOutOfRange:
741			// there's no point in retrying this it will just fail the same way again
742			// shut it down and force the user to choose what to do
743			child.sendError(result)
744			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
745			close(child.trigger)
746			delete(bc.subscriptions, child)
747		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable:
748			// not an error, but does need redispatching
749			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
750				bc.broker.ID(), child.topic, child.partition, result)
751			child.trigger <- none{}
752			delete(bc.subscriptions, child)
753		default:
754			// dunno, tell the user and try redispatching
755			child.sendError(result)
756			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
757				bc.broker.ID(), child.topic, child.partition, result)
758			child.trigger <- none{}
759			delete(bc.subscriptions, child)
760		}
761	}
762}
763
764func (bc *brokerConsumer) abort(err error) {
765	bc.consumer.abandonBrokerConsumer(bc)
766	_ = bc.broker.Close() // we don't care about the error this might return, we already have one
767
768	for child := range bc.subscriptions {
769		child.sendError(err)
770		child.trigger <- none{}
771	}
772
773	for newSubscriptions := range bc.newSubscriptions {
774		if len(newSubscriptions) == 0 {
775			<-bc.wait
776			continue
777		}
778		for _, child := range newSubscriptions {
779			child.sendError(err)
780			child.trigger <- none{}
781		}
782	}
783}
784
785func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
786	request := &FetchRequest{
787		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
788		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
789	}
790	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
791		request.Version = 2
792	}
793	if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
794		request.Version = 3
795		request.MaxBytes = MaxResponseSize
796	}
797	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
798		request.Version = 4
799		request.Isolation = ReadUncommitted // We don't support yet transactions.
800	}
801
802	for child := range bc.subscriptions {
803		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
804	}
805
806	return bc.broker.Fetch(request)
807}
808