1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4package query
5
6import (
7	"context"
8	"encoding/json"
9	"fmt"
10	"math"
11	"sort"
12	"sync"
13	"time"
14
15	"github.com/go-kit/kit/log"
16	"github.com/go-kit/kit/log/level"
17	"github.com/pkg/errors"
18	"github.com/prometheus/client_golang/prometheus"
19	"github.com/prometheus/prometheus/pkg/labels"
20	"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
21	"google.golang.org/grpc"
22
23	"github.com/thanos-io/thanos/pkg/component"
24	"github.com/thanos-io/thanos/pkg/metadata/metadatapb"
25	"github.com/thanos-io/thanos/pkg/rules/rulespb"
26	"github.com/thanos-io/thanos/pkg/runutil"
27	"github.com/thanos-io/thanos/pkg/store"
28	"github.com/thanos-io/thanos/pkg/store/labelpb"
29	"github.com/thanos-io/thanos/pkg/store/storepb"
30	"github.com/thanos-io/thanos/pkg/targets/targetspb"
31)
32
33const (
34	unhealthyStoreMessage = "removing store because it's unhealthy or does not exist"
35)
36
37type StoreSpec interface {
38	// Addr returns StoreAPI Address for the store spec. It is used as ID for store.
39	Addr() string
40	// Metadata returns current labels, store type and min, max ranges for store.
41	// It can change for every call for this method.
42	// If metadata call fails we assume that store is no longer accessible and we should not use it.
43	// NOTE: It is implementation responsibility to retry until context timeout, but a caller responsibility to manage
44	// given store connection.
45	Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []labels.Labels, mint int64, maxt int64, storeType component.StoreAPI, err error)
46
47	// StrictStatic returns true if the StoreAPI has been statically defined and it is under a strict mode.
48	StrictStatic() bool
49}
50
51type RuleSpec interface {
52	// Addr returns RulesAPI Address for the rules spec. It is used as its ID.
53	Addr() string
54}
55
56type TargetSpec interface {
57	// Addr returns TargetsAPI Address for the targets spec. It is used as its ID.
58	Addr() string
59}
60
61type MetadataSpec interface {
62	// Addr returns MetadataAPI Address for the metadata spec. It is used as its ID.
63	Addr() string
64}
65
66type ExemplarSpec interface {
67	// Addr returns ExemplarsAPI Address for the exemplars spec. It is used as its ID.
68	Addr() string
69}
70
71// stringError forces the error to be a string
72// when marshaled into a JSON.
73type stringError struct {
74	originalErr error
75}
76
77// MarshalJSON marshals the error into a string form.
78func (e *stringError) MarshalJSON() ([]byte, error) {
79	return json.Marshal(e.originalErr.Error())
80}
81
82// Error returns the original underlying error.
83func (e *stringError) Error() string {
84	return e.originalErr.Error()
85}
86
87type StoreStatus struct {
88	Name      string             `json:"name"`
89	LastCheck time.Time          `json:"lastCheck"`
90	LastError *stringError       `json:"lastError"`
91	LabelSets []labels.Labels    `json:"labelSets"`
92	StoreType component.StoreAPI `json:"-"`
93	MinTime   int64              `json:"minTime"`
94	MaxTime   int64              `json:"maxTime"`
95}
96
97type grpcStoreSpec struct {
98	addr         string
99	strictstatic bool
100}
101
102// NewGRPCStoreSpec creates store pure gRPC spec.
103// It uses Info gRPC call to get Metadata.
104func NewGRPCStoreSpec(addr string, strictstatic bool) StoreSpec {
105	return &grpcStoreSpec{addr: addr, strictstatic: strictstatic}
106}
107
108// StrictStatic returns true if the StoreAPI has been statically defined and it is under a strict mode.
109func (s *grpcStoreSpec) StrictStatic() bool {
110	return s.strictstatic
111}
112
113func (s *grpcStoreSpec) Addr() string {
114	// API addr should not change between state changes.
115	return s.addr
116}
117
118// Metadata method for gRPC store API tries to reach host Info method until context timeout. If we are unable to get metadata after
119// that time, we assume that the host is unhealthy and return error.
120func (s *grpcStoreSpec) Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []labels.Labels, mint int64, maxt int64, Type component.StoreAPI, err error) {
121	resp, err := client.Info(ctx, &storepb.InfoRequest{}, grpc.WaitForReady(true))
122	if err != nil {
123		return nil, 0, 0, nil, errors.Wrapf(err, "fetching store info from %s", s.addr)
124	}
125	if len(resp.LabelSets) == 0 && len(resp.Labels) > 0 {
126		resp.LabelSets = []labelpb.ZLabelSet{{Labels: resp.Labels}}
127	}
128
129	labelSets = make([]labels.Labels, 0, len(resp.LabelSets))
130	for _, ls := range resp.LabelSets {
131		labelSets = append(labelSets, ls.PromLabels())
132	}
133	return labelSets, resp.MinTime, resp.MaxTime, component.FromProto(resp.StoreType), nil
134}
135
136// storeSetNodeCollector is a metric collector reporting the number of available storeAPIs for Querier.
137// A Collector is required as we want atomic updates for all 'thanos_store_nodes_grpc_connections' series.
138type storeSetNodeCollector struct {
139	mtx             sync.Mutex
140	storeNodes      map[component.StoreAPI]map[string]int
141	storePerExtLset map[string]int
142
143	connectionsDesc *prometheus.Desc
144}
145
146func newStoreSetNodeCollector() *storeSetNodeCollector {
147	return &storeSetNodeCollector{
148		storeNodes: map[component.StoreAPI]map[string]int{},
149		connectionsDesc: prometheus.NewDesc(
150			"thanos_store_nodes_grpc_connections",
151			"Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier.",
152			[]string{"external_labels", "store_type"}, nil,
153		),
154	}
155}
156
157func (c *storeSetNodeCollector) Update(nodes map[component.StoreAPI]map[string]int) {
158	storeNodes := make(map[component.StoreAPI]map[string]int, len(nodes))
159	storePerExtLset := map[string]int{}
160
161	for k, v := range nodes {
162		storeNodes[k] = make(map[string]int, len(v))
163		for kk, vv := range v {
164			storePerExtLset[kk] += vv
165			storeNodes[k][kk] = vv
166		}
167	}
168
169	c.mtx.Lock()
170	defer c.mtx.Unlock()
171	c.storeNodes = storeNodes
172	c.storePerExtLset = storePerExtLset
173}
174
175func (c *storeSetNodeCollector) Describe(ch chan<- *prometheus.Desc) {
176	ch <- c.connectionsDesc
177}
178
179func (c *storeSetNodeCollector) Collect(ch chan<- prometheus.Metric) {
180	c.mtx.Lock()
181	defer c.mtx.Unlock()
182
183	for storeType, occurrencesPerExtLset := range c.storeNodes {
184		for externalLabels, occurrences := range occurrencesPerExtLset {
185			var storeTypeStr string
186			if storeType != nil {
187				storeTypeStr = storeType.String()
188			}
189			ch <- prometheus.MustNewConstMetric(c.connectionsDesc, prometheus.GaugeValue, float64(occurrences), externalLabels, storeTypeStr)
190		}
191	}
192}
193
194// StoreSet maintains a set of active stores. It is backed up by Store Specifications that are dynamically fetched on
195// every Update() call.
196type StoreSet struct {
197	logger log.Logger
198
199	// Store specifications can change dynamically. If some store is missing from the list, we assuming it is no longer
200	// accessible and we close gRPC client for it.
201	storeSpecs          func() []StoreSpec
202	ruleSpecs           func() []RuleSpec
203	targetSpecs         func() []TargetSpec
204	metadataSpecs       func() []MetadataSpec
205	exemplarSpecs       func() []ExemplarSpec
206	dialOpts            []grpc.DialOption
207	gRPCInfoCallTimeout time.Duration
208
209	updateMtx         sync.Mutex
210	storesMtx         sync.RWMutex
211	storesStatusesMtx sync.RWMutex
212
213	// Main map of stores currently used for fanout.
214	stores       map[string]*storeRef
215	storesMetric *storeSetNodeCollector
216
217	// Map of statuses used only by UI.
218	storeStatuses         map[string]*StoreStatus
219	unhealthyStoreTimeout time.Duration
220}
221
222// NewStoreSet returns a new set of store APIs and potentially Rules APIs from given specs.
223func NewStoreSet(
224	logger log.Logger,
225	reg *prometheus.Registry,
226	storeSpecs func() []StoreSpec,
227	ruleSpecs func() []RuleSpec,
228	targetSpecs func() []TargetSpec,
229	metadataSpecs func() []MetadataSpec,
230	exemplarSpecs func() []ExemplarSpec,
231	dialOpts []grpc.DialOption,
232	unhealthyStoreTimeout time.Duration,
233) *StoreSet {
234	storesMetric := newStoreSetNodeCollector()
235	if reg != nil {
236		reg.MustRegister(storesMetric)
237	}
238
239	if logger == nil {
240		logger = log.NewNopLogger()
241	}
242	if storeSpecs == nil {
243		storeSpecs = func() []StoreSpec { return nil }
244	}
245	if ruleSpecs == nil {
246		ruleSpecs = func() []RuleSpec { return nil }
247	}
248	if targetSpecs == nil {
249		targetSpecs = func() []TargetSpec { return nil }
250	}
251	if metadataSpecs == nil {
252		metadataSpecs = func() []MetadataSpec { return nil }
253	}
254	if exemplarSpecs == nil {
255		exemplarSpecs = func() []ExemplarSpec { return nil }
256	}
257
258	ss := &StoreSet{
259		logger:                log.With(logger, "component", "storeset"),
260		storeSpecs:            storeSpecs,
261		ruleSpecs:             ruleSpecs,
262		targetSpecs:           targetSpecs,
263		metadataSpecs:         metadataSpecs,
264		exemplarSpecs:         exemplarSpecs,
265		dialOpts:              dialOpts,
266		storesMetric:          storesMetric,
267		gRPCInfoCallTimeout:   5 * time.Second,
268		stores:                make(map[string]*storeRef),
269		storeStatuses:         make(map[string]*StoreStatus),
270		unhealthyStoreTimeout: unhealthyStoreTimeout,
271	}
272	return ss
273}
274
275// TODO(bwplotka): Consider moving storeRef out of this package and renaming it, as it also supports rules API.
276type storeRef struct {
277	storepb.StoreClient
278
279	mtx  sync.RWMutex
280	cc   *grpc.ClientConn
281	addr string
282	// If rule is not nil, then this store also supports rules API.
283	rule     rulespb.RulesClient
284	metadata metadatapb.MetadataClient
285
286	// If exemplar is not nil, then this store also support exemplars API.
287	exemplar exemplarspb.ExemplarsClient
288
289	// If target is not nil, then this store also supports targets API.
290	target targetspb.TargetsClient
291
292	// Meta (can change during runtime).
293	labelSets []labels.Labels
294	storeType component.StoreAPI
295	minTime   int64
296	maxTime   int64
297
298	logger log.Logger
299}
300
301func (s *storeRef) Update(labelSets []labels.Labels, minTime int64, maxTime int64, storeType component.StoreAPI, rule rulespb.RulesClient, target targetspb.TargetsClient, metadata metadatapb.MetadataClient, exemplar exemplarspb.ExemplarsClient) {
302	s.mtx.Lock()
303	defer s.mtx.Unlock()
304
305	s.storeType = storeType
306	s.labelSets = labelSets
307	s.minTime = minTime
308	s.maxTime = maxTime
309	s.rule = rule
310	s.target = target
311	s.metadata = metadata
312	s.exemplar = exemplar
313}
314
315func (s *storeRef) StoreType() component.StoreAPI {
316	s.mtx.RLock()
317	defer s.mtx.RUnlock()
318
319	return s.storeType
320}
321
322func (s *storeRef) HasRulesAPI() bool {
323	s.mtx.RLock()
324	defer s.mtx.RUnlock()
325
326	return s.rule != nil
327}
328
329func (s *storeRef) HasTargetsAPI() bool {
330	s.mtx.RLock()
331	defer s.mtx.RUnlock()
332
333	return s.target != nil
334}
335
336func (s *storeRef) HasMetadataAPI() bool {
337	s.mtx.RLock()
338	defer s.mtx.RUnlock()
339
340	return s.metadata != nil
341}
342
343func (s *storeRef) HasExemplarsAPI() bool {
344	s.mtx.RLock()
345	defer s.mtx.RUnlock()
346
347	return s.exemplar != nil
348}
349
350func (s *storeRef) LabelSets() []labels.Labels {
351	s.mtx.RLock()
352	defer s.mtx.RUnlock()
353
354	labelSet := make([]labels.Labels, 0, len(s.labelSets))
355	for _, ls := range s.labelSets {
356		if len(ls) == 0 {
357			continue
358		}
359		// Compatibility label for Queriers pre 0.8.1. Filter it out now.
360		if ls[0].Name == store.CompatibilityTypeLabelName {
361			continue
362		}
363		labelSet = append(labelSet, ls.Copy())
364	}
365	return labelSet
366}
367
368func (s *storeRef) TimeRange() (mint int64, maxt int64) {
369	s.mtx.RLock()
370	defer s.mtx.RUnlock()
371
372	return s.minTime, s.maxTime
373}
374
375func (s *storeRef) String() string {
376	mint, maxt := s.TimeRange()
377	return fmt.Sprintf("Addr: %s LabelSets: %v Mint: %d Maxt: %d", s.addr, labelpb.PromLabelSetsToString(s.LabelSets()), mint, maxt)
378}
379
380func (s *storeRef) Addr() string {
381	return s.addr
382}
383
384func (s *storeRef) Close() {
385	runutil.CloseWithLogOnErr(s.logger, s.cc, fmt.Sprintf("store %v connection close", s.addr))
386}
387
388func newStoreAPIStats() map[component.StoreAPI]map[string]int {
389	nodes := make(map[component.StoreAPI]map[string]int, len(storepb.StoreType_name))
390	for i := range storepb.StoreType_name {
391		nodes[component.FromProto(storepb.StoreType(i))] = map[string]int{}
392	}
393	return nodes
394}
395
396// Update updates the store set. It fetches current list of store specs from function and updates the fresh metadata
397// from all stores. Keeps around statically defined nodes that were defined with the strict mode.
398func (s *StoreSet) Update(ctx context.Context) {
399	s.updateMtx.Lock()
400	defer s.updateMtx.Unlock()
401
402	s.storesMtx.RLock()
403	stores := make(map[string]*storeRef, len(s.stores))
404	for addr, st := range s.stores {
405		stores[addr] = st
406	}
407	s.storesMtx.RUnlock()
408
409	level.Debug(s.logger).Log("msg", "starting updating storeAPIs", "cachedStores", len(stores))
410
411	activeStores := s.getActiveStores(ctx, stores)
412	level.Debug(s.logger).Log("msg", "checked requested storeAPIs", "activeStores", len(activeStores), "cachedStores", len(stores))
413
414	stats := newStoreAPIStats()
415
416	// Close stores that where not active this time (are not in active stores map).
417	for addr, st := range stores {
418		if _, ok := activeStores[addr]; ok {
419			stats[st.StoreType()][labelpb.PromLabelSetsToString(st.LabelSets())]++
420			continue
421		}
422
423		st.Close()
424		delete(stores, addr)
425		s.updateStoreStatus(st, errors.New(unhealthyStoreMessage))
426		level.Info(s.logger).Log("msg", unhealthyStoreMessage, "address", addr, "extLset", labelpb.PromLabelSetsToString(st.LabelSets()))
427	}
428
429	// Add stores that are not yet in stores.
430	for addr, st := range activeStores {
431		if _, ok := stores[addr]; ok {
432			continue
433		}
434
435		extLset := labelpb.PromLabelSetsToString(st.LabelSets())
436
437		// All producers should have unique external labels. While this does not check only StoreAPIs connected to
438		// this querier this allows to notify early user about misconfiguration. Warn only. This is also detectable from metric.
439		if st.StoreType() != nil &&
440			(st.StoreType() == component.Sidecar || st.StoreType() == component.Rule) &&
441			stats[component.Sidecar][extLset]+stats[component.Rule][extLset] > 0 {
442
443			level.Warn(s.logger).Log("msg", "found duplicate storeAPI producer (sidecar or ruler). This is not advices as it will malform data in in the same bucket",
444				"address", addr, "extLset", extLset, "duplicates", fmt.Sprintf("%v", stats[component.Sidecar][extLset]+stats[component.Rule][extLset]+1))
445		}
446		stats[st.StoreType()][extLset]++
447
448		stores[addr] = st
449		s.updateStoreStatus(st, nil)
450
451		if st.HasRulesAPI() {
452			level.Info(s.logger).Log("msg", "adding new rulesAPI to query storeset", "address", addr)
453		}
454
455		if st.HasExemplarsAPI() {
456			level.Info(s.logger).Log("msg", "adding new exemplarsAPI to query storeset", "address", addr)
457		}
458
459		if st.HasTargetsAPI() {
460			level.Info(s.logger).Log("msg", "adding new targetsAPI to query storeset", "address", addr)
461		}
462
463		level.Info(s.logger).Log("msg", "adding new storeAPI to query storeset", "address", addr, "extLset", extLset)
464	}
465
466	s.storesMetric.Update(stats)
467	s.storesMtx.Lock()
468	s.stores = stores
469	s.storesMtx.Unlock()
470
471	s.cleanUpStoreStatuses(stores)
472}
473
474func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*storeRef) map[string]*storeRef {
475	var (
476		// UNIQUE?
477		activeStores = make(map[string]*storeRef, len(stores))
478		mtx          sync.Mutex
479		wg           sync.WaitGroup
480
481		storeAddrSet    = make(map[string]struct{})
482		ruleAddrSet     = make(map[string]struct{})
483		targetAddrSet   = make(map[string]struct{})
484		metadataAddrSet = make(map[string]struct{})
485		exemplarAddrSet = make(map[string]struct{})
486	)
487
488	// Gather active stores map concurrently. Build new store if does not exist already.
489	for _, ruleSpec := range s.ruleSpecs() {
490		ruleAddrSet[ruleSpec.Addr()] = struct{}{}
491	}
492
493	// Gather active targets map concurrently. Add a new target if it does not exist already.
494	for _, targetSpec := range s.targetSpecs() {
495		targetAddrSet[targetSpec.Addr()] = struct{}{}
496	}
497
498	// Gather active stores map concurrently. Build new store if does not exist already.
499	for _, metadataSpec := range s.metadataSpecs() {
500		metadataAddrSet[metadataSpec.Addr()] = struct{}{}
501	}
502
503	// Gather active stores map concurrently. Build new store if does not exist already.
504	for _, exemplarSpec := range s.exemplarSpecs() {
505		exemplarAddrSet[exemplarSpec.Addr()] = struct{}{}
506	}
507
508	// Gather healthy stores map concurrently. Build new store if does not exist already.
509	for _, storeSpec := range s.storeSpecs() {
510		if _, ok := storeAddrSet[storeSpec.Addr()]; ok {
511			level.Warn(s.logger).Log("msg", "duplicated address in store nodes", "address", storeSpec.Addr())
512			continue
513		}
514		storeAddrSet[storeSpec.Addr()] = struct{}{}
515
516		wg.Add(1)
517		go func(spec StoreSpec) {
518			defer wg.Done()
519
520			addr := spec.Addr()
521
522			ctx, cancel := context.WithTimeout(ctx, s.gRPCInfoCallTimeout)
523			defer cancel()
524
525			st, seenAlready := stores[addr]
526			if !seenAlready {
527				// New store or was unactive and was removed in the past - create new one.
528				conn, err := grpc.DialContext(ctx, addr, s.dialOpts...)
529				if err != nil {
530					s.updateStoreStatus(&storeRef{addr: addr}, err)
531					level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "dialing connection"), "address", addr)
532					return
533				}
534
535				st = &storeRef{StoreClient: storepb.NewStoreClient(conn), storeType: component.UnknownStoreAPI, cc: conn, addr: addr, logger: s.logger}
536				if spec.StrictStatic() {
537					st.maxTime = math.MaxInt64
538				}
539			}
540
541			var rule rulespb.RulesClient
542			if _, ok := ruleAddrSet[addr]; ok {
543				rule = rulespb.NewRulesClient(st.cc)
544			}
545
546			var target targetspb.TargetsClient
547			if _, ok := targetAddrSet[addr]; ok {
548				target = targetspb.NewTargetsClient(st.cc)
549			}
550
551			var metadata metadatapb.MetadataClient
552			if _, ok := metadataAddrSet[addr]; ok {
553				metadata = metadatapb.NewMetadataClient(st.cc)
554			}
555
556			var exemplar exemplarspb.ExemplarsClient
557			if _, ok := exemplarAddrSet[addr]; ok {
558				exemplar = exemplarspb.NewExemplarsClient(st.cc)
559			}
560
561			// Check existing or new store. Is it healthy? What are current metadata?
562			labelSets, minTime, maxTime, storeType, err := spec.Metadata(ctx, st.StoreClient)
563			if err != nil {
564				if !seenAlready && !spec.StrictStatic() {
565					// Close only if new and not a strict static node.
566					// Unactive `s.stores` will be closed later on.
567					st.Close()
568				}
569				s.updateStoreStatus(st, err)
570				level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "getting metadata"), "address", addr)
571
572				if !spec.StrictStatic() {
573					return
574				}
575
576				// Still keep it around if static & strict mode enabled.
577				mtx.Lock()
578				defer mtx.Unlock()
579
580				activeStores[addr] = st
581				return
582			}
583
584			s.updateStoreStatus(st, nil)
585			st.Update(labelSets, minTime, maxTime, storeType, rule, target, metadata, exemplar)
586
587			mtx.Lock()
588			defer mtx.Unlock()
589
590			activeStores[addr] = st
591		}(storeSpec)
592	}
593	wg.Wait()
594
595	for ruleAddr := range ruleAddrSet {
596		if _, ok := storeAddrSet[ruleAddr]; !ok {
597			level.Warn(s.logger).Log("msg", "ignored rule store", "address", ruleAddr)
598		}
599	}
600	return activeStores
601}
602
603func (s *StoreSet) updateStoreStatus(store *storeRef, err error) {
604	s.storesStatusesMtx.Lock()
605	defer s.storesStatusesMtx.Unlock()
606
607	status := StoreStatus{Name: store.addr}
608	prev, ok := s.storeStatuses[store.addr]
609	if ok {
610		status = *prev
611	} else {
612		mint, maxt := store.TimeRange()
613		status.MinTime = mint
614		status.MaxTime = maxt
615	}
616
617	if err == nil {
618		status.LastCheck = time.Now()
619		mint, maxt := store.TimeRange()
620		status.LabelSets = store.LabelSets()
621		status.StoreType = store.StoreType()
622		status.MinTime = mint
623		status.MaxTime = maxt
624		status.LastError = nil
625	} else {
626		status.LastError = &stringError{originalErr: err}
627	}
628
629	s.storeStatuses[store.addr] = &status
630}
631
632func (s *StoreSet) GetStoreStatus() []StoreStatus {
633	s.storesStatusesMtx.RLock()
634	defer s.storesStatusesMtx.RUnlock()
635
636	statuses := make([]StoreStatus, 0, len(s.storeStatuses))
637	for _, v := range s.storeStatuses {
638		statuses = append(statuses, *v)
639	}
640
641	sort.Slice(statuses, func(i, j int) bool {
642		return statuses[i].Name < statuses[j].Name
643	})
644	return statuses
645}
646
647// Get returns a list of all active stores.
648func (s *StoreSet) Get() []store.Client {
649	s.storesMtx.RLock()
650	defer s.storesMtx.RUnlock()
651
652	stores := make([]store.Client, 0, len(s.stores))
653	for _, st := range s.stores {
654		stores = append(stores, st)
655	}
656	return stores
657}
658
659// GetRulesClients returns a list of all active rules clients.
660func (s *StoreSet) GetRulesClients() []rulespb.RulesClient {
661	s.storesMtx.RLock()
662	defer s.storesMtx.RUnlock()
663
664	rules := make([]rulespb.RulesClient, 0, len(s.stores))
665	for _, st := range s.stores {
666		if st.HasRulesAPI() {
667			rules = append(rules, st.rule)
668		}
669	}
670	return rules
671}
672
673// GetTargetsClients returns a list of all active targets clients.
674func (s *StoreSet) GetTargetsClients() []targetspb.TargetsClient {
675	s.storesMtx.RLock()
676	defer s.storesMtx.RUnlock()
677
678	targets := make([]targetspb.TargetsClient, 0, len(s.stores))
679	for _, st := range s.stores {
680		if st.HasTargetsAPI() {
681			targets = append(targets, st.target)
682		}
683	}
684	return targets
685}
686
687// GetMetadataClients returns a list of all active metadata clients.
688func (s *StoreSet) GetMetadataClients() []metadatapb.MetadataClient {
689	s.storesMtx.RLock()
690	defer s.storesMtx.RUnlock()
691
692	metadataClients := make([]metadatapb.MetadataClient, 0, len(s.stores))
693	for _, st := range s.stores {
694		if st.HasMetadataAPI() {
695			metadataClients = append(metadataClients, st.metadata)
696		}
697	}
698	return metadataClients
699}
700
701// GetExemplarsStores returns a list of all active exemplars stores.
702func (s *StoreSet) GetExemplarsStores() []*exemplarspb.ExemplarStore {
703	s.storesMtx.RLock()
704	defer s.storesMtx.RUnlock()
705
706	exemplarStores := make([]*exemplarspb.ExemplarStore, 0, len(s.stores))
707	for _, st := range s.stores {
708		if st.HasExemplarsAPI() {
709			exemplarStores = append(exemplarStores, &exemplarspb.ExemplarStore{
710				ExemplarsClient: st.exemplar,
711				LabelSets:       st.labelSets,
712			})
713		}
714	}
715	return exemplarStores
716}
717
718func (s *StoreSet) Close() {
719	s.storesMtx.Lock()
720	defer s.storesMtx.Unlock()
721
722	for _, st := range s.stores {
723		st.Close()
724	}
725	s.stores = map[string]*storeRef{}
726}
727
728func (s *StoreSet) cleanUpStoreStatuses(stores map[string]*storeRef) {
729	s.storesStatusesMtx.Lock()
730	defer s.storesStatusesMtx.Unlock()
731
732	now := time.Now()
733	for addr, status := range s.storeStatuses {
734		if _, ok := stores[addr]; ok {
735			continue
736		}
737
738		if now.Sub(status.LastCheck) >= s.unhealthyStoreTimeout {
739			delete(s.storeStatuses, addr)
740		}
741	}
742}
743