1package sarama 2 3import ( 4 "errors" 5 "fmt" 6 "sync" 7 "sync/atomic" 8 "time" 9) 10 11// ConsumerMessage encapsulates a Kafka message returned by the consumer. 12type ConsumerMessage struct { 13 Key, Value []byte 14 Topic string 15 Partition int32 16 Offset int64 17 Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp 18 BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp 19} 20 21// ConsumerError is what is provided to the user when an error occurs. 22// It wraps an error and includes the topic and partition. 23type ConsumerError struct { 24 Topic string 25 Partition int32 26 Err error 27} 28 29func (ce ConsumerError) Error() string { 30 return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err) 31} 32 33// ConsumerErrors is a type that wraps a batch of errors and implements the Error interface. 34// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors 35// when stopping. 36type ConsumerErrors []*ConsumerError 37 38func (ce ConsumerErrors) Error() string { 39 return fmt.Sprintf("kafka: %d errors while consuming", len(ce)) 40} 41 42// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close() 43// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of 44// scope. 45// 46// Sarama's Consumer type does not currently support automatic consumer-group rebalancing and offset tracking. 47// For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka library 48// builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9 and later), the 49// https://github.com/bsm/sarama-cluster library builds on Sarama to add this support. 50type Consumer interface { 51 52 // Topics returns the set of available topics as retrieved from the cluster 53 // metadata. This method is the same as Client.Topics(), and is provided for 54 // convenience. 55 Topics() ([]string, error) 56 57 // Partitions returns the sorted list of all partition IDs for the given topic. 58 // This method is the same as Client.Partitions(), and is provided for convenience. 59 Partitions(topic string) ([]int32, error) 60 61 // ConsumePartition creates a PartitionConsumer on the given topic/partition with 62 // the given offset. It will return an error if this Consumer is already consuming 63 // on the given topic/partition. Offset can be a literal offset, or OffsetNewest 64 // or OffsetOldest 65 ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) 66 67 // HighWaterMarks returns the current high water marks for each topic and partition. 68 // Consistency between partitions is not guaranteed since high water marks are updated separately. 69 HighWaterMarks() map[string]map[int32]int64 70 71 // Close shuts down the consumer. It must be called after all child 72 // PartitionConsumers have already been closed. 73 Close() error 74} 75 76type consumer struct { 77 client Client 78 conf *Config 79 ownClient bool 80 81 lock sync.Mutex 82 children map[string]map[int32]*partitionConsumer 83 brokerConsumers map[*Broker]*brokerConsumer 84} 85 86// NewConsumer creates a new consumer using the given broker addresses and configuration. 87func NewConsumer(addrs []string, config *Config) (Consumer, error) { 88 client, err := NewClient(addrs, config) 89 if err != nil { 90 return nil, err 91 } 92 93 c, err := NewConsumerFromClient(client) 94 if err != nil { 95 return nil, err 96 } 97 c.(*consumer).ownClient = true 98 return c, nil 99} 100 101// NewConsumerFromClient creates a new consumer using the given client. It is still 102// necessary to call Close() on the underlying client when shutting down this consumer. 103func NewConsumerFromClient(client Client) (Consumer, error) { 104 // Check that we are not dealing with a closed Client before processing any other arguments 105 if client.Closed() { 106 return nil, ErrClosedClient 107 } 108 109 c := &consumer{ 110 client: client, 111 conf: client.Config(), 112 children: make(map[string]map[int32]*partitionConsumer), 113 brokerConsumers: make(map[*Broker]*brokerConsumer), 114 } 115 116 return c, nil 117} 118 119func (c *consumer) Close() error { 120 if c.ownClient { 121 return c.client.Close() 122 } 123 return nil 124} 125 126func (c *consumer) Topics() ([]string, error) { 127 return c.client.Topics() 128} 129 130func (c *consumer) Partitions(topic string) ([]int32, error) { 131 return c.client.Partitions(topic) 132} 133 134func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) { 135 child := &partitionConsumer{ 136 consumer: c, 137 conf: c.conf, 138 topic: topic, 139 partition: partition, 140 messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize), 141 errors: make(chan *ConsumerError, c.conf.ChannelBufferSize), 142 feeder: make(chan *FetchResponse, 1), 143 trigger: make(chan none, 1), 144 dying: make(chan none), 145 fetchSize: c.conf.Consumer.Fetch.Default, 146 } 147 148 if err := child.chooseStartingOffset(offset); err != nil { 149 return nil, err 150 } 151 152 var leader *Broker 153 var err error 154 if leader, err = c.client.Leader(child.topic, child.partition); err != nil { 155 return nil, err 156 } 157 158 if err := c.addChild(child); err != nil { 159 return nil, err 160 } 161 162 go withRecover(child.dispatcher) 163 go withRecover(child.responseFeeder) 164 165 child.broker = c.refBrokerConsumer(leader) 166 child.broker.input <- child 167 168 return child, nil 169} 170 171func (c *consumer) HighWaterMarks() map[string]map[int32]int64 { 172 c.lock.Lock() 173 defer c.lock.Unlock() 174 175 hwms := make(map[string]map[int32]int64) 176 for topic, p := range c.children { 177 hwm := make(map[int32]int64, len(p)) 178 for partition, pc := range p { 179 hwm[partition] = pc.HighWaterMarkOffset() 180 } 181 hwms[topic] = hwm 182 } 183 184 return hwms 185} 186 187func (c *consumer) addChild(child *partitionConsumer) error { 188 c.lock.Lock() 189 defer c.lock.Unlock() 190 191 topicChildren := c.children[child.topic] 192 if topicChildren == nil { 193 topicChildren = make(map[int32]*partitionConsumer) 194 c.children[child.topic] = topicChildren 195 } 196 197 if topicChildren[child.partition] != nil { 198 return ConfigurationError("That topic/partition is already being consumed") 199 } 200 201 topicChildren[child.partition] = child 202 return nil 203} 204 205func (c *consumer) removeChild(child *partitionConsumer) { 206 c.lock.Lock() 207 defer c.lock.Unlock() 208 209 delete(c.children[child.topic], child.partition) 210} 211 212func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer { 213 c.lock.Lock() 214 defer c.lock.Unlock() 215 216 bc := c.brokerConsumers[broker] 217 if bc == nil { 218 bc = c.newBrokerConsumer(broker) 219 c.brokerConsumers[broker] = bc 220 } 221 222 bc.refs++ 223 224 return bc 225} 226 227func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) { 228 c.lock.Lock() 229 defer c.lock.Unlock() 230 231 brokerWorker.refs-- 232 233 if brokerWorker.refs == 0 { 234 close(brokerWorker.input) 235 if c.brokerConsumers[brokerWorker.broker] == brokerWorker { 236 delete(c.brokerConsumers, brokerWorker.broker) 237 } 238 } 239} 240 241func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) { 242 c.lock.Lock() 243 defer c.lock.Unlock() 244 245 delete(c.brokerConsumers, brokerWorker.broker) 246} 247 248// PartitionConsumer 249 250// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or 251// AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out 252// of scope. 253// 254// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range 255// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported 256// as out of range by the brokers. In this case you should decide what you want to do (try a different offset, 257// notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying. 258// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set 259// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement 260// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches. 261// 262// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of 263// consumer tear-down & return imediately. Continue to loop, servicing the Messages channel until the teardown process 264// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call 265// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will 266// also drain the Messages channel, harvest all errors & return them once cleanup has completed. 267type PartitionConsumer interface { 268 269 // AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you 270 // should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this 271 // function, or Close before a consumer object passes out of scope, as it will otherwise leak memory. You must call 272 // this before calling Close on the underlying client. 273 AsyncClose() 274 275 // Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain 276 // the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service 277 // the Messages channel when this function is called, you will be competing with Close for messages; consider 278 // calling AsyncClose, instead. It is required to call this function (or AsyncClose) before a consumer object passes 279 // out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client. 280 Close() error 281 282 // Messages returns the read channel for the messages that are returned by 283 // the broker. 284 Messages() <-chan *ConsumerMessage 285 286 // Errors returns a read channel of errors that occurred during consuming, if 287 // enabled. By default, errors are logged and not returned over this channel. 288 // If you want to implement any custom error handling, set your config's 289 // Consumer.Return.Errors setting to true, and read from this channel. 290 Errors() <-chan *ConsumerError 291 292 // HighWaterMarkOffset returns the high water mark offset of the partition, 293 // i.e. the offset that will be used for the next message that will be produced. 294 // You can use this to determine how far behind the processing is. 295 HighWaterMarkOffset() int64 296} 297 298type partitionConsumer struct { 299 highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG 300 consumer *consumer 301 conf *Config 302 topic string 303 partition int32 304 305 broker *brokerConsumer 306 messages chan *ConsumerMessage 307 errors chan *ConsumerError 308 feeder chan *FetchResponse 309 310 trigger, dying chan none 311 responseResult error 312 313 fetchSize int32 314 offset int64 315} 316 317var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing 318 319func (child *partitionConsumer) sendError(err error) { 320 cErr := &ConsumerError{ 321 Topic: child.topic, 322 Partition: child.partition, 323 Err: err, 324 } 325 326 if child.conf.Consumer.Return.Errors { 327 child.errors <- cErr 328 } else { 329 Logger.Println(cErr) 330 } 331} 332 333func (child *partitionConsumer) dispatcher() { 334 for range child.trigger { 335 select { 336 case <-child.dying: 337 close(child.trigger) 338 case <-time.After(child.conf.Consumer.Retry.Backoff): 339 if child.broker != nil { 340 child.consumer.unrefBrokerConsumer(child.broker) 341 child.broker = nil 342 } 343 344 Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition) 345 if err := child.dispatch(); err != nil { 346 child.sendError(err) 347 child.trigger <- none{} 348 } 349 } 350 } 351 352 if child.broker != nil { 353 child.consumer.unrefBrokerConsumer(child.broker) 354 } 355 child.consumer.removeChild(child) 356 close(child.feeder) 357} 358 359func (child *partitionConsumer) dispatch() error { 360 if err := child.consumer.client.RefreshMetadata(child.topic); err != nil { 361 return err 362 } 363 364 var leader *Broker 365 var err error 366 if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil { 367 return err 368 } 369 370 child.broker = child.consumer.refBrokerConsumer(leader) 371 372 child.broker.input <- child 373 374 return nil 375} 376 377func (child *partitionConsumer) chooseStartingOffset(offset int64) error { 378 newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest) 379 if err != nil { 380 return err 381 } 382 oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest) 383 if err != nil { 384 return err 385 } 386 387 switch { 388 case offset == OffsetNewest: 389 child.offset = newestOffset 390 case offset == OffsetOldest: 391 child.offset = oldestOffset 392 case offset >= oldestOffset && offset <= newestOffset: 393 child.offset = offset 394 default: 395 return ErrOffsetOutOfRange 396 } 397 398 return nil 399} 400 401func (child *partitionConsumer) Messages() <-chan *ConsumerMessage { 402 return child.messages 403} 404 405func (child *partitionConsumer) Errors() <-chan *ConsumerError { 406 return child.errors 407} 408 409func (child *partitionConsumer) AsyncClose() { 410 // this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes 411 // the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and 412 // 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will 413 // also just close itself) 414 close(child.dying) 415} 416 417func (child *partitionConsumer) Close() error { 418 child.AsyncClose() 419 420 go withRecover(func() { 421 for range child.messages { 422 // drain 423 } 424 }) 425 426 var errors ConsumerErrors 427 for err := range child.errors { 428 errors = append(errors, err) 429 } 430 431 if len(errors) > 0 { 432 return errors 433 } 434 return nil 435} 436 437func (child *partitionConsumer) HighWaterMarkOffset() int64 { 438 return atomic.LoadInt64(&child.highWaterMarkOffset) 439} 440 441func (child *partitionConsumer) responseFeeder() { 442 var msgs []*ConsumerMessage 443 msgSent := false 444 445feederLoop: 446 for response := range child.feeder { 447 msgs, child.responseResult = child.parseResponse(response) 448 expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime) 449 450 for i, msg := range msgs { 451 messageSelect: 452 select { 453 case child.messages <- msg: 454 msgSent = true 455 case <-expiryTicker.C: 456 if !msgSent { 457 child.responseResult = errTimedOut 458 child.broker.acks.Done() 459 for _, msg = range msgs[i:] { 460 child.messages <- msg 461 } 462 child.broker.input <- child 463 continue feederLoop 464 } else { 465 // current message has not been sent, return to select 466 // statement 467 msgSent = false 468 goto messageSelect 469 } 470 } 471 } 472 473 expiryTicker.Stop() 474 child.broker.acks.Done() 475 } 476 477 close(child.messages) 478 close(child.errors) 479} 480 481func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) { 482 block := response.GetBlock(child.topic, child.partition) 483 if block == nil { 484 return nil, ErrIncompleteResponse 485 } 486 487 if block.Err != ErrNoError { 488 return nil, block.Err 489 } 490 491 if len(block.MsgSet.Messages) == 0 { 492 // We got no messages. If we got a trailing one then we need to ask for more data. 493 // Otherwise we just poll again and wait for one to be produced... 494 if block.MsgSet.PartialTrailingMessage { 495 if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max { 496 // we can't ask for more data, we've hit the configured limit 497 child.sendError(ErrMessageTooLarge) 498 child.offset++ // skip this one so we can keep processing future messages 499 } else { 500 child.fetchSize *= 2 501 if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max { 502 child.fetchSize = child.conf.Consumer.Fetch.Max 503 } 504 } 505 } 506 507 return nil, nil 508 } 509 510 // we got messages, reset our fetch size in case it was increased for a previous request 511 child.fetchSize = child.conf.Consumer.Fetch.Default 512 atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset) 513 514 incomplete := false 515 prelude := true 516 var messages []*ConsumerMessage 517 for _, msgBlock := range block.MsgSet.Messages { 518 519 for _, msg := range msgBlock.Messages() { 520 offset := msg.Offset 521 if msg.Msg.Version >= 1 { 522 baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset 523 offset += baseOffset 524 } 525 if prelude && offset < child.offset { 526 continue 527 } 528 prelude = false 529 530 if offset >= child.offset { 531 messages = append(messages, &ConsumerMessage{ 532 Topic: child.topic, 533 Partition: child.partition, 534 Key: msg.Msg.Key, 535 Value: msg.Msg.Value, 536 Offset: offset, 537 Timestamp: msg.Msg.Timestamp, 538 BlockTimestamp: msgBlock.Msg.Timestamp, 539 }) 540 child.offset = offset + 1 541 } else { 542 incomplete = true 543 } 544 } 545 546 } 547 548 if incomplete || len(messages) == 0 { 549 return nil, ErrIncompleteResponse 550 } 551 return messages, nil 552} 553 554// brokerConsumer 555 556type brokerConsumer struct { 557 consumer *consumer 558 broker *Broker 559 input chan *partitionConsumer 560 newSubscriptions chan []*partitionConsumer 561 wait chan none 562 subscriptions map[*partitionConsumer]none 563 acks sync.WaitGroup 564 refs int 565} 566 567func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer { 568 bc := &brokerConsumer{ 569 consumer: c, 570 broker: broker, 571 input: make(chan *partitionConsumer), 572 newSubscriptions: make(chan []*partitionConsumer), 573 wait: make(chan none), 574 subscriptions: make(map[*partitionConsumer]none), 575 refs: 0, 576 } 577 578 go withRecover(bc.subscriptionManager) 579 go withRecover(bc.subscriptionConsumer) 580 581 return bc 582} 583 584func (bc *brokerConsumer) subscriptionManager() { 585 var buffer []*partitionConsumer 586 587 // The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer 588 // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks 589 // up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give 590 // it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available, 591 // so the main goroutine can block waiting for work if it has none. 592 for { 593 if len(buffer) > 0 { 594 select { 595 case event, ok := <-bc.input: 596 if !ok { 597 goto done 598 } 599 buffer = append(buffer, event) 600 case bc.newSubscriptions <- buffer: 601 buffer = nil 602 case bc.wait <- none{}: 603 } 604 } else { 605 select { 606 case event, ok := <-bc.input: 607 if !ok { 608 goto done 609 } 610 buffer = append(buffer, event) 611 case bc.newSubscriptions <- nil: 612 } 613 } 614 } 615 616done: 617 close(bc.wait) 618 if len(buffer) > 0 { 619 bc.newSubscriptions <- buffer 620 } 621 close(bc.newSubscriptions) 622} 623 624func (bc *brokerConsumer) subscriptionConsumer() { 625 <-bc.wait // wait for our first piece of work 626 627 // the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available 628 for newSubscriptions := range bc.newSubscriptions { 629 bc.updateSubscriptions(newSubscriptions) 630 631 if len(bc.subscriptions) == 0 { 632 // We're about to be shut down or we're about to receive more subscriptions. 633 // Either way, the signal just hasn't propagated to our goroutine yet. 634 <-bc.wait 635 continue 636 } 637 638 response, err := bc.fetchNewMessages() 639 640 if err != nil { 641 Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err) 642 bc.abort(err) 643 return 644 } 645 646 bc.acks.Add(len(bc.subscriptions)) 647 for child := range bc.subscriptions { 648 child.feeder <- response 649 } 650 bc.acks.Wait() 651 bc.handleResponses() 652 } 653} 654 655func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) { 656 for _, child := range newSubscriptions { 657 bc.subscriptions[child] = none{} 658 Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition) 659 } 660 661 for child := range bc.subscriptions { 662 select { 663 case <-child.dying: 664 Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition) 665 close(child.trigger) 666 delete(bc.subscriptions, child) 667 default: 668 break 669 } 670 } 671} 672 673func (bc *brokerConsumer) handleResponses() { 674 // handles the response codes left for us by our subscriptions, and abandons ones that have been closed 675 for child := range bc.subscriptions { 676 result := child.responseResult 677 child.responseResult = nil 678 679 switch result { 680 case nil: 681 break 682 case errTimedOut: 683 Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n", 684 bc.broker.ID(), child.topic, child.partition) 685 delete(bc.subscriptions, child) 686 case ErrOffsetOutOfRange: 687 // there's no point in retrying this it will just fail the same way again 688 // shut it down and force the user to choose what to do 689 child.sendError(result) 690 Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result) 691 close(child.trigger) 692 delete(bc.subscriptions, child) 693 case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable: 694 // not an error, but does need redispatching 695 Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", 696 bc.broker.ID(), child.topic, child.partition, result) 697 child.trigger <- none{} 698 delete(bc.subscriptions, child) 699 default: 700 // dunno, tell the user and try redispatching 701 child.sendError(result) 702 Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", 703 bc.broker.ID(), child.topic, child.partition, result) 704 child.trigger <- none{} 705 delete(bc.subscriptions, child) 706 } 707 } 708} 709 710func (bc *brokerConsumer) abort(err error) { 711 bc.consumer.abandonBrokerConsumer(bc) 712 _ = bc.broker.Close() // we don't care about the error this might return, we already have one 713 714 for child := range bc.subscriptions { 715 child.sendError(err) 716 child.trigger <- none{} 717 } 718 719 for newSubscriptions := range bc.newSubscriptions { 720 if len(newSubscriptions) == 0 { 721 <-bc.wait 722 continue 723 } 724 for _, child := range newSubscriptions { 725 child.sendError(err) 726 child.trigger <- none{} 727 } 728 } 729} 730 731func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) { 732 request := &FetchRequest{ 733 MinBytes: bc.consumer.conf.Consumer.Fetch.Min, 734 MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond), 735 } 736 if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) { 737 request.Version = 2 738 } 739 if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) { 740 request.Version = 3 741 request.MaxBytes = MaxResponseSize 742 } 743 744 for child := range bc.subscriptions { 745 request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize) 746 } 747 748 return bc.broker.Fetch(request) 749} 750