package sarama

import (
	"errors"
	"fmt"
	"math"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rcrowley/go-metrics"
)

// ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMessage struct {
	Headers        []*RecordHeader // only set if kafka is version 0.11+
	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp

	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
}

// ConsumerError is what is provided to the user when an error occurs.
// It wraps an error and includes the topic and partition.
type ConsumerError struct {
	Topic     string
	Partition int32
	Err       error
}

func (ce ConsumerError) Error() string {
	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
}

// ConsumerErrors is a type that wraps a batch of errors and implements the Error interface.
// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
// when stopping.
type ConsumerErrors []*ConsumerError

func (ce ConsumerErrors) Error() string {
	return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
}

// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
// on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of
// scope.
type Consumer interface {
	// Topics returns the set of available topics as retrieved from the cluster
	// metadata. This method is the same as Client.Topics(), and is provided for
	// convenience.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	// This method is the same as Client.Partitions(), and is provided for convenience.
	Partitions(topic string) ([]int32, error)

	// ConsumePartition creates a PartitionConsumer on the given topic/partition with
	// the given offset. It will return an error if this Consumer is already consuming
	// on the given topic/partition. Offset can be a literal offset, or OffsetNewest
	// or OffsetOldest.
	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)

	// HighWaterMarks returns the current high water marks for each topic and partition.
	// Consistency between partitions is not guaranteed since high water marks are updated separately.
	HighWaterMarks() map[string]map[int32]int64

	// Close shuts down the consumer. It must be called after all child
	// PartitionConsumers have already been closed.
	Close() error
}

type consumer struct {
	conf            *Config
	children        map[string]map[int32]*partitionConsumer
	brokerConsumers map[*Broker]*brokerConsumer
	client          Client
	lock            sync.Mutex
}

// NewConsumer creates a new consumer using the given broker addresses and configuration.
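//
// A minimal usage sketch (the broker address and topic name are placeholders;
// see the PartitionConsumer docs below for how to shut the loop down cleanly):
//
//	consumer, err := NewConsumer([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		panic(err)
//	}
//	defer consumer.Close()
//
//	pc, err := consumer.ConsumePartition("my-topic", 0, OffsetNewest)
//	if err != nil {
//		panic(err)
//	}
//
//	for msg := range pc.Messages() {
//		fmt.Printf("offset %d: %s\n", msg.Offset, msg.Value)
//	}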
func NewConsumer(addrs []string, config *Config) (Consumer, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}
	return newConsumer(client)
}

// NewConsumerFromClient creates a new consumer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
func NewConsumerFromClient(client Client) (Consumer, error) {
	// For a client passed in by the caller, ensure we don't
	// call Close() on it.
	cli := &nopCloserClient{client}
	return newConsumer(cli)
}

func newConsumer(client Client) (Consumer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	c := &consumer{
		client:          client,
		conf:            client.Config(),
		children:        make(map[string]map[int32]*partitionConsumer),
		brokerConsumers: make(map[*Broker]*brokerConsumer),
	}

	return c, nil
}

func (c *consumer) Close() error {
	return c.client.Close()
}

func (c *consumer) Topics() ([]string, error) {
	return c.client.Topics()
}

func (c *consumer) Partitions(topic string) ([]int32, error) {
	return c.client.Partitions(topic)
}

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
	child := &partitionConsumer{
		consumer:  c,
		conf:      c.conf,
		topic:     topic,
		partition: partition,
		messages:  make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
		errors:    make(chan *ConsumerError, c.conf.ChannelBufferSize),
		feeder:    make(chan *FetchResponse, 1),
		trigger:   make(chan none, 1),
		dying:     make(chan none),
		fetchSize: c.conf.Consumer.Fetch.Default,
	}

	if err := child.chooseStartingOffset(offset); err != nil {
		return nil, err
	}

	var leader *Broker
	var err error
	if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
		return nil, err
	}

	if err := c.addChild(child); err != nil {
		return nil, err
	}

	go withRecover(child.dispatcher)
	go withRecover(child.responseFeeder)

	child.broker = c.refBrokerConsumer(leader)
	child.broker.input <- child

	return child, nil
}

func (c *consumer) HighWaterMarks() map[string]map[int32]int64 {
	c.lock.Lock()
	defer c.lock.Unlock()

	hwms := make(map[string]map[int32]int64)
	for topic, p := range c.children {
		hwm := make(map[int32]int64, len(p))
		for partition, pc := range p {
			hwm[partition] = pc.HighWaterMarkOffset()
		}
		hwms[topic] = hwm
	}

	return hwms
}

func (c *consumer) addChild(child *partitionConsumer) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	topicChildren := c.children[child.topic]
	if topicChildren == nil {
		topicChildren = make(map[int32]*partitionConsumer)
		c.children[child.topic] = topicChildren
	}

	if topicChildren[child.partition] != nil {
		return ConfigurationError("That topic/partition is already being consumed")
	}

	topicChildren[child.partition] = child
	return nil
}

func (c *consumer) removeChild(child *partitionConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.children[child.topic], child.partition)
}

func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
	c.lock.Lock()
	defer c.lock.Unlock()

	bc := c.brokerConsumers[broker]
	if bc == nil {
		bc = c.newBrokerConsumer(broker)
		c.brokerConsumers[broker] = bc
	}

	bc.refs++

	return bc
}

func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	brokerWorker.refs--

	if brokerWorker.refs == 0 {
		close(brokerWorker.input)
		if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
			delete(c.brokerConsumers, brokerWorker.broker)
		}
	}
}

func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.brokerConsumers, brokerWorker.broker)
}

// PartitionConsumer

// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or
// AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out
// of scope.
//
// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported
// as out of range by the brokers. In this case you should decide what you want to do (try a different offset,
// notify a human, etc.) and handle it appropriately. For all other error cases, it will just keep retrying.
// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set
// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement
// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
//
// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
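//
// As a sketch, the AsyncClose pattern described above looks roughly like this
// (shutdown is an illustrative stand-in for your own stop signal, and process
// is a placeholder for your message handler):
//
//	go func() {
//		<-shutdown
//		pc.AsyncClose()
//	}()
//
//	for msg := range pc.Messages() { // terminates once teardown closes the channel
//		process(msg)
//	}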
type PartitionConsumer interface {
	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
	// function (or Close) before a consumer object passes out of scope, as it will otherwise leak memory. You must call
	// this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
	// the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
	// the Messages channel when this function is called, you will be competing with Close for messages; consider
	// calling AsyncClose instead. It is required to call this function (or AsyncClose) before a consumer object passes
	// out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
	Close() error

	// Messages returns the read channel for the messages that are returned by
	// the broker.
	Messages() <-chan *ConsumerMessage

	// Errors returns a read channel of errors that occurred during consuming, if
	// enabled. By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64
}
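
// As a sketch, the consumer lag for a partition can be estimated from inside the
// Messages loop by comparing the high water mark with the offset of the message
// just received (names are illustrative):
//
//	for msg := range pc.Messages() {
//		lag := pc.HighWaterMarkOffset() - msg.Offset - 1
//		fmt.Printf("offset %d, approximately %d messages behind\n", msg.Offset, lag)
//	}
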
type partitionConsumer struct {
	highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG

	consumer *consumer
	conf     *Config
	broker   *brokerConsumer
	messages chan *ConsumerMessage
	errors   chan *ConsumerError
	feeder   chan *FetchResponse

	trigger, dying chan none
	closeOnce      sync.Once
	topic          string
	partition      int32
	responseResult error
	fetchSize      int32
	offset         int64
	retries        int32
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing

func (child *partitionConsumer) sendError(err error) {
	cErr := &ConsumerError{
		Topic:     child.topic,
		Partition: child.partition,
		Err:       err,
	}

	if child.conf.Consumer.Return.Errors {
		child.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (child *partitionConsumer) computeBackoff() time.Duration {
	if child.conf.Consumer.Retry.BackoffFunc != nil {
		retries := atomic.AddInt32(&child.retries, 1)
		return child.conf.Consumer.Retry.BackoffFunc(int(retries))
	}
	return child.conf.Consumer.Retry.Backoff
}
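
// A sketch of supplying a custom retry backoff via the config, assuming a simple
// linear policy is wanted instead of the fixed Consumer.Retry.Backoff duration:
//
//	config := NewConfig()
//	config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
//		return time.Duration(retries) * 100 * time.Millisecond
//	}
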
func (child *partitionConsumer) dispatcher() {
	for range child.trigger {
		select {
		case <-child.dying:
			close(child.trigger)
		case <-time.After(child.computeBackoff()):
			if child.broker != nil {
				child.consumer.unrefBrokerConsumer(child.broker)
				child.broker = nil
			}

			Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
			if err := child.dispatch(); err != nil {
				child.sendError(err)
				child.trigger <- none{}
			}
		}
	}

	if child.broker != nil {
		child.consumer.unrefBrokerConsumer(child.broker)
	}
	child.consumer.removeChild(child)
	close(child.feeder)
}

func (child *partitionConsumer) dispatch() error {
	if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
		return err
	}

	var leader *Broker
	var err error
	if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
		return err
	}

	child.broker = child.consumer.refBrokerConsumer(leader)

	child.broker.input <- child

	return nil
}

func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
	newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
	if err != nil {
		return err
	}
	oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
	if err != nil {
		return err
	}

	switch {
	case offset == OffsetNewest:
		child.offset = newestOffset
	case offset == OffsetOldest:
		child.offset = oldestOffset
	case offset >= oldestOffset && offset <= newestOffset:
		child.offset = offset
	default:
		return ErrOffsetOutOfRange
	}

	return nil
}

func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
	return child.messages
}

func (child *partitionConsumer) Errors() <-chan *ConsumerError {
	return child.errors
}

func (child *partitionConsumer) AsyncClose() {
	// this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
	// 'errors' channels (alternatively, if the child is already at the dispatcher for some reason, that will
	// also just close itself)
	child.closeOnce.Do(func() {
		close(child.dying)
	})
}

func (child *partitionConsumer) Close() error {
	child.AsyncClose()

	var errors ConsumerErrors
	for err := range child.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (child *partitionConsumer) HighWaterMarkOffset() int64 {
	return atomic.LoadInt64(&child.highWaterMarkOffset)
}

func (child *partitionConsumer) responseFeeder() {
	var msgs []*ConsumerMessage
	expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
	firstAttempt := true

feederLoop:
	for response := range child.feeder {
		msgs, child.responseResult = child.parseResponse(response)

		if child.responseResult == nil {
			atomic.StoreInt32(&child.retries, 0)
		}

		for i, msg := range msgs {
		messageSelect:
			select {
			case <-child.dying:
				child.broker.acks.Done()
				continue feederLoop
			case child.messages <- msg:
				firstAttempt = true
			case <-expiryTicker.C:
				if !firstAttempt {
					child.responseResult = errTimedOut
					child.broker.acks.Done()
				remainingLoop:
					for _, msg = range msgs[i:] {
						select {
						case child.messages <- msg:
						case <-child.dying:
							break remainingLoop
						}
					}
					child.broker.input <- child
					continue feederLoop
				} else {
					// current message has not been sent, return to select
					// statement
					firstAttempt = false
					goto messageSelect
				}
			}
		}

		child.broker.acks.Done()
	}

	expiryTicker.Stop()
	close(child.messages)
	close(child.errors)
}
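
// responseFeeder's expiry ticker above is driven by Consumer.MaxProcessingTime.
// A sketch of raising it for slow message handlers (the duration is illustrative):
//
//	config := NewConfig()
//	config.Consumer.MaxProcessingTime = 500 * time.Millisecond
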
func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
	var messages []*ConsumerMessage
	for _, msgBlock := range msgSet.Messages {
		for _, msg := range msgBlock.Messages() {
			offset := msg.Offset
			timestamp := msg.Msg.Timestamp
			if msg.Msg.Version >= 1 {
				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
				offset += baseOffset
				if msg.Msg.LogAppendTime {
					timestamp = msgBlock.Msg.Timestamp
				}
			}
			if offset < child.offset {
				continue
			}
			messages = append(messages, &ConsumerMessage{
				Topic:          child.topic,
				Partition:      child.partition,
				Key:            msg.Msg.Key,
				Value:          msg.Msg.Value,
				Offset:         offset,
				Timestamp:      timestamp,
				BlockTimestamp: msgBlock.Msg.Timestamp,
			})
			child.offset = offset + 1
		}
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}

func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
	messages := make([]*ConsumerMessage, 0, len(batch.Records))

	for _, rec := range batch.Records {
		offset := batch.FirstOffset + rec.OffsetDelta
		if offset < child.offset {
			continue
		}
		timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
		if batch.LogAppendTime {
			timestamp = batch.MaxTimestamp
		}
		messages = append(messages, &ConsumerMessage{
			Topic:     child.topic,
			Partition: child.partition,
			Key:       rec.Key,
			Value:     rec.Value,
			Offset:    offset,
			Timestamp: timestamp,
			Headers:   rec.Headers,
		})
		child.offset = offset + 1
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}

func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
	var (
		metricRegistry          = child.conf.MetricRegistry
		consumerBatchSizeMetric metrics.Histogram
	)

	if metricRegistry != nil {
		consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
	}

	// If the request was throttled and empty, we log and return without error
	if response.ThrottleTime != time.Duration(0) && len(response.Blocks) == 0 {
		Logger.Printf(
			"consumer/broker/%d FetchResponse throttled %v\n",
			child.broker.broker.ID(), response.ThrottleTime)
		return nil, nil
	}

	block := response.GetBlock(child.topic, child.partition)
	if block == nil {
		return nil, ErrIncompleteResponse
	}

	if block.Err != ErrNoError {
		return nil, block.Err
	}

	nRecs, err := block.numRecords()
	if err != nil {
		return nil, err
	}

	consumerBatchSizeMetric.Update(int64(nRecs))

	if nRecs == 0 {
		partialTrailingMessage, err := block.isPartial()
		if err != nil {
			return nil, err
		}
		// We got no messages. If we got a trailing one then we need to ask for more data.
		// Otherwise we just poll again and wait for one to be produced...
		if partialTrailingMessage {
			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
				// we can't ask for more data, we've hit the configured limit
				child.sendError(ErrMessageTooLarge)
				child.offset++ // skip this one so we can keep processing future messages
			} else {
				child.fetchSize *= 2
				// check int32 overflow
				if child.fetchSize < 0 {
					child.fetchSize = math.MaxInt32
				}
				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
					child.fetchSize = child.conf.Consumer.Fetch.Max
				}
			}
		}

		return nil, nil
	}

	// we got messages, reset our fetch size in case it was increased for a previous request
	child.fetchSize = child.conf.Consumer.Fetch.Default
	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

	// abortedProducerIDs contains producer IDs whose messages should be ignored as uncommitted:
	// - a producer ID is added when the partitionConsumer iterates over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
	// - a producer ID is removed when the partitionConsumer iterates over an aborted control record, meaning the aborted transaction for this producer is over
	abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
	abortedTransactions := block.getAbortedTransactions()

	messages := []*ConsumerMessage{}
	for _, records := range block.RecordsSet {
		switch records.recordsType {
		case legacyRecords:
			messageSetMessages, err := child.parseMessages(records.MsgSet)
			if err != nil {
				return nil, err
			}

			messages = append(messages, messageSetMessages...)
		case defaultRecords:
			// Consume remaining abortedTransactions up to the last offset of the current batch
			for _, txn := range abortedTransactions {
				if txn.FirstOffset > records.RecordBatch.LastOffset() {
					break
				}
				abortedProducerIDs[txn.ProducerID] = struct{}{}
				// Pop abortedTransactions so that we never add it again
				abortedTransactions = abortedTransactions[1:]
			}

			recordBatchMessages, err := child.parseRecords(records.RecordBatch)
			if err != nil {
				return nil, err
			}

			// Parse and commit offset but do not expose messages that are:
			// - control records
			// - part of an aborted transaction when set to `ReadCommitted`

			// control record
			isControl, err := records.isControl()
			if err != nil {
				// I don't know why there is this continue in case of error to begin with
				// Safe bet is to ignore control messages if ReadUncommitted
				// and block on them in case of error and ReadCommitted
				if child.conf.Consumer.IsolationLevel == ReadCommitted {
					return nil, err
				}
				continue
			}
			if isControl {
				controlRecord, err := records.getControlRecord()
				if err != nil {
					return nil, err
				}

				if controlRecord.Type == ControlRecordAbort {
					delete(abortedProducerIDs, records.RecordBatch.ProducerID)
				}
				continue
			}

			// filter aborted transactions
			if child.conf.Consumer.IsolationLevel == ReadCommitted {
				_, isAborted := abortedProducerIDs[records.RecordBatch.ProducerID]
				if records.RecordBatch.IsTransactional && isAborted {
					continue
				}
			}

			messages = append(messages, recordBatchMessages...)
		default:
			return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
		}
	}

	return messages, nil
}
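
// The transaction filtering above only applies when the consumer is configured
// for read-committed isolation. A sketch of enabling it (requires Kafka 0.11+):
//
//	config := NewConfig()
//	config.Version = V0_11_0_0
//	config.Consumer.IsolationLevel = ReadCommitted
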
type brokerConsumer struct {
	consumer         *consumer
	broker           *Broker
	input            chan *partitionConsumer
	newSubscriptions chan []*partitionConsumer
	subscriptions    map[*partitionConsumer]none
	wait             chan none
	acks             sync.WaitGroup
	refs             int
}

func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
	bc := &brokerConsumer{
		consumer:         c,
		broker:           broker,
		input:            make(chan *partitionConsumer),
		newSubscriptions: make(chan []*partitionConsumer),
		wait:             make(chan none),
		subscriptions:    make(map[*partitionConsumer]none),
		refs:             0,
	}

	go withRecover(bc.subscriptionManager)
	go withRecover(bc.subscriptionConsumer)

	return bc
}

// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
// goroutine is in the middle of a network request) and batches them up. The main worker goroutine picks
// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions are available,
// so the main goroutine can block waiting for work if it has none.
func (bc *brokerConsumer) subscriptionManager() {
	var buffer []*partitionConsumer

	for {
		if len(buffer) > 0 {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- buffer:
				buffer = nil
			case bc.wait <- none{}:
			}
		} else {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- nil:
			}
		}
	}

done:
	close(bc.wait)
	if len(buffer) > 0 {
		bc.newSubscriptions <- buffer
	}
	close(bc.newSubscriptions)
}

// subscriptionConsumer ensures we will get nil right away if no new subscriptions are available
func (bc *brokerConsumer) subscriptionConsumer() {
	<-bc.wait // wait for our first piece of work

	for newSubscriptions := range bc.newSubscriptions {
		bc.updateSubscriptions(newSubscriptions)

		if len(bc.subscriptions) == 0 {
			// We're about to be shut down or we're about to receive more subscriptions.
			// Either way, the signal just hasn't propagated to our goroutine yet.
			<-bc.wait
			continue
		}

		response, err := bc.fetchNewMessages()

		if err != nil {
			Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
			bc.abort(err)
			return
		}

		bc.acks.Add(len(bc.subscriptions))
		for child := range bc.subscriptions {
			child.feeder <- response
		}
		bc.acks.Wait()
		bc.handleResponses()
	}
}

func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
	for _, child := range newSubscriptions {
		bc.subscriptions[child] = none{}
		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
	}

	for child := range bc.subscriptions {
		select {
		case <-child.dying:
			Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
			close(child.trigger)
			delete(bc.subscriptions, child)
		default:
			// no-op
		}
	}
}

// handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed
func (bc *brokerConsumer) handleResponses() {
	for child := range bc.subscriptions {
		result := child.responseResult
		child.responseResult = nil

		switch result {
		case nil:
			// no-op
		case errTimedOut:
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
				bc.broker.ID(), child.topic, child.partition)
			delete(bc.subscriptions, child)
		case ErrOffsetOutOfRange:
			// there's no point in retrying this; it will just fail the same way again,
			// so shut it down and force the user to choose what to do
			child.sendError(result)
			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
			close(child.trigger)
			delete(bc.subscriptions, child)
		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable:
			// not an error, but does need redispatching
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		default:
			// unknown error; tell the user and try redispatching
			child.sendError(result)
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		}
	}
}

func (bc *brokerConsumer) abort(err error) {
	bc.consumer.abandonBrokerConsumer(bc)
	_ = bc.broker.Close() // we don't care about the error this might return, we already have one

	for child := range bc.subscriptions {
		child.sendError(err)
		child.trigger <- none{}
	}

	for newSubscriptions := range bc.newSubscriptions {
		if len(newSubscriptions) == 0 {
			<-bc.wait
			continue
		}
		for _, child := range newSubscriptions {
			child.sendError(err)
			child.trigger <- none{}
		}
	}
}
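
// fetchNewMessages builds and sends a FetchRequest whose wire version is chosen
// from the configured Config.Version. As a sketch, enabling the newest behaviour
// handled below, rack-aware fetching, assuming brokers run Kafka 2.3 or later
// (the rack identifier is illustrative):
//
//	config := NewConfig()
//	config.Version = V2_3_0_0
//	config.RackID = "us-east-1a"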
func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
	request := &FetchRequest{
		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
		request.Version = 1
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
		request.Version = 2
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 3
		request.MaxBytes = MaxResponseSize
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
		request.Version = 4
		request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
	}
	if bc.consumer.conf.Version.IsAtLeast(V1_1_0_0) {
		request.Version = 7
		// We do not currently implement KIP-227 FetchSessions. Setting the id to 0
		// and the epoch to -1 tells the broker not to generate a session ID we're
		// just going to ignore anyway.
		request.SessionID = 0
		request.SessionEpoch = -1
	}
	if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
		request.Version = 10
	}
	if bc.consumer.conf.Version.IsAtLeast(V2_3_0_0) {
		request.Version = 11
		request.RackID = bc.consumer.conf.RackID
	}

	for child := range bc.subscriptions {
		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
	}

	return bc.broker.Fetch(request)
}