package sarama

import (
	"math/rand"
	"sort"
	"sync"
	"time"
)

// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
// You MUST call Close() on a client to avoid leaks; it will not be garbage-collected
// automatically when it passes out of scope. It is safe to share a client amongst many
// users, however Kafka will process requests from a single client strictly serially,
// so it is generally more efficient to use the default of one client per producer/consumer.
type Client interface {
	// Config returns the Config struct of the client. This struct should not be
	// altered after it has been created.
	Config() *Config

	// Controller returns the cluster controller broker. It will return a
	// locally cached value if it's available. You can call RefreshController
	// to update the cached value. Requires Kafka 0.10 or higher.
	Controller() (*Broker, error)

	// RefreshController retrieves the cluster controller from fresh metadata
	// and stores it in the local cache. Requires Kafka 0.10 or higher.
	RefreshController() (*Broker, error)

	// Brokers returns the current set of active brokers as retrieved from cluster metadata.
	Brokers() []*Broker

	// Topics returns the set of available topics as retrieved from cluster metadata.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	Partitions(topic string) ([]int32, error)

	// WritablePartitions returns the sorted list of all writable partition IDs for
	// the given topic, where "writable" means "having a valid leader accepting
	// writes".
	WritablePartitions(topic string) ([]int32, error)

	// Leader returns the broker object that is the leader of the given
	// topic/partition, as determined by querying the cluster metadata.
	Leader(topic string, partitionID int32) (*Broker, error)

	// Replicas returns the set of all replica IDs for the given partition.
	Replicas(topic string, partitionID int32) ([]int32, error)

	// InSyncReplicas returns the set of all in-sync replica IDs for the given
	// partition. In-sync replicas are replicas which are fully caught up with
	// the partition leader.
	InSyncReplicas(topic string, partitionID int32) ([]int32, error)

	// OfflineReplicas returns the set of all offline replica IDs for the given
	// partition. Offline replicas are replicas which are currently offline.
	OfflineReplicas(topic string, partitionID int32) ([]int32, error)

	// RefreshMetadata takes a list of topics and queries the cluster to refresh the
	// available metadata for those topics. If no topics are provided, it will refresh
	// metadata for all topics.
	RefreshMetadata(topics ...string) error

	// GetOffset queries the cluster to get the most recent available offset at the
	// given time (in milliseconds) on the topic/partition combination.
	// Time should be OffsetOldest for the earliest available offset,
	// OffsetNewest for the offset of the message that will be produced next, or a
	// timestamp in milliseconds.
	GetOffset(topic string, partitionID int32, time int64) (int64, error)

	// Coordinator returns the coordinating broker for a consumer group. It will
	// return a locally cached value if it's available. You can call
	// RefreshCoordinator to update the cached value. This function only works on
	// Kafka 0.8.2 and higher.
	Coordinator(consumerGroup string) (*Broker, error)

	// RefreshCoordinator retrieves the coordinator for a consumer group and stores it
	// in the local cache. This function only works on Kafka 0.8.2 and higher.
	RefreshCoordinator(consumerGroup string) error

	// InitProducerID retrieves the information required for an idempotent producer.
	InitProducerID() (*InitProducerIDResponse, error)

	// Close shuts down all broker connections managed by this client. It is required
	// to call this function before a client object passes out of scope, as it will
	// otherwise leak memory. You must close any Producers or Consumers using a client
	// before you close the client.
	Close() error

	// Closed returns true if the client has already had Close called on it.
	Closed() bool
}
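
// A minimal usage sketch of the Client interface (illustrative only; the broker
// address and topic name below are assumptions, and error handling is abbreviated):
//
//	client, err := NewClient([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		panic(err)
//	}
//	defer client.Close()
//
//	partitions, err := client.Partitions("example-topic")
//	if err != nil {
//		panic(err)
//	}
//	leader, err := client.Leader("example-topic", partitions[0])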

const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition. You
	// can send this to a client's GetOffset method to get this offset, or when
	// calling ConsumePartition to start consuming new messages.
	OffsetNewest int64 = -1
	// OffsetOldest stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	OffsetOldest int64 = -2
)
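
// A sketch of how these sentinel values are typically used with GetOffset (the
// topic name and partition ID are assumptions for illustration):
//
//	oldest, err := client.GetOffset("example-topic", 0, OffsetOldest)
//	newest, err := client.GetOffset("example-topic", 0, OffsetNewest)
//
// The difference newest-oldest gives a rough count of the messages currently
// retained in that partition.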

type client struct {
	conf           *Config
	closer, closed chan none // for shutting down background metadata updater

	// the broker addresses given to us through the constructor are not guaranteed to be returned in
	// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
	// so we store them separately
	seedBrokers []*Broker
	deadSeeds   []*Broker

	controllerID   int32                                   // cluster controller broker id
	brokers        map[int32]*Broker                       // maps broker ids to brokers
	metadata       map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
	metadataTopics map[string]none                         // topics that need to collect metadata
	coordinators   map[string]int32                        // maps consumer group names to coordinating broker IDs

	// If the number of partitions is large, we can get some churn calling cachedPartitions,
	// so the result is cached. It is important to update this value whenever metadata is changed
	cachedPartitionsResults map[string][maxPartitionIndex][]int32

	lock sync.RWMutex // protects access to the maps that hold cluster state.
}

// NewClient creates a new Client. It connects to one of the given broker addresses
// and uses that broker to automatically fetch metadata on the rest of the Kafka cluster. If metadata cannot
// be retrieved from any of the given broker addresses, the client is not created.
func NewClient(addrs []string, conf *Config) (Client, error) {
	Logger.Println("Initializing new client")

	if conf == nil {
		conf = NewConfig()
	}

	if err := conf.Validate(); err != nil {
		return nil, err
	}

	if len(addrs) < 1 {
		return nil, ConfigurationError("You must provide at least one broker address")
	}

	client := &client{
		conf:                    conf,
		closer:                  make(chan none),
		closed:                  make(chan none),
		brokers:                 make(map[int32]*Broker),
		metadata:                make(map[string]map[int32]*PartitionMetadata),
		metadataTopics:          make(map[string]none),
		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
		coordinators:            make(map[string]int32),
	}

	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	for _, index := range random.Perm(len(addrs)) {
		client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
	}

	if conf.Metadata.Full {
		// do an initial fetch of all cluster metadata by specifying an empty list of topics
		err := client.RefreshMetadata()
		switch err {
		case nil:
			break
		case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
			// indicates that maybe part of the cluster is down, but this is not fatal to creating the client
			Logger.Println(err)
		default:
			close(client.closed) // we haven't started the background updater yet, so we have to do this manually
			_ = client.Close()
			return nil, err
		}
	}
	go withRecover(client.backgroundMetadataUpdater)

	Logger.Println("Successfully initialized new client")

	return client, nil
}
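
// A configuration sketch for NewClient (the values below are illustrative
// assumptions, not recommendations):
//
//	conf := NewConfig()
//	conf.Metadata.Full = false                        // only track metadata for topics we actually use
//	conf.Metadata.RefreshFrequency = 5 * time.Minute  // 0 disables the background updater entirely
//	client, err := NewClient([]string{"broker1:9092", "broker2:9092"}, conf)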

func (client *client) Config() *Config {
	return client.conf
}

func (client *client) Brokers() []*Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()
	brokers := make([]*Broker, 0, len(client.brokers))
	for _, broker := range client.brokers {
		brokers = append(brokers, broker)
	}
	return brokers
}

func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
	var err error
	for broker := client.any(); broker != nil; broker = client.any() {
		req := &InitProducerIDRequest{}

		var response *InitProducerIDResponse
		response, err = broker.InitProducerID(req)
		switch err.(type) {
		case nil:
			return response, nil
		default:
			// some error, remove that broker and try again
			Logger.Printf("Client got error from broker %d when issuing InitProducerID : %v\n", broker.ID(), err)
			_ = broker.Close()
			client.deregisterBroker(broker)
		}
	}
	return nil, err
}

func (client *client) Close() error {
	if client.Closed() {
		// Chances are this is being called from a defer() and the error will go unobserved
		// so we go ahead and log the event in this case.
		Logger.Printf("Close() called on already closed client")
		return ErrClosedClient
	}

	// shut down and wait for the background thread before we take the lock, to avoid races
	close(client.closer)
	<-client.closed

	client.lock.Lock()
	defer client.lock.Unlock()
	Logger.Println("Closing Client")

	for _, broker := range client.brokers {
		safeAsyncClose(broker)
	}

	for _, broker := range client.seedBrokers {
		safeAsyncClose(broker)
	}

	client.brokers = nil
	client.metadata = nil
	client.metadataTopics = nil

	return nil
}

func (client *client) Closed() bool {
	client.lock.RLock()
	defer client.lock.RUnlock()

	return client.brokers == nil
}

func (client *client) Topics() ([]string, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	client.lock.RLock()
	defer client.lock.RUnlock()

	ret := make([]string, 0, len(client.metadata))
	for topic := range client.metadata {
		ret = append(ret, topic)
	}

	return ret, nil
}

func (client *client) MetadataTopics() ([]string, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	client.lock.RLock()
	defer client.lock.RUnlock()

	ret := make([]string, 0, len(client.metadataTopics))
	for topic := range client.metadataTopics {
		ret = append(ret, topic)
	}

	return ret, nil
}

func (client *client) Partitions(topic string) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	partitions := client.cachedPartitions(topic, allPartitions)

	if len(partitions) == 0 {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		partitions = client.cachedPartitions(topic, allPartitions)
	}

	// no partitions found, even after refreshing metadata
	if len(partitions) == 0 {
		return nil, ErrUnknownTopicOrPartition
	}

	return partitions, nil
}

func (client *client) WritablePartitions(topic string) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	partitions := client.cachedPartitions(topic, writablePartitions)

	// len==0 catches when it's nil (no such topic) and the odd case when every single
	// partition is undergoing leader election simultaneously. Callers have to be able to handle
	// this function returning an empty slice (which is a valid return value), but catching it
	// here the first time (note we *don't* catch it below where we return ErrUnknownTopicOrPartition) triggers
	// a metadata refresh as a nicety so callers can just try again and don't have to manually
	// trigger a refresh (otherwise they'd just keep getting a stale cached copy).
	if len(partitions) == 0 {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		partitions = client.cachedPartitions(topic, writablePartitions)
	}

	if partitions == nil {
		return nil, ErrUnknownTopicOrPartition
	}

	return partitions, nil
}

func (client *client) Replicas(topic string, partitionID int32) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	metadata := client.cachedMetadata(topic, partitionID)

	if metadata == nil {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		metadata = client.cachedMetadata(topic, partitionID)
	}

	if metadata == nil {
		return nil, ErrUnknownTopicOrPartition
	}

	if metadata.Err == ErrReplicaNotAvailable {
		return dupInt32Slice(metadata.Replicas), metadata.Err
	}
	return dupInt32Slice(metadata.Replicas), nil
}

func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	metadata := client.cachedMetadata(topic, partitionID)

	if metadata == nil {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		metadata = client.cachedMetadata(topic, partitionID)
	}

	if metadata == nil {
		return nil, ErrUnknownTopicOrPartition
	}

	if metadata.Err == ErrReplicaNotAvailable {
		return dupInt32Slice(metadata.Isr), metadata.Err
	}
	return dupInt32Slice(metadata.Isr), nil
}

func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	metadata := client.cachedMetadata(topic, partitionID)

	if metadata == nil {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		metadata = client.cachedMetadata(topic, partitionID)
	}

	if metadata == nil {
		return nil, ErrUnknownTopicOrPartition
	}

	if metadata.Err == ErrReplicaNotAvailable {
		return dupInt32Slice(metadata.OfflineReplicas), metadata.Err
	}
	return dupInt32Slice(metadata.OfflineReplicas), nil
}

func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	leader, err := client.cachedLeader(topic, partitionID)

	if leader == nil {
		err = client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		leader, err = client.cachedLeader(topic, partitionID)
	}

	return leader, err
}

func (client *client) RefreshMetadata(topics ...string) error {
	if client.Closed() {
		return ErrClosedClient
	}

	// Prior to 0.8.2, Kafka will throw exceptions on an empty topic and not return a proper
	// error. This handles the case by returning an error instead of sending it
	// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
	for _, topic := range topics {
		if len(topic) == 0 {
			return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return
		}
	}

	deadline := time.Time{}
	if client.conf.Metadata.Timeout > 0 {
		deadline = time.Now().Add(client.conf.Metadata.Timeout)
	}
	return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}

func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
	if client.Closed() {
		return -1, ErrClosedClient
	}

	offset, err := client.getOffset(topic, partitionID, time)

	if err != nil {
		if err := client.RefreshMetadata(topic); err != nil {
			return -1, err
		}
		return client.getOffset(topic, partitionID, time)
	}

	return offset, err
}

func (client *client) Controller() (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	if !client.conf.Version.IsAtLeast(V0_10_0_0) {
		return nil, ErrUnsupportedVersion
	}

	controller := client.cachedController()
	if controller == nil {
		if err := client.refreshMetadata(); err != nil {
			return nil, err
		}
		controller = client.cachedController()
	}

	if controller == nil {
		return nil, ErrControllerNotAvailable
	}

	_ = controller.Open(client.conf)
	return controller, nil
}

// deregisterController removes the cached controller broker from the brokers map
func (client *client) deregisterController() {
	client.lock.Lock()
	defer client.lock.Unlock()
	delete(client.brokers, client.controllerID)
}

// RefreshController retrieves the cluster controller from fresh metadata
// and stores it in the local cache. Requires Kafka 0.10 or higher.
func (client *client) RefreshController() (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	client.deregisterController()

	if err := client.refreshMetadata(); err != nil {
		return nil, err
	}

	controller := client.cachedController()
	if controller == nil {
		return nil, ErrControllerNotAvailable
	}

	_ = controller.Open(client.conf)
	return controller, nil
}

func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	coordinator := client.cachedCoordinator(consumerGroup)

	if coordinator == nil {
		if err := client.RefreshCoordinator(consumerGroup); err != nil {
			return nil, err
		}
		coordinator = client.cachedCoordinator(consumerGroup)
	}

	if coordinator == nil {
		return nil, ErrConsumerCoordinatorNotAvailable
	}

	_ = coordinator.Open(client.conf)
	return coordinator, nil
}
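
// A usage sketch for group-coordinator lookup (the group name below is an
// assumption for illustration):
//
//	coordinator, err := client.Coordinator("example-group")
//	if err != nil {
//		// e.g. ErrConsumerCoordinatorNotAvailable while the cluster is still
//		// initializing the __consumer_offsets topic
//	}
//	_ = coordinator.Addr()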

func (client *client) RefreshCoordinator(consumerGroup string) error {
	if client.Closed() {
		return ErrClosedClient
	}

	response, err := client.getConsumerMetadata(consumerGroup, client.conf.Metadata.Retry.Max)
	if err != nil {
		return err
	}

	client.lock.Lock()
	defer client.lock.Unlock()
	client.registerBroker(response.Coordinator)
	client.coordinators[consumerGroup] = response.Coordinator.ID()
	return nil
}

// private broker management helpers

func (client *client) updateBroker(brokers []*Broker) {
	var currentBroker = make(map[int32]*Broker, len(brokers))

	for _, broker := range brokers {
		currentBroker[broker.ID()] = broker
		if client.brokers[broker.ID()] == nil { // add new broker
			client.brokers[broker.ID()] = broker
			Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
		} else if broker.Addr() != client.brokers[broker.ID()].Addr() { // replace broker with new address
			safeAsyncClose(client.brokers[broker.ID()])
			client.brokers[broker.ID()] = broker
			Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr())
		}
	}

	for id, broker := range client.brokers {
		if _, exist := currentBroker[id]; !exist { // remove old broker
			safeAsyncClose(broker)
			delete(client.brokers, id)
			Logger.Printf("client/broker remove invalid broker #%d with %s", broker.ID(), broker.Addr())
		}
	}
}

// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
// in the brokers map. You must hold the write lock before calling this function.
func (client *client) registerBroker(broker *Broker) {
	if client.brokers == nil {
		Logger.Printf("cannot register broker #%d at %s, client already closed", broker.ID(), broker.Addr())
		return
	}

	if client.brokers[broker.ID()] == nil {
		client.brokers[broker.ID()] = broker
		Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
	} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
		safeAsyncClose(client.brokers[broker.ID()])
		client.brokers[broker.ID()] = broker
		Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr())
	}
}

// deregisterBroker moves the current first seed broker to the dead-seeds list; if the
// given broker is not that seed broker, it is removed from the brokers map completely.
func (client *client) deregisterBroker(broker *Broker) {
	client.lock.Lock()
	defer client.lock.Unlock()

	if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] {
		client.deadSeeds = append(client.deadSeeds, broker)
		client.seedBrokers = client.seedBrokers[1:]
	} else {
		// we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
		// but we really shouldn't have to; once that loop is made better this case can be
		// removed, and the function generally can be renamed from `deregisterBroker` to
		// `nextSeedBroker` or something
		Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
		delete(client.brokers, broker.ID())
	}
}

func (client *client) resurrectDeadBrokers() {
	client.lock.Lock()
	defer client.lock.Unlock()

	Logger.Printf("client/brokers resurrecting %d dead seed brokers", len(client.deadSeeds))
	client.seedBrokers = append(client.seedBrokers, client.deadSeeds...)
	client.deadSeeds = nil
}

func (client *client) any() *Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()

	if len(client.seedBrokers) > 0 {
		_ = client.seedBrokers[0].Open(client.conf)
		return client.seedBrokers[0]
	}

	// not guaranteed to be random *or* deterministic
	for _, broker := range client.brokers {
		_ = broker.Open(client.conf)
		return broker
	}

	return nil
}

// private caching/lazy metadata helpers

type partitionType int

const (
	allPartitions partitionType = iota
	writablePartitions
	// If you add any more types, update the partition cache in update()

	// Ensure this is the last partition type value
	maxPartitionIndex
)

func (client *client) cachedMetadata(topic string, partitionID int32) *PartitionMetadata {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions := client.metadata[topic]
	if partitions != nil {
		return partitions[partitionID]
	}

	return nil
}

func (client *client) cachedPartitions(topic string, partitionSet partitionType) []int32 {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions, exists := client.cachedPartitionsResults[topic]

	if !exists {
		return nil
	}
	return partitions[partitionSet]
}

func (client *client) setPartitionCache(topic string, partitionSet partitionType) []int32 {
	partitions := client.metadata[topic]

	if partitions == nil {
		return nil
	}

	ret := make([]int32, 0, len(partitions))
	for _, partition := range partitions {
		if partitionSet == writablePartitions && partition.Err == ErrLeaderNotAvailable {
			continue
		}
		ret = append(ret, partition.ID)
	}

	sort.Sort(int32Slice(ret))
	return ret
}

func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, error) {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions := client.metadata[topic]
	if partitions != nil {
		metadata, ok := partitions[partitionID]
		if ok {
			if metadata.Err == ErrLeaderNotAvailable {
				return nil, ErrLeaderNotAvailable
			}
			b := client.brokers[metadata.Leader]
			if b == nil {
				return nil, ErrLeaderNotAvailable
			}
			_ = b.Open(client.conf)
			return b, nil
		}
	}

	return nil, ErrUnknownTopicOrPartition
}

func (client *client) getOffset(topic string, partitionID int32, time int64) (int64, error) {
	broker, err := client.Leader(topic, partitionID)
	if err != nil {
		return -1, err
	}

	request := &OffsetRequest{}
	if client.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 1
	}
	request.AddBlock(topic, partitionID, time, 1)

	response, err := broker.GetAvailableOffsets(request)
	if err != nil {
		_ = broker.Close()
		return -1, err
	}

	block := response.GetBlock(topic, partitionID)
	if block == nil {
		_ = broker.Close()
		return -1, ErrIncompleteResponse
	}
	if block.Err != ErrNoError {
		return -1, block.Err
	}
	if len(block.Offsets) != 1 {
		return -1, ErrOffsetOutOfRange
	}

	return block.Offsets[0], nil
}

// core metadata update logic

func (client *client) backgroundMetadataUpdater() {
	defer close(client.closed)

	if client.conf.Metadata.RefreshFrequency == time.Duration(0) {
		return
	}

	ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := client.refreshMetadata(); err != nil {
				Logger.Println("Client background metadata update:", err)
			}
		case <-client.closer:
			return
		}
	}
}

func (client *client) refreshMetadata() error {
	topics := []string{}

	if !client.conf.Metadata.Full {
		if specificTopics, err := client.MetadataTopics(); err != nil {
			return err
		} else if len(specificTopics) == 0 {
			return ErrNoTopicsToUpdateMetadata
		} else {
			topics = specificTopics
		}
	}

	if err := client.RefreshMetadata(topics...); err != nil {
		return err
	}

	return nil
}

func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
	pastDeadline := func(backoff time.Duration) bool {
		if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) {
			// we are past the deadline
			return true
		}
		return false
	}
	retry := func(err error) error {
		if attemptsRemaining > 0 {
			backoff := client.computeBackoff(attemptsRemaining)
			if pastDeadline(backoff) {
				Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
				return err
			}
			Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
			if backoff > 0 {
				time.Sleep(backoff)
			}
			return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline)
		}
		return err
	}

	broker := client.any()
	for ; broker != nil && !pastDeadline(0); broker = client.any() {
		allowAutoTopicCreation := true
		if len(topics) > 0 {
			Logger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
		} else {
			allowAutoTopicCreation = false
			Logger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
		}

		req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
		if client.conf.Version.IsAtLeast(V1_0_0_0) {
			req.Version = 5
		} else if client.conf.Version.IsAtLeast(V0_10_0_0) {
			req.Version = 1
		}
		response, err := broker.GetMetadata(req)
		switch err.(type) {
		case nil:
			allKnownMetaData := len(topics) == 0
			// valid response, use it
			shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
			if shouldRetry {
				Logger.Println("client/metadata found some partitions to be leaderless")
				return retry(err) // note: err can be nil
			}
			return err

		case PacketEncodingError:
			// didn't even send, return the error
			return err

		case KError:
			// if this is a SASL auth error, return it immediately as it _should_ be
			// non-retryable for all brokers
			if err.(KError) == ErrSASLAuthenticationFailed {
				Logger.Println("client/metadata failed SASL authentication")
				return err
			}

			if err.(KError) == ErrTopicAuthorizationFailed {
				Logger.Println("client is not authorized to access this topic. The topics were: ", topics)
				return err
			}
			// else remove that broker and try again
			Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
			_ = broker.Close()
			client.deregisterBroker(broker)

		default:
			// some other error, remove that broker and try again
			Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
			_ = broker.Close()
			client.deregisterBroker(broker)
		}
	}

	if broker != nil {
		Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
		return retry(ErrOutOfBrokers)
	}

	Logger.Println("client/metadata no available broker to send metadata request to")
	client.resurrectDeadBrokers()
	return retry(ErrOutOfBrokers)
}

// updateMetadata stores the given metadata in the client's caches. If there was no
// fatal error, retry reports whether the metadata should be re-fetched because some
// topics or partitions were affected by ErrLeaderNotAvailable.
func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
	if client.Closed() {
		return
	}

	client.lock.Lock()
	defer client.lock.Unlock()

	// For all the brokers we received:
	// - if it is a new ID, save it
	// - if it is an existing ID, but the address we have is stale, discard the old one and save it
	// - if a broker we know about is no longer present in the metadata, remove it
	// - otherwise ignore it, replacing our existing one would just bounce the connection
	client.updateBroker(data.Brokers)

	client.controllerID = data.ControllerID

	if allKnownMetaData {
		client.metadata = make(map[string]map[int32]*PartitionMetadata)
		client.metadataTopics = make(map[string]none)
		client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32)
	}
	for _, topic := range data.Topics {
		// topics must be added to `metadataTopics` first, so that all requested topics
		// are recorded and remain trackable for periodic metadata refreshes.
		if _, exists := client.metadataTopics[topic.Name]; !exists {
			client.metadataTopics[topic.Name] = none{}
		}
		delete(client.metadata, topic.Name)
		delete(client.cachedPartitionsResults, topic.Name)

		switch topic.Err {
		case ErrNoError:
			// no-op
		case ErrInvalidTopic, ErrTopicAuthorizationFailed: // don't retry, don't store partial results
			err = topic.Err
			continue
		case ErrUnknownTopicOrPartition: // retry, do not store partial partition results
			err = topic.Err
			retry = true
			continue
		case ErrLeaderNotAvailable: // retry, but store partial partition results
			retry = true
		default: // don't retry, don't store partial results
			Logger.Printf("Unexpected topic-level metadata error: %s", topic.Err)
			err = topic.Err
			continue
		}

		client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
		for _, partition := range topic.Partitions {
			client.metadata[topic.Name][partition.ID] = partition
			if partition.Err == ErrLeaderNotAvailable {
				retry = true
			}
		}

		var partitionCache [maxPartitionIndex][]int32
		partitionCache[allPartitions] = client.setPartitionCache(topic.Name, allPartitions)
		partitionCache[writablePartitions] = client.setPartitionCache(topic.Name, writablePartitions)
		client.cachedPartitionsResults[topic.Name] = partitionCache
	}

	return
}

func (client *client) cachedCoordinator(consumerGroup string) *Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()
	if coordinatorID, ok := client.coordinators[consumerGroup]; ok {
		return client.brokers[coordinatorID]
	}
	return nil
}

func (client *client) cachedController() *Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()

	return client.brokers[client.controllerID]
}

func (client *client) computeBackoff(attemptsRemaining int) time.Duration {
	if client.conf.Metadata.Retry.BackoffFunc != nil {
		maxRetries := client.conf.Metadata.Retry.Max
		retries := maxRetries - attemptsRemaining
		return client.conf.Metadata.Retry.BackoffFunc(retries, maxRetries)
	}
	return client.conf.Metadata.Retry.Backoff
}
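
// A sketch of a custom backoff policy plugged in through the configuration; the
// doubling-with-cap policy and its constants below are illustrative assumptions:
//
//	conf := NewConfig()
//	conf.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
//		backoff := (250 * time.Millisecond) << uint(retries) // double on each retry
//		if backoff > 10*time.Second {
//			backoff = 10 * time.Second // cap the backoff
//		}
//		return backoff
//	}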

func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
	retry := func(err error) (*FindCoordinatorResponse, error) {
		if attemptsRemaining > 0 {
			backoff := client.computeBackoff(attemptsRemaining)
			Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
			time.Sleep(backoff)
			return client.getConsumerMetadata(consumerGroup, attemptsRemaining-1)
		}
		return nil, err
	}

	for broker := client.any(); broker != nil; broker = client.any() {
		Logger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr())

		request := new(FindCoordinatorRequest)
		request.CoordinatorKey = consumerGroup
		request.CoordinatorType = CoordinatorGroup

		response, err := broker.FindCoordinator(request)

		if err != nil {
			Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err)

			switch err.(type) {
			case PacketEncodingError:
				return nil, err
			default:
				_ = broker.Close()
				client.deregisterBroker(broker)
				continue
			}
		}

		switch response.Err {
		case ErrNoError:
			Logger.Printf("client/coordinator coordinator for consumergroup %s is #%d (%s)\n", consumerGroup, response.Coordinator.ID(), response.Coordinator.Addr())
			return response, nil

		case ErrConsumerCoordinatorNotAvailable:
			Logger.Printf("client/coordinator coordinator for consumer group %s is not available\n", consumerGroup)

			// This is very ugly, but this scenario will only happen once per cluster.
			// The __consumer_offsets topic only has to be created one time.
			// The number of partitions is not configurable, but partition 0 should always exist.
			if _, err := client.Leader("__consumer_offsets", 0); err != nil {
				Logger.Printf("client/coordinator the __consumer_offsets topic is not initialized completely yet. Waiting 2 seconds...\n")
				time.Sleep(2 * time.Second)
			}

			return retry(ErrConsumerCoordinatorNotAvailable)
		case ErrGroupAuthorizationFailed:
			Logger.Printf("client was not authorized to access group %s while attempting to find coordinator", consumerGroup)
			return retry(ErrGroupAuthorizationFailed)

		default:
			return nil, response.Err
		}
	}

	Logger.Println("client/coordinator no available broker to send consumer metadata request to")
	client.resurrectDeadBrokers()
	return retry(ErrOutOfBrokers)
}

// nopCloserClient embeds an existing Client, but disables
// the Close method (yet all other methods pass
// through unchanged). This is for use in larger structs
// where it is undesirable to close the client that was
// passed in by the caller.
type nopCloserClient struct {
	Client
}

// Close intercepts and purposely does not call the underlying
// client's Close() method.
func (ncc *nopCloserClient) Close() error {
	return nil
}