1// Copyright (c) The Thanos Authors. 2// Licensed under the Apache License 2.0. 3 4package query 5 6import ( 7 "context" 8 "encoding/json" 9 "fmt" 10 "math" 11 "sort" 12 "sync" 13 "time" 14 15 "github.com/go-kit/kit/log" 16 "github.com/go-kit/kit/log/level" 17 "github.com/pkg/errors" 18 "github.com/prometheus/client_golang/prometheus" 19 "github.com/prometheus/prometheus/pkg/labels" 20 "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" 21 "google.golang.org/grpc" 22 23 "github.com/thanos-io/thanos/pkg/component" 24 "github.com/thanos-io/thanos/pkg/metadata/metadatapb" 25 "github.com/thanos-io/thanos/pkg/rules/rulespb" 26 "github.com/thanos-io/thanos/pkg/runutil" 27 "github.com/thanos-io/thanos/pkg/store" 28 "github.com/thanos-io/thanos/pkg/store/labelpb" 29 "github.com/thanos-io/thanos/pkg/store/storepb" 30 "github.com/thanos-io/thanos/pkg/targets/targetspb" 31) 32 33const ( 34 unhealthyStoreMessage = "removing store because it's unhealthy or does not exist" 35) 36 37type StoreSpec interface { 38 // Addr returns StoreAPI Address for the store spec. It is used as ID for store. 39 Addr() string 40 // Metadata returns current labels, store type and min, max ranges for store. 41 // It can change for every call for this method. 42 // If metadata call fails we assume that store is no longer accessible and we should not use it. 43 // NOTE: It is implementation responsibility to retry until context timeout, but a caller responsibility to manage 44 // given store connection. 45 Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []labels.Labels, mint int64, maxt int64, storeType component.StoreAPI, err error) 46 47 // StrictStatic returns true if the StoreAPI has been statically defined and it is under a strict mode. 48 StrictStatic() bool 49} 50 51type RuleSpec interface { 52 // Addr returns RulesAPI Address for the rules spec. It is used as its ID. 53 Addr() string 54} 55 56type TargetSpec interface { 57 // Addr returns TargetsAPI Address for the targets spec. It is used as its ID. 58 Addr() string 59} 60 61type MetadataSpec interface { 62 // Addr returns MetadataAPI Address for the metadata spec. It is used as its ID. 63 Addr() string 64} 65 66type ExemplarSpec interface { 67 // Addr returns ExemplarsAPI Address for the exemplars spec. It is used as its ID. 68 Addr() string 69} 70 71// stringError forces the error to be a string 72// when marshaled into a JSON. 73type stringError struct { 74 originalErr error 75} 76 77// MarshalJSON marshals the error into a string form. 78func (e *stringError) MarshalJSON() ([]byte, error) { 79 return json.Marshal(e.originalErr.Error()) 80} 81 82// Error returns the original underlying error. 83func (e *stringError) Error() string { 84 return e.originalErr.Error() 85} 86 87type StoreStatus struct { 88 Name string `json:"name"` 89 LastCheck time.Time `json:"lastCheck"` 90 LastError *stringError `json:"lastError"` 91 LabelSets []labels.Labels `json:"labelSets"` 92 StoreType component.StoreAPI `json:"-"` 93 MinTime int64 `json:"minTime"` 94 MaxTime int64 `json:"maxTime"` 95} 96 97type grpcStoreSpec struct { 98 addr string 99 strictstatic bool 100} 101 102// NewGRPCStoreSpec creates store pure gRPC spec. 103// It uses Info gRPC call to get Metadata. 104func NewGRPCStoreSpec(addr string, strictstatic bool) StoreSpec { 105 return &grpcStoreSpec{addr: addr, strictstatic: strictstatic} 106} 107 108// StrictStatic returns true if the StoreAPI has been statically defined and it is under a strict mode. 109func (s *grpcStoreSpec) StrictStatic() bool { 110 return s.strictstatic 111} 112 113func (s *grpcStoreSpec) Addr() string { 114 // API addr should not change between state changes. 115 return s.addr 116} 117 118// Metadata method for gRPC store API tries to reach host Info method until context timeout. If we are unable to get metadata after 119// that time, we assume that the host is unhealthy and return error. 120func (s *grpcStoreSpec) Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []labels.Labels, mint int64, maxt int64, Type component.StoreAPI, err error) { 121 resp, err := client.Info(ctx, &storepb.InfoRequest{}, grpc.WaitForReady(true)) 122 if err != nil { 123 return nil, 0, 0, nil, errors.Wrapf(err, "fetching store info from %s", s.addr) 124 } 125 if len(resp.LabelSets) == 0 && len(resp.Labels) > 0 { 126 resp.LabelSets = []labelpb.ZLabelSet{{Labels: resp.Labels}} 127 } 128 129 labelSets = make([]labels.Labels, 0, len(resp.LabelSets)) 130 for _, ls := range resp.LabelSets { 131 labelSets = append(labelSets, ls.PromLabels()) 132 } 133 return labelSets, resp.MinTime, resp.MaxTime, component.FromProto(resp.StoreType), nil 134} 135 136// storeSetNodeCollector is a metric collector reporting the number of available storeAPIs for Querier. 137// A Collector is required as we want atomic updates for all 'thanos_store_nodes_grpc_connections' series. 138type storeSetNodeCollector struct { 139 mtx sync.Mutex 140 storeNodes map[component.StoreAPI]map[string]int 141 storePerExtLset map[string]int 142 143 connectionsDesc *prometheus.Desc 144} 145 146func newStoreSetNodeCollector() *storeSetNodeCollector { 147 return &storeSetNodeCollector{ 148 storeNodes: map[component.StoreAPI]map[string]int{}, 149 connectionsDesc: prometheus.NewDesc( 150 "thanos_store_nodes_grpc_connections", 151 "Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier.", 152 []string{"external_labels", "store_type"}, nil, 153 ), 154 } 155} 156 157func (c *storeSetNodeCollector) Update(nodes map[component.StoreAPI]map[string]int) { 158 storeNodes := make(map[component.StoreAPI]map[string]int, len(nodes)) 159 storePerExtLset := map[string]int{} 160 161 for k, v := range nodes { 162 storeNodes[k] = make(map[string]int, len(v)) 163 for kk, vv := range v { 164 storePerExtLset[kk] += vv 165 storeNodes[k][kk] = vv 166 } 167 } 168 169 c.mtx.Lock() 170 defer c.mtx.Unlock() 171 c.storeNodes = storeNodes 172 c.storePerExtLset = storePerExtLset 173} 174 175func (c *storeSetNodeCollector) Describe(ch chan<- *prometheus.Desc) { 176 ch <- c.connectionsDesc 177} 178 179func (c *storeSetNodeCollector) Collect(ch chan<- prometheus.Metric) { 180 c.mtx.Lock() 181 defer c.mtx.Unlock() 182 183 for storeType, occurrencesPerExtLset := range c.storeNodes { 184 for externalLabels, occurrences := range occurrencesPerExtLset { 185 var storeTypeStr string 186 if storeType != nil { 187 storeTypeStr = storeType.String() 188 } 189 ch <- prometheus.MustNewConstMetric(c.connectionsDesc, prometheus.GaugeValue, float64(occurrences), externalLabels, storeTypeStr) 190 } 191 } 192} 193 194// StoreSet maintains a set of active stores. It is backed up by Store Specifications that are dynamically fetched on 195// every Update() call. 196type StoreSet struct { 197 logger log.Logger 198 199 // Store specifications can change dynamically. If some store is missing from the list, we assuming it is no longer 200 // accessible and we close gRPC client for it. 201 storeSpecs func() []StoreSpec 202 ruleSpecs func() []RuleSpec 203 targetSpecs func() []TargetSpec 204 metadataSpecs func() []MetadataSpec 205 exemplarSpecs func() []ExemplarSpec 206 dialOpts []grpc.DialOption 207 gRPCInfoCallTimeout time.Duration 208 209 updateMtx sync.Mutex 210 storesMtx sync.RWMutex 211 storesStatusesMtx sync.RWMutex 212 213 // Main map of stores currently used for fanout. 214 stores map[string]*storeRef 215 storesMetric *storeSetNodeCollector 216 217 // Map of statuses used only by UI. 218 storeStatuses map[string]*StoreStatus 219 unhealthyStoreTimeout time.Duration 220} 221 222// NewStoreSet returns a new set of store APIs and potentially Rules APIs from given specs. 223func NewStoreSet( 224 logger log.Logger, 225 reg *prometheus.Registry, 226 storeSpecs func() []StoreSpec, 227 ruleSpecs func() []RuleSpec, 228 targetSpecs func() []TargetSpec, 229 metadataSpecs func() []MetadataSpec, 230 exemplarSpecs func() []ExemplarSpec, 231 dialOpts []grpc.DialOption, 232 unhealthyStoreTimeout time.Duration, 233) *StoreSet { 234 storesMetric := newStoreSetNodeCollector() 235 if reg != nil { 236 reg.MustRegister(storesMetric) 237 } 238 239 if logger == nil { 240 logger = log.NewNopLogger() 241 } 242 if storeSpecs == nil { 243 storeSpecs = func() []StoreSpec { return nil } 244 } 245 if ruleSpecs == nil { 246 ruleSpecs = func() []RuleSpec { return nil } 247 } 248 if targetSpecs == nil { 249 targetSpecs = func() []TargetSpec { return nil } 250 } 251 if metadataSpecs == nil { 252 metadataSpecs = func() []MetadataSpec { return nil } 253 } 254 if exemplarSpecs == nil { 255 exemplarSpecs = func() []ExemplarSpec { return nil } 256 } 257 258 ss := &StoreSet{ 259 logger: log.With(logger, "component", "storeset"), 260 storeSpecs: storeSpecs, 261 ruleSpecs: ruleSpecs, 262 targetSpecs: targetSpecs, 263 metadataSpecs: metadataSpecs, 264 exemplarSpecs: exemplarSpecs, 265 dialOpts: dialOpts, 266 storesMetric: storesMetric, 267 gRPCInfoCallTimeout: 5 * time.Second, 268 stores: make(map[string]*storeRef), 269 storeStatuses: make(map[string]*StoreStatus), 270 unhealthyStoreTimeout: unhealthyStoreTimeout, 271 } 272 return ss 273} 274 275// TODO(bwplotka): Consider moving storeRef out of this package and renaming it, as it also supports rules API. 276type storeRef struct { 277 storepb.StoreClient 278 279 mtx sync.RWMutex 280 cc *grpc.ClientConn 281 addr string 282 // If rule is not nil, then this store also supports rules API. 283 rule rulespb.RulesClient 284 metadata metadatapb.MetadataClient 285 286 // If exemplar is not nil, then this store also support exemplars API. 287 exemplar exemplarspb.ExemplarsClient 288 289 // If target is not nil, then this store also supports targets API. 290 target targetspb.TargetsClient 291 292 // Meta (can change during runtime). 293 labelSets []labels.Labels 294 storeType component.StoreAPI 295 minTime int64 296 maxTime int64 297 298 logger log.Logger 299} 300 301func (s *storeRef) Update(labelSets []labels.Labels, minTime int64, maxTime int64, storeType component.StoreAPI, rule rulespb.RulesClient, target targetspb.TargetsClient, metadata metadatapb.MetadataClient, exemplar exemplarspb.ExemplarsClient) { 302 s.mtx.Lock() 303 defer s.mtx.Unlock() 304 305 s.storeType = storeType 306 s.labelSets = labelSets 307 s.minTime = minTime 308 s.maxTime = maxTime 309 s.rule = rule 310 s.target = target 311 s.metadata = metadata 312 s.exemplar = exemplar 313} 314 315func (s *storeRef) StoreType() component.StoreAPI { 316 s.mtx.RLock() 317 defer s.mtx.RUnlock() 318 319 return s.storeType 320} 321 322func (s *storeRef) HasRulesAPI() bool { 323 s.mtx.RLock() 324 defer s.mtx.RUnlock() 325 326 return s.rule != nil 327} 328 329func (s *storeRef) HasTargetsAPI() bool { 330 s.mtx.RLock() 331 defer s.mtx.RUnlock() 332 333 return s.target != nil 334} 335 336func (s *storeRef) HasMetadataAPI() bool { 337 s.mtx.RLock() 338 defer s.mtx.RUnlock() 339 340 return s.metadata != nil 341} 342 343func (s *storeRef) HasExemplarsAPI() bool { 344 s.mtx.RLock() 345 defer s.mtx.RUnlock() 346 347 return s.exemplar != nil 348} 349 350func (s *storeRef) LabelSets() []labels.Labels { 351 s.mtx.RLock() 352 defer s.mtx.RUnlock() 353 354 labelSet := make([]labels.Labels, 0, len(s.labelSets)) 355 for _, ls := range s.labelSets { 356 if len(ls) == 0 { 357 continue 358 } 359 // Compatibility label for Queriers pre 0.8.1. Filter it out now. 360 if ls[0].Name == store.CompatibilityTypeLabelName { 361 continue 362 } 363 labelSet = append(labelSet, ls.Copy()) 364 } 365 return labelSet 366} 367 368func (s *storeRef) TimeRange() (mint int64, maxt int64) { 369 s.mtx.RLock() 370 defer s.mtx.RUnlock() 371 372 return s.minTime, s.maxTime 373} 374 375func (s *storeRef) String() string { 376 mint, maxt := s.TimeRange() 377 return fmt.Sprintf("Addr: %s LabelSets: %v Mint: %d Maxt: %d", s.addr, labelpb.PromLabelSetsToString(s.LabelSets()), mint, maxt) 378} 379 380func (s *storeRef) Addr() string { 381 return s.addr 382} 383 384func (s *storeRef) Close() { 385 runutil.CloseWithLogOnErr(s.logger, s.cc, fmt.Sprintf("store %v connection close", s.addr)) 386} 387 388func newStoreAPIStats() map[component.StoreAPI]map[string]int { 389 nodes := make(map[component.StoreAPI]map[string]int, len(storepb.StoreType_name)) 390 for i := range storepb.StoreType_name { 391 nodes[component.FromProto(storepb.StoreType(i))] = map[string]int{} 392 } 393 return nodes 394} 395 396// Update updates the store set. It fetches current list of store specs from function and updates the fresh metadata 397// from all stores. Keeps around statically defined nodes that were defined with the strict mode. 398func (s *StoreSet) Update(ctx context.Context) { 399 s.updateMtx.Lock() 400 defer s.updateMtx.Unlock() 401 402 s.storesMtx.RLock() 403 stores := make(map[string]*storeRef, len(s.stores)) 404 for addr, st := range s.stores { 405 stores[addr] = st 406 } 407 s.storesMtx.RUnlock() 408 409 level.Debug(s.logger).Log("msg", "starting updating storeAPIs", "cachedStores", len(stores)) 410 411 activeStores := s.getActiveStores(ctx, stores) 412 level.Debug(s.logger).Log("msg", "checked requested storeAPIs", "activeStores", len(activeStores), "cachedStores", len(stores)) 413 414 stats := newStoreAPIStats() 415 416 // Close stores that where not active this time (are not in active stores map). 417 for addr, st := range stores { 418 if _, ok := activeStores[addr]; ok { 419 stats[st.StoreType()][labelpb.PromLabelSetsToString(st.LabelSets())]++ 420 continue 421 } 422 423 st.Close() 424 delete(stores, addr) 425 s.updateStoreStatus(st, errors.New(unhealthyStoreMessage)) 426 level.Info(s.logger).Log("msg", unhealthyStoreMessage, "address", addr, "extLset", labelpb.PromLabelSetsToString(st.LabelSets())) 427 } 428 429 // Add stores that are not yet in stores. 430 for addr, st := range activeStores { 431 if _, ok := stores[addr]; ok { 432 continue 433 } 434 435 extLset := labelpb.PromLabelSetsToString(st.LabelSets()) 436 437 // All producers should have unique external labels. While this does not check only StoreAPIs connected to 438 // this querier this allows to notify early user about misconfiguration. Warn only. This is also detectable from metric. 439 if st.StoreType() != nil && 440 (st.StoreType() == component.Sidecar || st.StoreType() == component.Rule) && 441 stats[component.Sidecar][extLset]+stats[component.Rule][extLset] > 0 { 442 443 level.Warn(s.logger).Log("msg", "found duplicate storeAPI producer (sidecar or ruler). This is not advices as it will malform data in in the same bucket", 444 "address", addr, "extLset", extLset, "duplicates", fmt.Sprintf("%v", stats[component.Sidecar][extLset]+stats[component.Rule][extLset]+1)) 445 } 446 stats[st.StoreType()][extLset]++ 447 448 stores[addr] = st 449 s.updateStoreStatus(st, nil) 450 451 if st.HasRulesAPI() { 452 level.Info(s.logger).Log("msg", "adding new rulesAPI to query storeset", "address", addr) 453 } 454 455 if st.HasExemplarsAPI() { 456 level.Info(s.logger).Log("msg", "adding new exemplarsAPI to query storeset", "address", addr) 457 } 458 459 if st.HasTargetsAPI() { 460 level.Info(s.logger).Log("msg", "adding new targetsAPI to query storeset", "address", addr) 461 } 462 463 level.Info(s.logger).Log("msg", "adding new storeAPI to query storeset", "address", addr, "extLset", extLset) 464 } 465 466 s.storesMetric.Update(stats) 467 s.storesMtx.Lock() 468 s.stores = stores 469 s.storesMtx.Unlock() 470 471 s.cleanUpStoreStatuses(stores) 472} 473 474func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*storeRef) map[string]*storeRef { 475 var ( 476 // UNIQUE? 477 activeStores = make(map[string]*storeRef, len(stores)) 478 mtx sync.Mutex 479 wg sync.WaitGroup 480 481 storeAddrSet = make(map[string]struct{}) 482 ruleAddrSet = make(map[string]struct{}) 483 targetAddrSet = make(map[string]struct{}) 484 metadataAddrSet = make(map[string]struct{}) 485 exemplarAddrSet = make(map[string]struct{}) 486 ) 487 488 // Gather active stores map concurrently. Build new store if does not exist already. 489 for _, ruleSpec := range s.ruleSpecs() { 490 ruleAddrSet[ruleSpec.Addr()] = struct{}{} 491 } 492 493 // Gather active targets map concurrently. Add a new target if it does not exist already. 494 for _, targetSpec := range s.targetSpecs() { 495 targetAddrSet[targetSpec.Addr()] = struct{}{} 496 } 497 498 // Gather active stores map concurrently. Build new store if does not exist already. 499 for _, metadataSpec := range s.metadataSpecs() { 500 metadataAddrSet[metadataSpec.Addr()] = struct{}{} 501 } 502 503 // Gather active stores map concurrently. Build new store if does not exist already. 504 for _, exemplarSpec := range s.exemplarSpecs() { 505 exemplarAddrSet[exemplarSpec.Addr()] = struct{}{} 506 } 507 508 // Gather healthy stores map concurrently. Build new store if does not exist already. 509 for _, storeSpec := range s.storeSpecs() { 510 if _, ok := storeAddrSet[storeSpec.Addr()]; ok { 511 level.Warn(s.logger).Log("msg", "duplicated address in store nodes", "address", storeSpec.Addr()) 512 continue 513 } 514 storeAddrSet[storeSpec.Addr()] = struct{}{} 515 516 wg.Add(1) 517 go func(spec StoreSpec) { 518 defer wg.Done() 519 520 addr := spec.Addr() 521 522 ctx, cancel := context.WithTimeout(ctx, s.gRPCInfoCallTimeout) 523 defer cancel() 524 525 st, seenAlready := stores[addr] 526 if !seenAlready { 527 // New store or was unactive and was removed in the past - create new one. 528 conn, err := grpc.DialContext(ctx, addr, s.dialOpts...) 529 if err != nil { 530 s.updateStoreStatus(&storeRef{addr: addr}, err) 531 level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "dialing connection"), "address", addr) 532 return 533 } 534 535 st = &storeRef{StoreClient: storepb.NewStoreClient(conn), storeType: component.UnknownStoreAPI, cc: conn, addr: addr, logger: s.logger} 536 if spec.StrictStatic() { 537 st.maxTime = math.MaxInt64 538 } 539 } 540 541 var rule rulespb.RulesClient 542 if _, ok := ruleAddrSet[addr]; ok { 543 rule = rulespb.NewRulesClient(st.cc) 544 } 545 546 var target targetspb.TargetsClient 547 if _, ok := targetAddrSet[addr]; ok { 548 target = targetspb.NewTargetsClient(st.cc) 549 } 550 551 var metadata metadatapb.MetadataClient 552 if _, ok := metadataAddrSet[addr]; ok { 553 metadata = metadatapb.NewMetadataClient(st.cc) 554 } 555 556 var exemplar exemplarspb.ExemplarsClient 557 if _, ok := exemplarAddrSet[addr]; ok { 558 exemplar = exemplarspb.NewExemplarsClient(st.cc) 559 } 560 561 // Check existing or new store. Is it healthy? What are current metadata? 562 labelSets, minTime, maxTime, storeType, err := spec.Metadata(ctx, st.StoreClient) 563 if err != nil { 564 if !seenAlready && !spec.StrictStatic() { 565 // Close only if new and not a strict static node. 566 // Unactive `s.stores` will be closed later on. 567 st.Close() 568 } 569 s.updateStoreStatus(st, err) 570 level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "getting metadata"), "address", addr) 571 572 if !spec.StrictStatic() { 573 return 574 } 575 576 // Still keep it around if static & strict mode enabled. 577 mtx.Lock() 578 defer mtx.Unlock() 579 580 activeStores[addr] = st 581 return 582 } 583 584 s.updateStoreStatus(st, nil) 585 st.Update(labelSets, minTime, maxTime, storeType, rule, target, metadata, exemplar) 586 587 mtx.Lock() 588 defer mtx.Unlock() 589 590 activeStores[addr] = st 591 }(storeSpec) 592 } 593 wg.Wait() 594 595 for ruleAddr := range ruleAddrSet { 596 if _, ok := storeAddrSet[ruleAddr]; !ok { 597 level.Warn(s.logger).Log("msg", "ignored rule store", "address", ruleAddr) 598 } 599 } 600 return activeStores 601} 602 603func (s *StoreSet) updateStoreStatus(store *storeRef, err error) { 604 s.storesStatusesMtx.Lock() 605 defer s.storesStatusesMtx.Unlock() 606 607 status := StoreStatus{Name: store.addr} 608 prev, ok := s.storeStatuses[store.addr] 609 if ok { 610 status = *prev 611 } else { 612 mint, maxt := store.TimeRange() 613 status.MinTime = mint 614 status.MaxTime = maxt 615 } 616 617 if err == nil { 618 status.LastCheck = time.Now() 619 mint, maxt := store.TimeRange() 620 status.LabelSets = store.LabelSets() 621 status.StoreType = store.StoreType() 622 status.MinTime = mint 623 status.MaxTime = maxt 624 status.LastError = nil 625 } else { 626 status.LastError = &stringError{originalErr: err} 627 } 628 629 s.storeStatuses[store.addr] = &status 630} 631 632func (s *StoreSet) GetStoreStatus() []StoreStatus { 633 s.storesStatusesMtx.RLock() 634 defer s.storesStatusesMtx.RUnlock() 635 636 statuses := make([]StoreStatus, 0, len(s.storeStatuses)) 637 for _, v := range s.storeStatuses { 638 statuses = append(statuses, *v) 639 } 640 641 sort.Slice(statuses, func(i, j int) bool { 642 return statuses[i].Name < statuses[j].Name 643 }) 644 return statuses 645} 646 647// Get returns a list of all active stores. 648func (s *StoreSet) Get() []store.Client { 649 s.storesMtx.RLock() 650 defer s.storesMtx.RUnlock() 651 652 stores := make([]store.Client, 0, len(s.stores)) 653 for _, st := range s.stores { 654 stores = append(stores, st) 655 } 656 return stores 657} 658 659// GetRulesClients returns a list of all active rules clients. 660func (s *StoreSet) GetRulesClients() []rulespb.RulesClient { 661 s.storesMtx.RLock() 662 defer s.storesMtx.RUnlock() 663 664 rules := make([]rulespb.RulesClient, 0, len(s.stores)) 665 for _, st := range s.stores { 666 if st.HasRulesAPI() { 667 rules = append(rules, st.rule) 668 } 669 } 670 return rules 671} 672 673// GetTargetsClients returns a list of all active targets clients. 674func (s *StoreSet) GetTargetsClients() []targetspb.TargetsClient { 675 s.storesMtx.RLock() 676 defer s.storesMtx.RUnlock() 677 678 targets := make([]targetspb.TargetsClient, 0, len(s.stores)) 679 for _, st := range s.stores { 680 if st.HasTargetsAPI() { 681 targets = append(targets, st.target) 682 } 683 } 684 return targets 685} 686 687// GetMetadataClients returns a list of all active metadata clients. 688func (s *StoreSet) GetMetadataClients() []metadatapb.MetadataClient { 689 s.storesMtx.RLock() 690 defer s.storesMtx.RUnlock() 691 692 metadataClients := make([]metadatapb.MetadataClient, 0, len(s.stores)) 693 for _, st := range s.stores { 694 if st.HasMetadataAPI() { 695 metadataClients = append(metadataClients, st.metadata) 696 } 697 } 698 return metadataClients 699} 700 701// GetExemplarsStores returns a list of all active exemplars stores. 702func (s *StoreSet) GetExemplarsStores() []*exemplarspb.ExemplarStore { 703 s.storesMtx.RLock() 704 defer s.storesMtx.RUnlock() 705 706 exemplarStores := make([]*exemplarspb.ExemplarStore, 0, len(s.stores)) 707 for _, st := range s.stores { 708 if st.HasExemplarsAPI() { 709 exemplarStores = append(exemplarStores, &exemplarspb.ExemplarStore{ 710 ExemplarsClient: st.exemplar, 711 LabelSets: st.labelSets, 712 }) 713 } 714 } 715 return exemplarStores 716} 717 718func (s *StoreSet) Close() { 719 s.storesMtx.Lock() 720 defer s.storesMtx.Unlock() 721 722 for _, st := range s.stores { 723 st.Close() 724 } 725 s.stores = map[string]*storeRef{} 726} 727 728func (s *StoreSet) cleanUpStoreStatuses(stores map[string]*storeRef) { 729 s.storesStatusesMtx.Lock() 730 defer s.storesStatusesMtx.Unlock() 731 732 now := time.Now() 733 for addr, status := range s.storeStatuses { 734 if _, ok := stores[addr]; ok { 735 continue 736 } 737 738 if now.Sub(status.LastCheck) >= s.unhealthyStoreTimeout { 739 delete(s.storeStatuses, addr) 740 } 741 } 742} 743