1package sarama 2 3import ( 4 "errors" 5 "fmt" 6 "sync" 7 "sync/atomic" 8 "time" 9) 10 11// ConsumerMessage encapsulates a Kafka message returned by the consumer. 12type ConsumerMessage struct { 13 Key, Value []byte 14 Topic string 15 Partition int32 16 Offset int64 17 Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp 18 BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp 19 Headers []*RecordHeader // only set if kafka is version 0.11+ 20} 21 22// ConsumerError is what is provided to the user when an error occurs. 23// It wraps an error and includes the topic and partition. 24type ConsumerError struct { 25 Topic string 26 Partition int32 27 Err error 28} 29 30func (ce ConsumerError) Error() string { 31 return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err) 32} 33 34// ConsumerErrors is a type that wraps a batch of errors and implements the Error interface. 35// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors 36// when stopping. 37type ConsumerErrors []*ConsumerError 38 39func (ce ConsumerErrors) Error() string { 40 return fmt.Sprintf("kafka: %d errors while consuming", len(ce)) 41} 42 43// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close() 44// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of 45// scope. 46// 47// Sarama's Consumer type does not currently support automatic consumer-group rebalancing and offset tracking. 48// For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka library 49// builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9 and later), the 50// https://github.com/bsm/sarama-cluster library builds on Sarama to add this support. 
51type Consumer interface { 52 53 // Topics returns the set of available topics as retrieved from the cluster 54 // metadata. This method is the same as Client.Topics(), and is provided for 55 // convenience. 56 Topics() ([]string, error) 57 58 // Partitions returns the sorted list of all partition IDs for the given topic. 59 // This method is the same as Client.Partitions(), and is provided for convenience. 60 Partitions(topic string) ([]int32, error) 61 62 // ConsumePartition creates a PartitionConsumer on the given topic/partition with 63 // the given offset. It will return an error if this Consumer is already consuming 64 // on the given topic/partition. Offset can be a literal offset, or OffsetNewest 65 // or OffsetOldest 66 ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) 67 68 // HighWaterMarks returns the current high water marks for each topic and partition. 69 // Consistency between partitions is not guaranteed since high water marks are updated separately. 70 HighWaterMarks() map[string]map[int32]int64 71 72 // Close shuts down the consumer. It must be called after all child 73 // PartitionConsumers have already been closed. 74 Close() error 75} 76 77type consumer struct { 78 client Client 79 conf *Config 80 ownClient bool 81 82 lock sync.Mutex 83 children map[string]map[int32]*partitionConsumer 84 brokerConsumers map[*Broker]*brokerConsumer 85} 86 87// NewConsumer creates a new consumer using the given broker addresses and configuration. 88func NewConsumer(addrs []string, config *Config) (Consumer, error) { 89 client, err := NewClient(addrs, config) 90 if err != nil { 91 return nil, err 92 } 93 94 c, err := NewConsumerFromClient(client) 95 if err != nil { 96 return nil, err 97 } 98 c.(*consumer).ownClient = true 99 return c, nil 100} 101 102// NewConsumerFromClient creates a new consumer using the given client. It is still 103// necessary to call Close() on the underlying client when shutting down this consumer. 
104func NewConsumerFromClient(client Client) (Consumer, error) { 105 // Check that we are not dealing with a closed Client before processing any other arguments 106 if client.Closed() { 107 return nil, ErrClosedClient 108 } 109 110 c := &consumer{ 111 client: client, 112 conf: client.Config(), 113 children: make(map[string]map[int32]*partitionConsumer), 114 brokerConsumers: make(map[*Broker]*brokerConsumer), 115 } 116 117 return c, nil 118} 119 120func (c *consumer) Close() error { 121 if c.ownClient { 122 return c.client.Close() 123 } 124 return nil 125} 126 127func (c *consumer) Topics() ([]string, error) { 128 return c.client.Topics() 129} 130 131func (c *consumer) Partitions(topic string) ([]int32, error) { 132 return c.client.Partitions(topic) 133} 134 135func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) { 136 child := &partitionConsumer{ 137 consumer: c, 138 conf: c.conf, 139 topic: topic, 140 partition: partition, 141 messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize), 142 errors: make(chan *ConsumerError, c.conf.ChannelBufferSize), 143 feeder: make(chan *FetchResponse, 1), 144 trigger: make(chan none, 1), 145 dying: make(chan none), 146 fetchSize: c.conf.Consumer.Fetch.Default, 147 } 148 149 if err := child.chooseStartingOffset(offset); err != nil { 150 return nil, err 151 } 152 153 var leader *Broker 154 var err error 155 if leader, err = c.client.Leader(child.topic, child.partition); err != nil { 156 return nil, err 157 } 158 159 if err := c.addChild(child); err != nil { 160 return nil, err 161 } 162 163 go withRecover(child.dispatcher) 164 go withRecover(child.responseFeeder) 165 166 child.broker = c.refBrokerConsumer(leader) 167 child.broker.input <- child 168 169 return child, nil 170} 171 172func (c *consumer) HighWaterMarks() map[string]map[int32]int64 { 173 c.lock.Lock() 174 defer c.lock.Unlock() 175 176 hwms := make(map[string]map[int32]int64) 177 for topic, p := range 
c.children { 178 hwm := make(map[int32]int64, len(p)) 179 for partition, pc := range p { 180 hwm[partition] = pc.HighWaterMarkOffset() 181 } 182 hwms[topic] = hwm 183 } 184 185 return hwms 186} 187 188func (c *consumer) addChild(child *partitionConsumer) error { 189 c.lock.Lock() 190 defer c.lock.Unlock() 191 192 topicChildren := c.children[child.topic] 193 if topicChildren == nil { 194 topicChildren = make(map[int32]*partitionConsumer) 195 c.children[child.topic] = topicChildren 196 } 197 198 if topicChildren[child.partition] != nil { 199 return ConfigurationError("That topic/partition is already being consumed") 200 } 201 202 topicChildren[child.partition] = child 203 return nil 204} 205 206func (c *consumer) removeChild(child *partitionConsumer) { 207 c.lock.Lock() 208 defer c.lock.Unlock() 209 210 delete(c.children[child.topic], child.partition) 211} 212 213func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer { 214 c.lock.Lock() 215 defer c.lock.Unlock() 216 217 bc := c.brokerConsumers[broker] 218 if bc == nil { 219 bc = c.newBrokerConsumer(broker) 220 c.brokerConsumers[broker] = bc 221 } 222 223 bc.refs++ 224 225 return bc 226} 227 228func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) { 229 c.lock.Lock() 230 defer c.lock.Unlock() 231 232 brokerWorker.refs-- 233 234 if brokerWorker.refs == 0 { 235 close(brokerWorker.input) 236 if c.brokerConsumers[brokerWorker.broker] == brokerWorker { 237 delete(c.brokerConsumers, brokerWorker.broker) 238 } 239 } 240} 241 242func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) { 243 c.lock.Lock() 244 defer c.lock.Unlock() 245 246 delete(c.brokerConsumers, brokerWorker.broker) 247} 248 249// PartitionConsumer 250 251// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or 252// AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out 253// of scope. 
//
// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported
// as out of range by the brokers. In this case you should decide what you want to do (try a different offset,
// notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying.
// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set
// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement
// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
//
// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
type PartitionConsumer interface {

	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
	// function, or Close before a consumer object passes out of scope, as it will otherwise leak memory. You must call
	// this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
	// the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
	// the Messages channel when this function is called, you will be competing with Close for messages; consider
	// calling AsyncClose, instead. It is required to call this function (or AsyncClose) before a consumer object passes
	// out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
	Close() error

	// Messages returns the read channel for the messages that are returned by
	// the broker.
	Messages() <-chan *ConsumerMessage

	// Errors returns a read channel of errors that occurred during consuming, if
	// enabled. By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64
}

// partitionConsumer is the package's implementation of PartitionConsumer. Each
// one is served by two goroutines started in ConsumePartition: dispatcher and
// responseFeeder.
type partitionConsumer struct {
	highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	consumer            *consumer
	conf                *Config
	topic               string
	partition           int32

	broker   *brokerConsumer       // worker currently serving this partition; reset to nil by the dispatcher before re-dispatching
	messages chan *ConsumerMessage // returned by Messages(); closed by responseFeeder
	errors   chan *ConsumerError   // returned by Errors(); closed by responseFeeder
	feeder   chan *FetchResponse   // written by the brokerConsumer, read by responseFeeder; closed by the dispatcher

	// trigger wakes the dispatcher to look for a new broker and is closed to make
	// the dispatcher exit. dying is closed by AsyncClose.
	trigger, dying chan none
	responseResult error     // outcome of the last response; set by responseFeeder, read and reset by handleResponses
	closeOnce      sync.Once // makes AsyncClose close dying only once

	fetchSize int32 // size passed to FetchRequest.AddBlock; doubled in parseResponse on a partial trailing message
	offset    int64 // offset requested in the next fetch
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing

// sendError wraps err in a ConsumerError carrying this child's topic and
// partition. It is sent on the errors channel when Consumer.Return.Errors is
// set, and logged through Logger otherwise.
func (child *partitionConsumer) sendError(err error) {
	cErr := &ConsumerError{
		Topic:     child.topic,
		Partition: child.partition,
		Err:       err,
	}

	if child.conf.Consumer.Return.Errors {
		child.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

// dispatcher runs in its own goroutine for the lifetime of the child. Every
// value received on trigger makes it wait Consumer.Retry.Backoff, release the
// current broker and look for a new one via dispatch. Once trigger is closed
// it releases the broker, deregisters the child from the consumer and closes
// feeder.
func (child *partitionConsumer) dispatcher() {
	for range child.trigger {
		select {
		case <-child.dying:
			// closing trigger is what terminates the enclosing range loop
			close(child.trigger)
		case <-time.After(child.conf.Consumer.Retry.Backoff):
			if child.broker != nil {
				child.consumer.unrefBrokerConsumer(child.broker)
				child.broker = nil
			}

			Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
			if err := child.dispatch(); err != nil {
				child.sendError(err)
				// re-arm ourselves so another attempt is made after the next backoff
				child.trigger <- none{}
			}
		}
	}

	if child.broker != nil {
		child.consumer.unrefBrokerConsumer(child.broker)
	}
	child.consumer.removeChild(child)
	close(child.feeder)
}

// dispatch refreshes the metadata for the child's topic, looks up the
// partition leader, takes a reference on that broker's brokerConsumer and
// hands the child to it. It returns the first error from the metadata refresh
// or the leader lookup.
func (child *partitionConsumer) dispatch() error {
	if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
		return err
	}

	var leader *Broker
	var err error
	if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
		return err
	}

	child.broker = child.consumer.refBrokerConsumer(leader)

	child.broker.input <- child

	return nil
}

// chooseStartingOffset sets child.offset from the requested offset.
// OffsetNewest and OffsetOldest resolve to the offsets the client currently
// reports. A literal offset is accepted when it lies within [oldest, newest];
// anything else yields ErrOffsetOutOfRange. Errors from the two offset lookups
// are returned unchanged.
func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
	newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
	if err != nil {
		return err
	}
	oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
	if err != nil {
		return err
	}

	switch {
	case offset == OffsetNewest:
		child.offset = newestOffset
	case offset == OffsetOldest:
		child.offset = oldestOffset
	case offset >= oldestOffset && offset <= newestOffset:
		child.offset = offset
	default:
		return ErrOffsetOutOfRange
	}

	return nil
}

// Messages returns the receive side of the child's messages channel.
func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
	return child.messages
}

// Errors returns the receive side of the child's errors channel.
func (child *partitionConsumer) Errors() <-chan *ConsumerError {
	return child.errors
}

// AsyncClose closes the dying channel. It is safe to call more than once; only
// the first call has any effect.
func (child *partitionConsumer) AsyncClose() {
	// this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'feeder' channel,
	// which in turn makes the responseFeeder close the 'messages' and 'errors' channels (alternatively, if
	// the child is already at the dispatcher for some reason, that will also just close itself)
	child.closeOnce.Do(func() {
		close(child.dying)
	})
}

// Close initiates shutdown via AsyncClose, drains the messages channel in a
// background goroutine, and blocks until the errors channel is closed. It
// returns the collected errors as a ConsumerErrors, or nil if there were none.
func (child *partitionConsumer) Close() error {
	child.AsyncClose()

	go withRecover(func() {
		for range child.messages {
			// drain
		}
	})

	// NOTE: this local shadows the imported errors package for the rest of the function.
	var errors ConsumerErrors
	for err := range child.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

// HighWaterMarkOffset atomically loads the last high water mark stored by
// parseResponse.
func (child *partitionConsumer) HighWaterMarkOffset() int64 {
	return atomic.LoadInt64(&child.highWaterMarkOffset)
}

// responseFeeder runs in its own goroutine. For each FetchResponse received on
// feeder it parses the response, delivers the messages on the messages
// channel, and then signals the brokerConsumer through acks.Done. When feeder
// is closed it stops the ticker and closes the messages and errors channels.
//
// The ticker fires every Consumer.MaxProcessingTime. One tick while waiting to
// deliver a message is tolerated; a second tick before that same message is
// delivered marks the response as errTimedOut.
func (child *partitionConsumer) responseFeeder() {
	var msgs []*ConsumerMessage
	expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
	firstAttempt := true

feederLoop:
	for response := range child.feeder {
		msgs, child.responseResult = child.parseResponse(response)

		for i, msg := range msgs {
		messageSelect:
			select {
			case child.messages <- msg:
				firstAttempt = true
			case <-expiryTicker.C:
				if !firstAttempt {
					// Second tick for the same message: report the timeout and release
					// the broker first, then deliver the rest with blocking sends, and
					// finally ask the broker to take this child back.
					child.responseResult = errTimedOut
					child.broker.acks.Done()
					for _, msg = range msgs[i:] {
						child.messages <- msg
					}
					child.broker.input <- child
					continue feederLoop
				} else {
					// current message has not been sent, return to select
					// statement
					firstAttempt = false
					goto messageSelect
				}
			}
		}

		child.broker.acks.Done()
	}

	expiryTicker.Stop()
	close(child.messages)
	close(child.errors)
}

// parseMessages converts a legacy message set into ConsumerMessages, skipping
// any message whose offset is below child.offset and advancing child.offset
// past each message it keeps. It returns ErrIncompleteResponse when no message
// is kept.
func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
	var messages []*ConsumerMessage
	for _, msgBlock := range msgSet.Messages {
		for _, msg := range msgBlock.Messages() {
			offset := msg.Offset
			if msg.Msg.Version >= 1 {
				// shift the inner offset by the gap between the block's offset and the
				// offset of the last inner message
				// NOTE(review): presumably inner offsets are relative in v1+ message sets — confirm
				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
				offset += baseOffset
			}
			if offset < child.offset {
				continue
			}
			messages = append(messages, &ConsumerMessage{
				Topic:          child.topic,
				Partition:      child.partition,
				Key:            msg.Msg.Key,
				Value:          msg.Msg.Value,
				Offset:         offset,
				Timestamp:      msg.Msg.Timestamp,
				BlockTimestamp: msgBlock.Msg.Timestamp,
			})
			child.offset = offset + 1
		}
	}
	if len(messages) == 0 {
		return nil, ErrIncompleteResponse
	}
	return messages, nil
}

// parseRecords converts a record batch into ConsumerMessages, skipping any
// record whose offset is below child.offset and advancing child.offset past
// each record it keeps. The returned error is always nil.
func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
	var messages []*ConsumerMessage
	for _, rec := range batch.Records {
		offset := batch.FirstOffset + rec.OffsetDelta
		if offset < child.offset {
			continue
		}
		messages = append(messages, &ConsumerMessage{
			Topic:     child.topic,
			Partition: child.partition,
			Key:       rec.Key,
			Value:     rec.Value,
			Offset:    offset,
			Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
			Headers:   rec.Headers,
		})
		child.offset = offset + 1
	}
	if len(messages) == 0 {
		// nothing was kept from this batch; step forward by one anyway
		// NOTE(review): presumably so the next fetch does not ask for the same offset again — confirm
		child.offset += 1
	}
	return messages, nil
}

// parseResponse extracts this child's messages from a FetchResponse.
//
// It returns ErrIncompleteResponse when the response has no block for this
// topic/partition, the block's own error when that is not ErrNoError, and any
// error from counting or parsing the records. A block with zero records yields
// (nil, nil); if that block holds a partial trailing message the fetch size is
// doubled (capped at Consumer.Fetch.Max when that is positive), or, when the
// cap has already been reached, ErrMessageTooLarge is reported and the offset
// is advanced by one. Otherwise the fetch size is reset to its default, the
// high water mark is stored, and every record set is parsed in turn.
func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
	block := response.GetBlock(child.topic, child.partition)
	if block == nil {
		return nil, ErrIncompleteResponse
	}

	if block.Err != ErrNoError {
		return nil, block.Err
	}

	nRecs, err := block.numRecords()
	if err != nil {
		return nil, err
	}
	if nRecs == 0 {
		partialTrailingMessage, err := block.isPartial()
		if err != nil {
			return nil, err
		}
		// We got no messages. If we got a trailing one then we need to ask for more data.
		// Otherwise we just poll again and wait for one to be produced...
		if partialTrailingMessage {
			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
				// we can't ask for more data, we've hit the configured limit
				child.sendError(ErrMessageTooLarge)
				child.offset++ // skip this one so we can keep processing future messages
			} else {
				child.fetchSize *= 2
				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
					child.fetchSize = child.conf.Consumer.Fetch.Max
				}
			}
		}

		return nil, nil
	}

	// we got messages, reset our fetch size in case it was increased for a previous request
	child.fetchSize = child.conf.Consumer.Fetch.Default
	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

	messages := []*ConsumerMessage{}
	for _, records := range block.RecordsSet {
		switch records.recordsType {
		case legacyRecords:
			messageSetMessages, err := child.parseMessages(records.MsgSet)
			if err != nil {
				return nil, err
			}

			messages = append(messages, messageSetMessages...)
		case defaultRecords:
			// The batch is parsed (which advances child.offset) before the control
			// check, so a skipped batch still moves the offset forward.
			recordBatchMessages, err := child.parseRecords(records.RecordBatch)
			if err != nil {
				return nil, err
			}
			// The batch is dropped both when it is a control batch and when
			// isControl itself fails; that error is not propagated.
			if control, err := records.isControl(); err != nil || control {
				continue
			}

			messages = append(messages, recordBatchMessages...)
		default:
			return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
		}
	}

	return messages, nil
}

// brokerConsumer

// brokerConsumer fetches from one broker on behalf of every partitionConsumer
// subscribed to it. It is served by two goroutines started in
// newBrokerConsumer: subscriptionManager and subscriptionConsumer.
type brokerConsumer struct {
	consumer         *consumer
	broker           *Broker
	input            chan *partitionConsumer     // partition consumers asking to subscribe; closed when refs drops to zero
	newSubscriptions chan []*partitionConsumer   // batches handed from subscriptionManager to subscriptionConsumer
	wait             chan none                   // lets subscriptionConsumer block until subscriptionManager has buffered work
	subscriptions    map[*partitionConsumer]none // current subscribers; only touched from subscriptionConsumer and its callees
	acks             sync.WaitGroup              // counts subscribers that have not yet finished with the current response
	refs             int                         // reference count, changed in ref/unrefBrokerConsumer under consumer.lock
}

// newBrokerConsumer builds a brokerConsumer for broker and starts its two
// goroutines. The reference count starts at zero; refBrokerConsumer increments it.
func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
	bc := &brokerConsumer{
		consumer:         c,
		broker:           broker,
		input:            make(chan *partitionConsumer),
		newSubscriptions: make(chan []*partitionConsumer),
		wait:             make(chan none),
		subscriptions:    make(map[*partitionConsumer]none),
		refs:             0,
	}

	go withRecover(bc.subscriptionManager)
	go withRecover(bc.subscriptionConsumer)

	return bc
}

// subscriptionManager runs in its own goroutine. It buffers partition
// consumers arriving on input and hands them to subscriptionConsumer in
// batches. When input is closed it closes wait, flushes any buffered batch,
// and closes newSubscriptions.
func (bc *brokerConsumer) subscriptionManager() {
	var buffer []*partitionConsumer

	// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
	// goroutine is in the middle of a network request) and batches them up. The main worker goroutine picks
	// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
	// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions are available,
	// so the main goroutine can block waiting for work if it has none.
	for {
		if len(buffer) > 0 {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- buffer:
				buffer = nil
			case bc.wait <- none{}:
			}
		} else {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- nil:
			}
		}
	}

done:
	close(bc.wait)
	if len(buffer) > 0 {
		bc.newSubscriptions <- buffer
	}
	close(bc.newSubscriptions)
}

// subscriptionConsumer runs in its own goroutine and is the fetch loop for
// this broker. On each iteration it merges new subscriptions, issues one fetch
// covering every subscriber, hands the response to each subscriber's feeder,
// waits for all of them to acknowledge, and then acts on their results. A
// fetch error aborts the brokerConsumer and ends the goroutine.
func (bc *brokerConsumer) subscriptionConsumer() {
	<-bc.wait // wait for our first piece of work

	// the subscriptionManager ensures we will get nil right away if no new subscriptions are available
	for newSubscriptions := range bc.newSubscriptions {
		bc.updateSubscriptions(newSubscriptions)

		if len(bc.subscriptions) == 0 {
			// We're about to be shut down or we're about to receive more subscriptions.
			// Either way, the signal just hasn't propagated to our goroutine yet.
			<-bc.wait
			continue
		}

		response, err := bc.fetchNewMessages()

		if err != nil {
			Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
			bc.abort(err)
			return
		}

		// every subscriber calls acks.Done exactly once per response, from responseFeeder
		bc.acks.Add(len(bc.subscriptions))
		for child := range bc.subscriptions {
			child.feeder <- response
		}
		bc.acks.Wait()
		bc.handleResponses()
	}
}

// updateSubscriptions adds newSubscriptions to the subscription set, then
// removes every subscriber whose dying channel has been closed, closing that
// subscriber's trigger channel so its dispatcher exits.
func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
	for _, child := range newSubscriptions {
		bc.subscriptions[child] = none{}
		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
	}

	for child := range bc.subscriptions {
		select {
		case <-child.dying:
			Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
			close(child.trigger)
			delete(bc.subscriptions, child)
		default:
			// still alive: the default case keeps this check non-blocking
			break
		}
	}
}

// handleResponses reads and clears each subscriber's responseResult and acts
// on it: nil keeps the subscription; errTimedOut drops it (the responseFeeder
// re-subscribes the child itself); ErrOffsetOutOfRange reports the error and
// shuts the child down by closing its trigger; every other result drops the
// subscription and triggers a re-dispatch, reporting the error to the user
// unless it is one of the leadership/availability codes listed below.
func (bc *brokerConsumer) handleResponses() {
	// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
	for child := range bc.subscriptions {
		result := child.responseResult
		child.responseResult = nil

		switch result {
		case nil:
			break
		case errTimedOut:
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
				bc.broker.ID(), child.topic, child.partition)
			delete(bc.subscriptions, child)
		case ErrOffsetOutOfRange:
			// there's no point in retrying this it will just fail the same way again
			// shut it down and force the user to choose what to do
			child.sendError(result)
			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
			close(child.trigger)
			delete(bc.subscriptions, child)
		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable:
			// not an error, but does need redispatching
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		default:
			// dunno, tell the user and try redispatching
			child.sendError(result)
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		}
	}
}

// abort is called after a failed fetch. It removes this brokerConsumer from
// the consumer's registry, closes the broker connection, and reports err to
// every current subscriber while triggering its re-dispatch. It then keeps
// reading newSubscriptions until that channel is closed, giving late arrivals
// the same treatment; on an empty batch it blocks on wait before reading again.
func (bc *brokerConsumer) abort(err error) {
	bc.consumer.abandonBrokerConsumer(bc)
	_ = bc.broker.Close() // we don't care about the error this might return, we already have one

	for child := range bc.subscriptions {
		child.sendError(err)
		child.trigger <- none{}
	}

	for newSubscriptions := range bc.newSubscriptions {
		if len(newSubscriptions) == 0 {
			<-bc.wait
			continue
		}
		for _, child := range newSubscriptions {
			child.sendError(err)
			child.trigger <- none{}
		}
	}
}

// fetchNewMessages builds one FetchRequest with a block for every current
// subscriber (at that subscriber's offset and fetch size) and sends it to the
// broker. The request version is raised according to the configured Kafka
// version: 2 from 0.10.0, 3 from 0.10.1 (which also sets MaxBytes), and 4 from
// 0.11.0 (which also sets the isolation level).
func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
	request := &FetchRequest{
		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
		request.Version = 2
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 3
		request.MaxBytes = MaxResponseSize
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
		request.Version = 4
		request.Isolation = ReadUncommitted // Transactions are not supported yet.
	}

	for child := range bc.subscriptions {
		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
	}

	return bc.broker.Fetch(request)
}