package sarama

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMessage struct {
	Key, Value     []byte
	Topic          string
	Partition      int32
	Offset         int64
	Timestamp      time.Time // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
}

// ConsumerError is what is provided to the user when an error occurs.
// It wraps an error and includes the topic and partition.
type ConsumerError struct {
	Topic     string
	Partition int32
	Err       error
}

func (ce ConsumerError) Error() string {
	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
}

// ConsumerErrors is a type that wraps a batch of errors and implements the Error interface.
// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
// when stopping.
type ConsumerErrors []*ConsumerError

func (ce ConsumerErrors) Error() string {
	return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
}

// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
// on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of
// scope.
//
// Sarama's Consumer type does not currently support automatic consumer-group rebalancing and offset tracking.
// For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka library
// builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9 and later), the
// https://github.com/bsm/sarama-cluster library builds on Sarama to add this support.
type Consumer interface {

	// Topics returns the set of available topics as retrieved from the cluster
	// metadata. This method is the same as Client.Topics(), and is provided for
	// convenience.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	// This method is the same as Client.Partitions(), and is provided for convenience.
	Partitions(topic string) ([]int32, error)

	// ConsumePartition creates a PartitionConsumer on the given topic/partition with
	// the given offset. It will return an error if this Consumer is already consuming
	// on the given topic/partition. Offset can be a literal offset, or OffsetNewest
	// or OffsetOldest.
	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)

	// HighWaterMarks returns the current high water marks for each topic and partition.
	// Consistency between partitions is not guaranteed since high water marks are updated separately.
	HighWaterMarks() map[string]map[int32]int64

	// Close shuts down the consumer. It must be called after all child
	// PartitionConsumers have already been closed.
	Close() error
}
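
// An illustrative sketch (not part of this file's API) of the basic Consumer
// workflow described above: create a Consumer, consume one partition, and shut
// everything down. The broker address, topic, and message count are
// placeholder assumptions.
//
//	consumer, err := NewConsumer([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		panic(err)
//	}
//	defer func() {
//		if err := consumer.Close(); err != nil {
//			Logger.Println(err)
//		}
//	}()
//
//	pc, err := consumer.ConsumePartition("my_topic", 0, OffsetOldest)
//	if err != nil {
//		panic(err)
//	}
//
//	// Read a bounded number of messages, then shut the PartitionConsumer down.
//	for i := 0; i < 10; i++ {
//		msg := <-pc.Messages()
//		Logger.Printf("offset %d: %s\n", msg.Offset, msg.Value)
//	}
//	if err := pc.Close(); err != nil {
//		Logger.Println(err)
//	}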

type consumer struct {
	client    Client
	conf      *Config
	ownClient bool

	lock            sync.Mutex
	children        map[string]map[int32]*partitionConsumer
	brokerConsumers map[*Broker]*brokerConsumer
}

// NewConsumer creates a new consumer using the given broker addresses and configuration.
func NewConsumer(addrs []string, config *Config) (Consumer, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	c, err := NewConsumerFromClient(client)
	if err != nil {
		return nil, err
	}
	c.(*consumer).ownClient = true
	return c, nil
}

// NewConsumerFromClient creates a new consumer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
func NewConsumerFromClient(client Client) (Consumer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	c := &consumer{
		client:          client,
		conf:            client.Config(),
		children:        make(map[string]map[int32]*partitionConsumer),
		brokerConsumers: make(map[*Broker]*brokerConsumer),
	}

	return c, nil
}
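
// A brief, non-normative sketch of sharing one Client with a consumer: unlike
// NewConsumer, closing a consumer built with NewConsumerFromClient does not
// close the client, so the caller remains responsible for both. The broker
// address is a placeholder assumption.
//
//	client, err := NewClient([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		panic(err)
//	}
//	defer client.Close() // runs last: close the client only after the consumer
//
//	consumer, err := NewConsumerFromClient(client)
//	if err != nil {
//		panic(err)
//	}
//	defer consumer.Close()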

func (c *consumer) Close() error {
	if c.ownClient {
		return c.client.Close()
	}
	return nil
}

func (c *consumer) Topics() ([]string, error) {
	return c.client.Topics()
}

func (c *consumer) Partitions(topic string) ([]int32, error) {
	return c.client.Partitions(topic)
}

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
	child := &partitionConsumer{
		consumer:  c,
		conf:      c.conf,
		topic:     topic,
		partition: partition,
		messages:  make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
		errors:    make(chan *ConsumerError, c.conf.ChannelBufferSize),
		feeder:    make(chan *FetchResponse, 1),
		trigger:   make(chan none, 1),
		dying:     make(chan none),
		fetchSize: c.conf.Consumer.Fetch.Default,
	}

	if err := child.chooseStartingOffset(offset); err != nil {
		return nil, err
	}

	var leader *Broker
	var err error
	if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
		return nil, err
	}

	if err := c.addChild(child); err != nil {
		return nil, err
	}

	go withRecover(child.dispatcher)
	go withRecover(child.responseFeeder)

	child.broker = c.refBrokerConsumer(leader)
	child.broker.input <- child

	return child, nil
}

func (c *consumer) HighWaterMarks() map[string]map[int32]int64 {
	c.lock.Lock()
	defer c.lock.Unlock()

	hwms := make(map[string]map[int32]int64)
	for topic, p := range c.children {
		hwm := make(map[int32]int64, len(p))
		for partition, pc := range p {
			hwm[partition] = pc.HighWaterMarkOffset()
		}
		hwms[topic] = hwm
	}

	return hwms
}
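
// A small illustrative sketch of using high water marks to estimate consumer
// lag for a partition. It assumes an existing consumer and a *ConsumerMessage
// (msg) previously received from one of its PartitionConsumers; the high water
// mark is the offset of the next message to be produced.
//
//	hwms := consumer.HighWaterMarks()
//	lag := hwms[msg.Topic][msg.Partition] - (msg.Offset + 1)
//	Logger.Printf("roughly %d messages behind on %s/%d\n", lag, msg.Topic, msg.Partition)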

func (c *consumer) addChild(child *partitionConsumer) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	topicChildren := c.children[child.topic]
	if topicChildren == nil {
		topicChildren = make(map[int32]*partitionConsumer)
		c.children[child.topic] = topicChildren
	}

	if topicChildren[child.partition] != nil {
		return ConfigurationError("That topic/partition is already being consumed")
	}

	topicChildren[child.partition] = child
	return nil
}

func (c *consumer) removeChild(child *partitionConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.children[child.topic], child.partition)
}

func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
	c.lock.Lock()
	defer c.lock.Unlock()

	bc := c.brokerConsumers[broker]
	if bc == nil {
		bc = c.newBrokerConsumer(broker)
		c.brokerConsumers[broker] = bc
	}

	bc.refs++

	return bc
}

func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	brokerWorker.refs--

	if brokerWorker.refs == 0 {
		close(brokerWorker.input)
		if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
			delete(c.brokerConsumers, brokerWorker.broker)
		}
	}
}

func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.brokerConsumers, brokerWorker.broker)
}

// PartitionConsumer

// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or
// AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out
// of scope.
//
// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported
// as out of range by the brokers. In this case you should decide what you want to do (try a different offset,
// notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying.
// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set
// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement
// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
//
// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
type PartitionConsumer interface {

	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
	// function (or Close) before a consumer object passes out of scope, as it will otherwise leak memory. You must call
	// this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
	// the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
	// the Messages channel when this function is called, you will be competing with Close for messages; consider
	// calling AsyncClose instead. It is required to call this function (or AsyncClose) before a consumer object passes
	// out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
	Close() error

	// Messages returns the read channel for the messages that are returned by
	// the broker.
	Messages() <-chan *ConsumerMessage

	// Errors returns a read channel of errors that occurred during consuming, if
	// enabled. By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64
}
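
// A non-normative sketch of the shutdown pattern described above: range over
// Messages, service Errors from a separate goroutine, and trigger AsyncClose
// when it is time to stop. The topic, partition, shutdownSignal channel, and
// process function are illustrative assumptions; the for/range loop ends once
// the tear-down started by AsyncClose closes the Messages channel.
//
//	pc, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
//	if err != nil {
//		panic(err)
//	}
//
//	// Requires Consumer.Return.Errors = true in the config.
//	go func() {
//		for consumerError := range pc.Errors() {
//			Logger.Println(consumerError)
//		}
//	}()
//
//	go func() {
//		<-shutdownSignal // e.g. a channel fed by signal.Notify; placeholder
//		pc.AsyncClose()
//	}()
//
//	for msg := range pc.Messages() {
//		process(msg) // user-supplied handler, not part of this package
//	}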

type partitionConsumer struct {
	highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	consumer            *consumer
	conf                *Config
	topic               string
	partition           int32

	broker   *brokerConsumer
	messages chan *ConsumerMessage
	errors   chan *ConsumerError
	feeder   chan *FetchResponse

	trigger, dying chan none
	responseResult error

	fetchSize int32
	offset    int64
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing

func (child *partitionConsumer) sendError(err error) {
	cErr := &ConsumerError{
		Topic:     child.topic,
		Partition: child.partition,
		Err:       err,
	}

	if child.conf.Consumer.Return.Errors {
		child.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (child *partitionConsumer) dispatcher() {
	for range child.trigger {
		select {
		case <-child.dying:
			close(child.trigger)
		case <-time.After(child.conf.Consumer.Retry.Backoff):
			if child.broker != nil {
				child.consumer.unrefBrokerConsumer(child.broker)
				child.broker = nil
			}

			Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
			if err := child.dispatch(); err != nil {
				child.sendError(err)
				child.trigger <- none{}
			}
		}
	}

	if child.broker != nil {
		child.consumer.unrefBrokerConsumer(child.broker)
	}
	child.consumer.removeChild(child)
	close(child.feeder)
}

func (child *partitionConsumer) dispatch() error {
	if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
		return err
	}

	var leader *Broker
	var err error
	if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
		return err
	}

	child.broker = child.consumer.refBrokerConsumer(leader)

	child.broker.input <- child

	return nil
}

func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
	newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
	if err != nil {
		return err
	}
	oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
	if err != nil {
		return err
	}

	switch {
	case offset == OffsetNewest:
		child.offset = newestOffset
	case offset == OffsetOldest:
		child.offset = oldestOffset
	case offset >= oldestOffset && offset <= newestOffset:
		child.offset = offset
	default:
		return ErrOffsetOutOfRange
	}

	return nil
}

func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
	return child.messages
}

func (child *partitionConsumer) Errors() <-chan *ConsumerError {
	return child.errors
}

func (child *partitionConsumer) AsyncClose() {
	// this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
	// 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will
	// also just close itself)
	close(child.dying)
}

func (child *partitionConsumer) Close() error {
	child.AsyncClose()

	go withRecover(func() {
		for range child.messages {
			// drain
		}
	})

	var errors ConsumerErrors
	for err := range child.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (child *partitionConsumer) HighWaterMarkOffset() int64 {
	return atomic.LoadInt64(&child.highWaterMarkOffset)
}

func (child *partitionConsumer) responseFeeder() {
	var msgs []*ConsumerMessage
	msgSent := false

feederLoop:
	for response := range child.feeder {
		msgs, child.responseResult = child.parseResponse(response)
		expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)

		for i, msg := range msgs {
		messageSelect:
			select {
			case child.messages <- msg:
				msgSent = true
			case <-expiryTicker.C:
				if !msgSent {
					child.responseResult = errTimedOut
					child.broker.acks.Done()
					for _, msg = range msgs[i:] {
						child.messages <- msg
					}
					child.broker.input <- child
					continue feederLoop
				} else {
					// a message was sent since the last expiry tick, so the
					// consumer is still making progress; reset the flag and
					// retry sending the current message
					msgSent = false
					goto messageSelect
				}
			}
		}

		expiryTicker.Stop()
		child.broker.acks.Done()
	}

	close(child.messages)
	close(child.errors)
}

func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
	block := response.GetBlock(child.topic, child.partition)
	if block == nil {
		return nil, ErrIncompleteResponse
	}

	if block.Err != ErrNoError {
		return nil, block.Err
	}

	if len(block.MsgSet.Messages) == 0 {
		// We got no messages. If we got a trailing one then we need to ask for more data.
		// Otherwise we just poll again and wait for one to be produced...
		if block.MsgSet.PartialTrailingMessage {
			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
				// we can't ask for more data, we've hit the configured limit
				child.sendError(ErrMessageTooLarge)
				child.offset++ // skip this one so we can keep processing future messages
			} else {
				child.fetchSize *= 2
				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
					child.fetchSize = child.conf.Consumer.Fetch.Max
				}
			}
		}

		return nil, nil
	}

	// we got messages, reset our fetch size in case it was increased for a previous request
	child.fetchSize = child.conf.Consumer.Fetch.Default
	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

	incomplete := false
	prelude := true
	var messages []*ConsumerMessage
	for _, msgBlock := range block.MsgSet.Messages {

		for _, msg := range msgBlock.Messages() {
			offset := msg.Offset
			if msg.Msg.Version >= 1 {
				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
				offset += baseOffset
			}
			if prelude && offset < child.offset {
				continue
			}
			prelude = false

			if offset >= child.offset {
				messages = append(messages, &ConsumerMessage{
					Topic:          child.topic,
					Partition:      child.partition,
					Key:            msg.Msg.Key,
					Value:          msg.Msg.Value,
					Offset:         offset,
					Timestamp:      msg.Msg.Timestamp,
					BlockTimestamp: msgBlock.Msg.Timestamp,
				})
				child.offset = offset + 1
			} else {
				incomplete = true
			}
		}

	}

	if incomplete || len(messages) == 0 {
		return nil, ErrIncompleteResponse
	}
	return messages, nil
}

// brokerConsumer

type brokerConsumer struct {
	consumer         *consumer
	broker           *Broker
	input            chan *partitionConsumer
	newSubscriptions chan []*partitionConsumer
	wait             chan none
	subscriptions    map[*partitionConsumer]none
	acks             sync.WaitGroup
	refs             int
}

func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
	bc := &brokerConsumer{
		consumer:         c,
		broker:           broker,
		input:            make(chan *partitionConsumer),
		newSubscriptions: make(chan []*partitionConsumer),
		wait:             make(chan none),
		subscriptions:    make(map[*partitionConsumer]none),
		refs:             0,
	}

	go withRecover(bc.subscriptionManager)
	go withRecover(bc.subscriptionConsumer)

	return bc
}

func (bc *brokerConsumer) subscriptionManager() {
	var buffer []*partitionConsumer

	// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
	// goroutine is in the middle of a network request) and batches them up. The main worker goroutine picks
	// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
	// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions are available,
	// so the main goroutine can block waiting for work if it has none.
	for {
		if len(buffer) > 0 {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- buffer:
				buffer = nil
			case bc.wait <- none{}:
			}
		} else {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- nil:
			}
		}
	}

done:
	close(bc.wait)
	if len(buffer) > 0 {
		bc.newSubscriptions <- buffer
	}
	close(bc.newSubscriptions)
}

func (bc *brokerConsumer) subscriptionConsumer() {
	<-bc.wait // wait for our first piece of work

	// the subscriptionManager guarantees that newSubscriptions yields nil right away if no new subscriptions are available
	for newSubscriptions := range bc.newSubscriptions {
		bc.updateSubscriptions(newSubscriptions)

		if len(bc.subscriptions) == 0 {
			// We're about to be shut down or we're about to receive more subscriptions.
			// Either way, the signal just hasn't propagated to our goroutine yet.
			<-bc.wait
			continue
		}

		response, err := bc.fetchNewMessages()

		if err != nil {
			Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
			bc.abort(err)
			return
		}

		bc.acks.Add(len(bc.subscriptions))
		for child := range bc.subscriptions {
			child.feeder <- response
		}
		bc.acks.Wait()
		bc.handleResponses()
	}
}

func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
	for _, child := range newSubscriptions {
		bc.subscriptions[child] = none{}
		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
	}

	for child := range bc.subscriptions {
		select {
		case <-child.dying:
			Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
			close(child.trigger)
			delete(bc.subscriptions, child)
		default:
			break
		}
	}
}

func (bc *brokerConsumer) handleResponses() {
	// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
	for child := range bc.subscriptions {
		result := child.responseResult
		child.responseResult = nil

		switch result {
		case nil:
			break
		case errTimedOut:
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
				bc.broker.ID(), child.topic, child.partition)
			delete(bc.subscriptions, child)
		case ErrOffsetOutOfRange:
			// there's no point in retrying this; it will just fail the same way again,
			// so shut it down and force the user to choose what to do
			child.sendError(result)
			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
			close(child.trigger)
			delete(bc.subscriptions, child)
		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable:
			// not an error, but does need redispatching
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		default:
			// unknown error; tell the user and try redispatching
			child.sendError(result)
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		}
	}
}

func (bc *brokerConsumer) abort(err error) {
	bc.consumer.abandonBrokerConsumer(bc)
	_ = bc.broker.Close() // we don't care about the error this might return, we already have one

	for child := range bc.subscriptions {
		child.sendError(err)
		child.trigger <- none{}
	}

	for newSubscriptions := range bc.newSubscriptions {
		if len(newSubscriptions) == 0 {
			<-bc.wait
			continue
		}
		for _, child := range newSubscriptions {
			child.sendError(err)
			child.trigger <- none{}
		}
	}
}

func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
	request := &FetchRequest{
		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
		request.Version = 2
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 3
		request.MaxBytes = MaxResponseSize
	}

	for child := range bc.subscriptions {
		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
	}

	return bc.broker.Fetch(request)
}