package sarama

import (
	"math/rand"
	"sort"
	"sync"
	"time"
)

// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
// You MUST call Close() on a client to avoid leaks; it will not be garbage-collected
// automatically when it passes out of scope. It is safe to share a client amongst many
// users; however, Kafka will process requests from a single client strictly in serial,
// so it is generally more efficient to use the default one client per producer/consumer.
type Client interface {
	// Config returns the Config struct of the client. This struct should not be
	// altered after it has been created.
	Config() *Config

	// Controller returns the cluster controller broker. It will return a
	// locally cached value if it's available. You can call RefreshController
	// to update the cached value. Requires Kafka 0.10 or higher.
	Controller() (*Broker, error)

	// RefreshController retrieves the cluster controller from fresh metadata
	// and stores it in the local cache. Requires Kafka 0.10 or higher.
	RefreshController() (*Broker, error)

	// Brokers returns the current set of active brokers as retrieved from cluster metadata.
	Brokers() []*Broker

	// Topics returns the set of available topics as retrieved from cluster metadata.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	Partitions(topic string) ([]int32, error)

	// WritablePartitions returns the sorted list of all writable partition IDs for
	// the given topic, where "writable" means "having a valid leader accepting
	// writes".
	WritablePartitions(topic string) ([]int32, error)

	// Leader returns the broker object that is the leader of the current
	// topic/partition, as determined by querying the cluster metadata.
	Leader(topic string, partitionID int32) (*Broker, error)

	// Replicas returns the set of all replica IDs for the given partition.
	Replicas(topic string, partitionID int32) ([]int32, error)

	// InSyncReplicas returns the set of all in-sync replica IDs for the given
	// partition. In-sync replicas are replicas which are fully caught up with
	// the partition leader.
	InSyncReplicas(topic string, partitionID int32) ([]int32, error)

	// OfflineReplicas returns the set of all offline replica IDs for the given
	// partition. Offline replicas are replicas which are currently unavailable
	// (for example because their broker is down).
	OfflineReplicas(topic string, partitionID int32) ([]int32, error)

	// RefreshMetadata takes a list of topics and queries the cluster to refresh the
	// available metadata for those topics. If no topics are provided, it will refresh
	// metadata for all topics.
	RefreshMetadata(topics ...string) error

	// GetOffset queries the cluster to get the most recent available offset at the
	// given time (in milliseconds) on the topic/partition combination.
	// Time should be OffsetOldest for the earliest available offset,
	// OffsetNewest for the offset of the message that will be produced next, or a time.
	GetOffset(topic string, partitionID int32, time int64) (int64, error)

	// Coordinator returns the coordinating broker for a consumer group. It will
	// return a locally cached value if it's available. You can call
	// RefreshCoordinator to update the cached value. This function only works on
	// Kafka 0.8.2 and higher.
	Coordinator(consumerGroup string) (*Broker, error)

	// RefreshCoordinator retrieves the coordinator for a consumer group and stores it
	// in local cache. This function only works on Kafka 0.8.2 and higher.
	RefreshCoordinator(consumerGroup string) error

	// InitProducerID retrieves information required for Idempotent Producer
	InitProducerID() (*InitProducerIDResponse, error)

	// Close shuts down all broker connections managed by this client. It is required
	// to call this function before a client object passes out of scope, as it will
	// otherwise leak memory. You must close any Producers or Consumers using a client
	// before you close the client.
	Close() error

	// Closed returns true if the client has already had Close called on it
	Closed() bool
}

const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition. You
	// can send this to a client's GetOffset method to get this offset, or when
	// calling ConsumePartition to start consuming new messages.
	OffsetNewest int64 = -1
	// OffsetOldest stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	OffsetOldest int64 = -2
)

type client struct {
	conf           *Config
	closer, closed chan none // for shutting down background metadata updater

	// the broker addresses given to us through the constructor are not guaranteed to be returned in
	// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
	// so we store them separately
	seedBrokers []*Broker
	deadSeeds   []*Broker

	controllerID   int32                                   // cluster controller broker id
	brokers        map[int32]*Broker                       // maps broker ids to brokers
	metadata       map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
	metadataTopics map[string]none                         // topics that need to collect metadata
	coordinators   map[string]int32                        // maps consumer group names to coordinating broker IDs

	// If the number of partitions is large, we can get some churn calling cachedPartitions,
	// so the result is cached. It is important to update this value whenever metadata is changed
	cachedPartitionsResults map[string][maxPartitionIndex][]int32

	lock sync.RWMutex // protects access to the maps that hold cluster state.
}

// NewClient creates a new Client. It connects to one of the given broker addresses
// and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot
// be retrieved from any of the given broker addresses, the client is not created.
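//
// A minimal usage sketch (the broker address below is illustrative, not part of
// this package):
//
//	client, err := NewClient([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		// handle error
//	}
//	defer client.Close()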
func NewClient(addrs []string, conf *Config) (Client, error) {
	Logger.Println("Initializing new client")

	if conf == nil {
		conf = NewConfig()
	}

	if err := conf.Validate(); err != nil {
		return nil, err
	}

	if len(addrs) < 1 {
		return nil, ConfigurationError("You must provide at least one broker address")
	}

	client := &client{
		conf:                    conf,
		closer:                  make(chan none),
		closed:                  make(chan none),
		brokers:                 make(map[int32]*Broker),
		metadata:                make(map[string]map[int32]*PartitionMetadata),
		metadataTopics:          make(map[string]none),
		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
		coordinators:            make(map[string]int32),
	}

	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	for _, index := range random.Perm(len(addrs)) {
		client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
	}

	if conf.Metadata.Full {
		// do an initial fetch of all cluster metadata by specifying an empty list of topics
		err := client.RefreshMetadata()
		switch err {
		case nil:
			break
		case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
			// indicates that maybe part of the cluster is down, but is not fatal to creating the client
			Logger.Println(err)
		default:
			close(client.closed) // we haven't started the background updater yet, so we have to do this manually
			_ = client.Close()
			return nil, err
		}
	}
	go withRecover(client.backgroundMetadataUpdater)

	Logger.Println("Successfully initialized new client")

	return client, nil
}

func (client *client) Config() *Config {
	return client.conf
}

func (client *client) Brokers() []*Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()
	brokers := make([]*Broker, 0, len(client.brokers))
	for _, broker := range client.brokers {
		brokers = append(brokers, broker)
	}
	return brokers
}

func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
	var err error
	for broker := client.any(); broker != nil; broker = client.any() {
		var response *InitProducerIDResponse
		req := &InitProducerIDRequest{}

		// assign (not redeclare) err so the final return reports the last broker error
		response, err = broker.InitProducerID(req)
		switch err.(type) {
		case nil:
			return response, nil
		default:
			// some error, remove that broker and try again
			Logger.Printf("Client got error from broker %d when issuing InitProducerID : %v\n", broker.ID(), err)
			_ = broker.Close()
			client.deregisterBroker(broker)
		}
	}
	return nil, err
}

func (client *client) Close() error {
	if client.Closed() {
		// Chances are this is being called from a defer() and the error will go unobserved
		// so we go ahead and log the event in this case.
		Logger.Printf("Close() called on already closed client")
		return ErrClosedClient
	}

	// shutdown and wait for the background thread before we take the lock, to avoid races
	close(client.closer)
	<-client.closed

	client.lock.Lock()
	defer client.lock.Unlock()
	Logger.Println("Closing Client")

	for _, broker := range client.brokers {
		safeAsyncClose(broker)
	}

	for _, broker := range client.seedBrokers {
		safeAsyncClose(broker)
	}

	client.brokers = nil
	client.metadata = nil
	client.metadataTopics = nil

	return nil
}

func (client *client) Closed() bool {
	client.lock.RLock()
	defer client.lock.RUnlock()

	return client.brokers == nil
}

func (client *client) Topics() ([]string, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	client.lock.RLock()
	defer client.lock.RUnlock()

	ret := make([]string, 0, len(client.metadata))
	for topic := range client.metadata {
		ret = append(ret, topic)
	}

	return ret, nil
}

func (client *client) MetadataTopics() ([]string, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	client.lock.RLock()
	defer client.lock.RUnlock()

	ret := make([]string, 0, len(client.metadataTopics))
	for topic := range client.metadataTopics {
		ret = append(ret, topic)
	}

	return ret, nil
}

func (client *client) Partitions(topic string) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	partitions := client.cachedPartitions(topic, allPartitions)

	if len(partitions) == 0 {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		partitions = client.cachedPartitions(topic, allPartitions)
	}

	// no partitions found after refreshing metadata
	if len(partitions) == 0 {
		return nil, ErrUnknownTopicOrPartition
	}

	return partitions, nil
}

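// Partitions and GetOffset are typically combined as in the following sketch
// (comment only, not compiled; the topic name is illustrative):
//
//	partitions, err := client.Partitions("my-topic")
//	if err != nil {
//		// handle error
//	}
//	for _, p := range partitions {
//		newest, err := client.GetOffset("my-topic", p, OffsetNewest)
//		if err != nil {
//			// handle error
//		}
//		Logger.Printf("partition %d next offset: %d", p, newest)
//	}
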
func (client *client) WritablePartitions(topic string) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	partitions := client.cachedPartitions(topic, writablePartitions)

	// len==0 catches when it's nil (no such topic) and the odd case when every single
	// partition is undergoing leader election simultaneously. Callers have to be able to handle
	// this function returning an empty slice (which is a valid return value) but catching it
	// here the first time (note we *don't* catch it below where we return ErrUnknownTopicOrPartition) triggers
	// a metadata refresh as a nicety so callers can just try again and don't have to manually
	// trigger a refresh (otherwise they'd just keep getting a stale cached copy).
	if len(partitions) == 0 {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		partitions = client.cachedPartitions(topic, writablePartitions)
	}

	if partitions == nil {
		return nil, ErrUnknownTopicOrPartition
	}

	return partitions, nil
}

func (client *client) Replicas(topic string, partitionID int32) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	metadata := client.cachedMetadata(topic, partitionID)

	if metadata == nil {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		metadata = client.cachedMetadata(topic, partitionID)
	}

	if metadata == nil {
		return nil, ErrUnknownTopicOrPartition
	}

	if metadata.Err == ErrReplicaNotAvailable {
		return dupInt32Slice(metadata.Replicas), metadata.Err
	}
	return dupInt32Slice(metadata.Replicas), nil
}

func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	metadata := client.cachedMetadata(topic, partitionID)

	if metadata == nil {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		metadata = client.cachedMetadata(topic, partitionID)
	}

	if metadata == nil {
		return nil, ErrUnknownTopicOrPartition
	}

	if metadata.Err == ErrReplicaNotAvailable {
		return dupInt32Slice(metadata.Isr), metadata.Err
	}
	return dupInt32Slice(metadata.Isr), nil
}

func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	metadata := client.cachedMetadata(topic, partitionID)

	if metadata == nil {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		metadata = client.cachedMetadata(topic, partitionID)
	}

	if metadata == nil {
		return nil, ErrUnknownTopicOrPartition
	}

	if metadata.Err == ErrReplicaNotAvailable {
		return dupInt32Slice(metadata.OfflineReplicas), metadata.Err
	}
	return dupInt32Slice(metadata.OfflineReplicas), nil
}

func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	leader, err := client.cachedLeader(topic, partitionID)

	if leader == nil {
		err = client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		leader, err = client.cachedLeader(topic, partitionID)
	}

	return leader, err
}

func (client *client) RefreshMetadata(topics ...string) error {
	if client.Closed() {
		return ErrClosedClient
	}

	// Prior to 0.8.2, Kafka will throw exceptions on an empty topic and not return a proper
	// error. This handles the case by returning an error instead of sending it
	// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
	for _, topic := range topics {
		if len(topic) == 0 {
			return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return
		}
	}

	deadline := time.Time{}
	if client.conf.Metadata.Timeout > 0 {
		deadline = time.Now().Add(client.conf.Metadata.Timeout)
	}
	return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}

func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
	if client.Closed() {
		return -1, ErrClosedClient
	}

	offset, err := client.getOffset(topic, partitionID, time)

	if err != nil {
		if err := client.RefreshMetadata(topic); err != nil {
			return -1, err
		}
		return client.getOffset(topic, partitionID, time)
	}

	return offset, err
}

func (client *client) Controller() (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	if !client.conf.Version.IsAtLeast(V0_10_0_0) {
		return nil, ErrUnsupportedVersion
	}

	controller := client.cachedController()
	if controller == nil {
		if err := client.refreshMetadata(); err != nil {
			return nil, err
		}
		controller = client.cachedController()
	}

	if controller == nil {
		return nil, ErrControllerNotAvailable
	}

	_ = controller.Open(client.conf)
	return controller, nil
}

// deregisterController removes the cached controller broker from the brokers map
func (client *client) deregisterController() {
	client.lock.Lock()
	defer client.lock.Unlock()
	delete(client.brokers, client.controllerID)
}

// RefreshController retrieves the cluster controller from fresh metadata
// and stores it in the local cache. Requires Kafka 0.10 or higher.
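//
// A minimal usage sketch (error handling elided; assumes the client's
// Config.Version is at least V0_10_0_0):
//
//	controller, err := client.RefreshController()
//	if err != nil {
//		// handle error
//	}
//	Logger.Println("controller is", controller.Addr())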
func (client *client) RefreshController() (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	client.deregisterController()

	if err := client.refreshMetadata(); err != nil {
		return nil, err
	}

	controller := client.cachedController()
	if controller == nil {
		return nil, ErrControllerNotAvailable
	}

	_ = controller.Open(client.conf)
	return controller, nil
}

func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	coordinator := client.cachedCoordinator(consumerGroup)

	if coordinator == nil {
		if err := client.RefreshCoordinator(consumerGroup); err != nil {
			return nil, err
		}
		coordinator = client.cachedCoordinator(consumerGroup)
	}

	if coordinator == nil {
		return nil, ErrConsumerCoordinatorNotAvailable
	}

	_ = coordinator.Open(client.conf)
	return coordinator, nil
}

func (client *client) RefreshCoordinator(consumerGroup string) error {
	if client.Closed() {
		return ErrClosedClient
	}

	response, err := client.getConsumerMetadata(consumerGroup, client.conf.Metadata.Retry.Max)
	if err != nil {
		return err
	}

	client.lock.Lock()
	defer client.lock.Unlock()
	client.registerBroker(response.Coordinator)
	client.coordinators[consumerGroup] = response.Coordinator.ID()
	return nil
}

// private broker management helpers

func (client *client) updateBroker(brokers []*Broker) {
	var currentBroker = make(map[int32]*Broker, len(brokers))

	for _, broker := range brokers {
		currentBroker[broker.ID()] = broker
		if client.brokers[broker.ID()] == nil { // add new broker
			client.brokers[broker.ID()] = broker
			Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
		} else if broker.Addr() != client.brokers[broker.ID()].Addr() { // replace broker with new address
			safeAsyncClose(client.brokers[broker.ID()])
			client.brokers[broker.ID()] = broker
			Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr())
		}
	}

	for id, broker := range client.brokers {
		if _, exist := currentBroker[id]; !exist { // remove old broker
			safeAsyncClose(broker)
			delete(client.brokers, id)
			Logger.Printf("client/broker remove invalid broker #%d with %s", broker.ID(), broker.Addr())
		}
	}
}

// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
// in the brokers map. You must hold the write lock before calling this function.
func (client *client) registerBroker(broker *Broker) {
	if client.brokers == nil {
		Logger.Printf("cannot register broker #%d at %s, client already closed", broker.ID(), broker.Addr())
		return
	}

	if client.brokers[broker.ID()] == nil {
		client.brokers[broker.ID()] = broker
		Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
	} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
		safeAsyncClose(client.brokers[broker.ID()])
		client.brokers[broker.ID()] = broker
		Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr())
	}
}

// deregisterBroker removes a broker from the seedBrokers list, and if it's
// not a seed broker, removes it from the brokers map completely.
func (client *client) deregisterBroker(broker *Broker) {
	client.lock.Lock()
	defer client.lock.Unlock()

	if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] {
		client.deadSeeds = append(client.deadSeeds, broker)
		client.seedBrokers = client.seedBrokers[1:]
	} else {
		// we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
		// but we really shouldn't have to; once that loop is made better this case can be
		// removed, and the function generally can be renamed from `deregisterBroker` to
		// `nextSeedBroker` or something
		Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
		delete(client.brokers, broker.ID())
	}
}

func (client *client) resurrectDeadBrokers() {
	client.lock.Lock()
	defer client.lock.Unlock()

	Logger.Printf("client/brokers resurrecting %d dead seed brokers", len(client.deadSeeds))
	client.seedBrokers = append(client.seedBrokers, client.deadSeeds...)
	client.deadSeeds = nil
}

func (client *client) any() *Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()

	if len(client.seedBrokers) > 0 {
		_ = client.seedBrokers[0].Open(client.conf)
		return client.seedBrokers[0]
	}

	// not guaranteed to be random *or* deterministic
	for _, broker := range client.brokers {
		_ = broker.Open(client.conf)
		return broker
	}

	return nil
}

// private caching/lazy metadata helpers

type partitionType int

const (
	allPartitions partitionType = iota
	writablePartitions
	// If you add any more types, update the partition cache in updateMetadata()

	// Ensure this is the last partition type value
	maxPartitionIndex
)

func (client *client) cachedMetadata(topic string, partitionID int32) *PartitionMetadata {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions := client.metadata[topic]
	if partitions != nil {
		return partitions[partitionID]
	}

	return nil
}

func (client *client) cachedPartitions(topic string, partitionSet partitionType) []int32 {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions, exists := client.cachedPartitionsResults[topic]

	if !exists {
		return nil
	}
	return partitions[partitionSet]
}

func (client *client) setPartitionCache(topic string, partitionSet partitionType) []int32 {
	partitions := client.metadata[topic]

	if partitions == nil {
		return nil
	}

	ret := make([]int32, 0, len(partitions))
	for _, partition := range partitions {
		if partitionSet == writablePartitions && partition.Err == ErrLeaderNotAvailable {
			continue
		}
		ret = append(ret, partition.ID)
	}

	sort.Sort(int32Slice(ret))
	return ret
}

func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, error) {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions := client.metadata[topic]
	if partitions != nil {
		metadata, ok := partitions[partitionID]
		if ok {
			if metadata.Err == ErrLeaderNotAvailable {
				return nil, ErrLeaderNotAvailable
			}
			b := client.brokers[metadata.Leader]
			if b == nil {
				return nil, ErrLeaderNotAvailable
			}
			_ = b.Open(client.conf)
			return b, nil
		}
	}

	return nil, ErrUnknownTopicOrPartition
}

func (client *client) getOffset(topic string, partitionID int32, time int64) (int64, error) {
	broker, err := client.Leader(topic, partitionID)
	if err != nil {
		return -1, err
	}

	request := &OffsetRequest{}
	if client.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 1
	}
	request.AddBlock(topic, partitionID, time, 1)

	response, err := broker.GetAvailableOffsets(request)
	if err != nil {
		_ = broker.Close()
		return -1, err
	}

	block := response.GetBlock(topic, partitionID)
	if block == nil {
		_ = broker.Close()
		return -1, ErrIncompleteResponse
	}
	if block.Err != ErrNoError {
		return -1, block.Err
	}
	if len(block.Offsets) != 1 {
		return -1, ErrOffsetOutOfRange
	}

	return block.Offsets[0], nil
}

// core metadata update logic

func (client *client) backgroundMetadataUpdater() {
	defer close(client.closed)

	if client.conf.Metadata.RefreshFrequency == time.Duration(0) {
		return
	}

	ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := client.refreshMetadata(); err != nil {
				Logger.Println("Client background metadata update:", err)
			}
		case <-client.closer:
			return
		}
	}
}

func (client *client) refreshMetadata() error {
	topics := []string{}

	if !client.conf.Metadata.Full {
		if specificTopics, err := client.MetadataTopics(); err != nil {
			return err
		} else if len(specificTopics) == 0 {
			return ErrNoTopicsToUpdateMetadata
		} else {
			topics = specificTopics
		}
	}

	if err := client.RefreshMetadata(topics...); err != nil {
		return err
	}

	return nil
}

func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
	pastDeadline := func(backoff time.Duration) bool {
		if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) {
			// we are past the deadline
			return true
		}
		return false
	}
	retry := func(err error) error {
		if attemptsRemaining > 0 {
			backoff := client.computeBackoff(attemptsRemaining)
			if pastDeadline(backoff) {
				Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
				return err
			}
			Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
			if backoff > 0 {
				time.Sleep(backoff)
			}
			return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline)
		}
		return err
	}

	broker := client.any()
	for ; broker != nil && !pastDeadline(0); broker = client.any() {
		allowAutoTopicCreation := true
		if len(topics) > 0 {
			Logger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
		} else {
			allowAutoTopicCreation = false
			Logger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
		}

		req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
		if client.conf.Version.IsAtLeast(V1_0_0_0) {
			req.Version = 5
		} else if client.conf.Version.IsAtLeast(V0_10_0_0) {
			req.Version = 1
		}
		response, err := broker.GetMetadata(req)
		switch err.(type) {
		case nil:
			allKnownMetaData := len(topics) == 0
			// valid response, use it
			shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
			if shouldRetry {
				Logger.Println("client/metadata found some partitions to be leaderless")
				return retry(err) // note: err can be nil
			}
			return err

		case PacketEncodingError:
			// didn't even send, return the error
			return err

		case KError:
			// if SASL auth error return as this _should_ be a non retryable err for all brokers
			if err.(KError) == ErrSASLAuthenticationFailed {
				Logger.Println("client/metadata failed SASL authentication")
				return err
			}

			if err.(KError) == ErrTopicAuthorizationFailed {
				Logger.Println("client is not authorized to access this topic. The topics were: ", topics)
The topics were: ", topics) 875 return err 876 } 877 // else remove that broker and try again 878 Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err) 879 _ = broker.Close() 880 client.deregisterBroker(broker) 881 882 default: 883 // some other error, remove that broker and try again 884 Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err) 885 _ = broker.Close() 886 client.deregisterBroker(broker) 887 } 888 } 889 890 if broker != nil { 891 Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr) 892 return retry(ErrOutOfBrokers) 893 } 894 895 Logger.Println("client/metadata no available broker to send metadata request to") 896 client.resurrectDeadBrokers() 897 return retry(ErrOutOfBrokers) 898} 899 900// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable 901func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) { 902 if client.Closed() { 903 return 904 } 905 906 client.lock.Lock() 907 defer client.lock.Unlock() 908 909 // For all the brokers we received: 910 // - if it is a new ID, save it 911 // - if it is an existing ID, but the address we have is stale, discard the old one and save it 912 // - if some brokers is not exist in it, remove old broker 913 // - otherwise ignore it, replacing our existing one would just bounce the connection 914 client.updateBroker(data.Brokers) 915 916 client.controllerID = data.ControllerID 917 918 if allKnownMetaData { 919 client.metadata = make(map[string]map[int32]*PartitionMetadata) 920 client.metadataTopics = make(map[string]none) 921 client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32) 922 } 923 for _, topic := range data.Topics { 924 // topics must be added firstly to `metadataTopics` to guarantee that all 925 // requested topics must be recorded to keep them trackable for periodically 926 // metadata refresh. 
		if _, exists := client.metadataTopics[topic.Name]; !exists {
			client.metadataTopics[topic.Name] = none{}
		}
		delete(client.metadata, topic.Name)
		delete(client.cachedPartitionsResults, topic.Name)

		switch topic.Err {
		case ErrNoError:
			// no-op
		case ErrInvalidTopic, ErrTopicAuthorizationFailed: // don't retry, don't store partial results
			err = topic.Err
			continue
		case ErrUnknownTopicOrPartition: // retry, do not store partial partition results
			err = topic.Err
			retry = true
			continue
		case ErrLeaderNotAvailable: // retry, but store partial partition results
			retry = true
		default: // don't retry, don't store partial results
			Logger.Printf("Unexpected topic-level metadata error: %s", topic.Err)
			err = topic.Err
			continue
		}

		client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
		for _, partition := range topic.Partitions {
			client.metadata[topic.Name][partition.ID] = partition
			if partition.Err == ErrLeaderNotAvailable {
				retry = true
			}
		}

		var partitionCache [maxPartitionIndex][]int32
		partitionCache[allPartitions] = client.setPartitionCache(topic.Name, allPartitions)
		partitionCache[writablePartitions] = client.setPartitionCache(topic.Name, writablePartitions)
		client.cachedPartitionsResults[topic.Name] = partitionCache
	}

	return
}

func (client *client) cachedCoordinator(consumerGroup string) *Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()
	if coordinatorID, ok := client.coordinators[consumerGroup]; ok {
		return client.brokers[coordinatorID]
	}
	return nil
}

func (client *client) cachedController() *Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()

	return client.brokers[client.controllerID]
}

func (client *client) computeBackoff(attemptsRemaining int) time.Duration {
	if client.conf.Metadata.Retry.BackoffFunc != nil {
		maxRetries := client.conf.Metadata.Retry.Max
		retries := maxRetries - attemptsRemaining
		return client.conf.Metadata.Retry.BackoffFunc(retries, maxRetries)
	}
	return client.conf.Metadata.Retry.Backoff
}

func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
	retry := func(err error) (*FindCoordinatorResponse, error) {
		if attemptsRemaining > 0 {
			backoff := client.computeBackoff(attemptsRemaining)
			Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
			time.Sleep(backoff)
			return client.getConsumerMetadata(consumerGroup, attemptsRemaining-1)
		}
		return nil, err
	}

	for broker := client.any(); broker != nil; broker = client.any() {
		Logger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr())

		request := new(FindCoordinatorRequest)
		request.CoordinatorKey = consumerGroup
		request.CoordinatorType = CoordinatorGroup

		response, err := broker.FindCoordinator(request)

		if err != nil {
			Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err)

			switch err.(type) {
			case PacketEncodingError:
				return nil, err
			default:
				_ = broker.Close()
				client.deregisterBroker(broker)
				continue
			}
		}

		switch response.Err {
		case ErrNoError:
			Logger.Printf("client/coordinator coordinator for consumergroup %s is #%d (%s)\n", consumerGroup, response.Coordinator.ID(), response.Coordinator.Addr())
			return response, nil

		case ErrConsumerCoordinatorNotAvailable:
			Logger.Printf("client/coordinator coordinator for consumer group %s is not available\n", consumerGroup)

			// This is very ugly, but this scenario will only happen once per cluster.
			// The __consumer_offsets topic only has to be created one time.
			// The number of partitions is not configurable, but partition 0 should always exist.
			if _, err := client.Leader("__consumer_offsets", 0); err != nil {
				Logger.Printf("client/coordinator the __consumer_offsets topic is not initialized completely yet. Waiting 2 seconds...\n")
				time.Sleep(2 * time.Second)
			}

			return retry(ErrConsumerCoordinatorNotAvailable)
		case ErrGroupAuthorizationFailed:
			Logger.Printf("client was not authorized to access group %s while attempting to find coordinator", consumerGroup)
			return retry(ErrGroupAuthorizationFailed)

		default:
			return nil, response.Err
		}
	}

	Logger.Println("client/coordinator no available broker to send consumer metadata request to")
	client.resurrectDeadBrokers()
	return retry(ErrOutOfBrokers)
}

// nopCloserClient embeds an existing Client, but disables
// the Close method (yet all other methods pass
// through unchanged). This is for use in larger structs
// where it is undesirable to close the client that was
// passed in by the caller.
type nopCloserClient struct {
	Client
}

// Close intercepts and purposely does not call the underlying
// client's Close() method.
func (ncc *nopCloserClient) Close() error {
	return nil
}
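
// A usage sketch for nopCloserClient, internal to this package (`existing` is an
// assumed, already-constructed Client owned by the caller):
//
//	wrapped := &nopCloserClient{Client: existing}
//	defer wrapped.Close() // does not close the underlying client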