1// Copyright 2017 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package storage 15 16import ( 17 "container/heap" 18 "context" 19 "sort" 20 "strings" 21 22 "github.com/go-kit/kit/log" 23 "github.com/go-kit/kit/log/level" 24 "github.com/pkg/errors" 25 "github.com/prometheus/common/model" 26 "github.com/prometheus/prometheus/pkg/labels" 27) 28 29type fanout struct { 30 logger log.Logger 31 32 primary Storage 33 secondaries []Storage 34} 35 36// NewFanout returns a new fan-out Storage, which proxies reads and writes 37// through to multiple underlying storages. 38func NewFanout(logger log.Logger, primary Storage, secondaries ...Storage) Storage { 39 return &fanout{ 40 logger: logger, 41 primary: primary, 42 secondaries: secondaries, 43 } 44} 45 46// StartTime implements the Storage interface. 47func (f *fanout) StartTime() (int64, error) { 48 // StartTime of a fanout should be the earliest StartTime of all its storages, 49 // both primary and secondaries. 
50 firstTime, err := f.primary.StartTime() 51 if err != nil { 52 return int64(model.Latest), err 53 } 54 55 for _, storage := range f.secondaries { 56 t, err := storage.StartTime() 57 if err != nil { 58 return int64(model.Latest), err 59 } 60 if t < firstTime { 61 firstTime = t 62 } 63 } 64 return firstTime, nil 65} 66 67func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) { 68 queriers := make([]Querier, 0, 1+len(f.secondaries)) 69 70 // Add primary querier 71 primaryQuerier, err := f.primary.Querier(ctx, mint, maxt) 72 if err != nil { 73 return nil, err 74 } 75 queriers = append(queriers, primaryQuerier) 76 77 // Add secondary queriers 78 for _, storage := range f.secondaries { 79 querier, err := storage.Querier(ctx, mint, maxt) 80 if err != nil { 81 NewMergeQuerier(primaryQuerier, queriers).Close() 82 return nil, err 83 } 84 queriers = append(queriers, querier) 85 } 86 87 return NewMergeQuerier(primaryQuerier, queriers), nil 88} 89 90func (f *fanout) Appender() (Appender, error) { 91 primary, err := f.primary.Appender() 92 if err != nil { 93 return nil, err 94 } 95 96 secondaries := make([]Appender, 0, len(f.secondaries)) 97 for _, storage := range f.secondaries { 98 appender, err := storage.Appender() 99 if err != nil { 100 return nil, err 101 } 102 secondaries = append(secondaries, appender) 103 } 104 return &fanoutAppender{ 105 logger: f.logger, 106 primary: primary, 107 secondaries: secondaries, 108 }, nil 109} 110 111// Close closes the storage and all its underlying resources. 112func (f *fanout) Close() error { 113 if err := f.primary.Close(); err != nil { 114 return err 115 } 116 117 // TODO return multiple errors? 118 var lastErr error 119 for _, storage := range f.secondaries { 120 if err := storage.Close(); err != nil { 121 lastErr = err 122 } 123 } 124 return lastErr 125} 126 127// fanoutAppender implements Appender. 
128type fanoutAppender struct { 129 logger log.Logger 130 131 primary Appender 132 secondaries []Appender 133} 134 135func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { 136 ref, err := f.primary.Add(l, t, v) 137 if err != nil { 138 return ref, err 139 } 140 141 for _, appender := range f.secondaries { 142 if _, err := appender.Add(l, t, v); err != nil { 143 return 0, err 144 } 145 } 146 return ref, nil 147} 148 149func (f *fanoutAppender) AddFast(l labels.Labels, ref uint64, t int64, v float64) error { 150 if err := f.primary.AddFast(l, ref, t, v); err != nil { 151 return err 152 } 153 154 for _, appender := range f.secondaries { 155 if _, err := appender.Add(l, t, v); err != nil { 156 return err 157 } 158 } 159 return nil 160} 161 162func (f *fanoutAppender) Commit() (err error) { 163 err = f.primary.Commit() 164 165 for _, appender := range f.secondaries { 166 if err == nil { 167 err = appender.Commit() 168 } else { 169 if rollbackErr := appender.Rollback(); rollbackErr != nil { 170 level.Error(f.logger).Log("msg", "Squashed rollback error on commit", "err", rollbackErr) 171 } 172 } 173 } 174 return 175} 176 177func (f *fanoutAppender) Rollback() (err error) { 178 err = f.primary.Rollback() 179 180 for _, appender := range f.secondaries { 181 rollbackErr := appender.Rollback() 182 if err == nil { 183 err = rollbackErr 184 } else if rollbackErr != nil { 185 level.Error(f.logger).Log("msg", "Squashed rollback error on rollback", "err", rollbackErr) 186 } 187 } 188 return nil 189} 190 191// mergeQuerier implements Querier. 192type mergeQuerier struct { 193 primaryQuerier Querier 194 queriers []Querier 195 196 failedQueriers map[Querier]struct{} 197 setQuerierMap map[SeriesSet]Querier 198} 199 200// NewMergeQuerier returns a new Querier that merges results of input queriers. 
201// NB NewMergeQuerier will return NoopQuerier if no queriers are passed to it, 202// and will filter NoopQueriers from its arguments, in order to reduce overhead 203// when only one querier is passed. 204func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier { 205 filtered := make([]Querier, 0, len(queriers)) 206 for _, querier := range queriers { 207 if querier != NoopQuerier() { 208 filtered = append(filtered, querier) 209 } 210 } 211 212 setQuerierMap := make(map[SeriesSet]Querier) 213 failedQueriers := make(map[Querier]struct{}) 214 215 switch len(filtered) { 216 case 0: 217 return NoopQuerier() 218 case 1: 219 return filtered[0] 220 default: 221 return &mergeQuerier{ 222 primaryQuerier: primaryQuerier, 223 queriers: filtered, 224 failedQueriers: failedQueriers, 225 setQuerierMap: setQuerierMap, 226 } 227 } 228} 229 230// Select returns a set of series that matches the given label matchers. 231func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { 232 seriesSets := make([]SeriesSet, 0, len(q.queriers)) 233 var warnings Warnings 234 for _, querier := range q.queriers { 235 set, wrn, err := querier.Select(params, matchers...) 236 q.setQuerierMap[set] = querier 237 if wrn != nil { 238 warnings = append(warnings, wrn...) 239 } 240 if err != nil { 241 q.failedQueriers[querier] = struct{}{} 242 // If the error source isn't the primary querier, return the error as a warning and continue. 243 if querier != q.primaryQuerier { 244 warnings = append(warnings, err) 245 continue 246 } else { 247 return nil, nil, err 248 } 249 } 250 seriesSets = append(seriesSets, set) 251 } 252 return NewMergeSeriesSet(seriesSets, q), warnings, nil 253} 254 255// LabelValues returns all potential values for a label name. 
//
// Values from every querier are merged into one sorted, de-duplicated slice.
// A failure of the primary querier is returned as an error. Failures of other
// queriers mark them as failed and are reported as warnings.
func (q *mergeQuerier) LabelValues(name string) ([]string, Warnings, error) {
	var (
		perQuerier [][]string
		warnings   Warnings
	)
	for _, querier := range q.queriers {
		values, wrn, err := querier.LabelValues(name)
		if wrn != nil {
			warnings = append(warnings, wrn...)
		}
		if err == nil {
			perQuerier = append(perQuerier, values)
			continue
		}

		q.failedQueriers[querier] = struct{}{}
		if querier == q.primaryQuerier {
			return nil, nil, err
		}
		// A non-primary failure is tolerated and surfaced as a warning.
		warnings = append(warnings, err)
	}
	return mergeStringSlices(perQuerier), warnings, nil
}

// IsFailedSet reports whether set came from a querier that has been marked
// as failed.
func (q *mergeQuerier) IsFailedSet(set SeriesSet) bool {
	origin := q.setQuerierMap[set]
	_, failed := q.failedQueriers[origin]
	return failed
}

// mergeStringSlices merges any number of string slices by splitting the list in
// half and merging each half recursively. Inputs are expected to be sorted
// ascending, which mergeTwoStringSlices relies on.
func mergeStringSlices(ss [][]string) []string {
	if len(ss) == 0 {
		return nil
	}
	if len(ss) == 1 {
		return ss[0]
	}
	if len(ss) == 2 {
		return mergeTwoStringSlices(ss[0], ss[1])
	}
	mid := len(ss) / 2
	left := mergeStringSlices(ss[:mid])
	right := mergeStringSlices(ss[mid:])
	return mergeTwoStringSlices(left, right)
}

// mergeTwoStringSlices performs a sorted merge of a and b into a new slice,
// emitting a value only once when both heads are equal.
func mergeTwoStringSlices(a, b []string) []string {
	merged := make([]string, 0, len(a)+len(b))
	ai, bi := 0, 0
	for ai < len(a) && bi < len(b) {
		cmp := strings.Compare(a[ai], b[bi])
		switch {
		case cmp < 0:
			merged = append(merged, a[ai])
			ai++
		case cmp > 0:
			merged = append(merged, b[bi])
			bi++
		default:
			// Same value on both sides: keep a single copy.
			merged = append(merged, a[ai])
			ai++
			bi++
		}
	}
	// At most one of the two tails is non-empty here.
	merged = append(merged, a[ai:]...)
	return append(merged, b[bi:]...)
}

// LabelNames returns all the unique label names present in the block in sorted order.
//
// An error from the primary querier is returned (wrapped). An error from any
// other querier marks that querier as failed and is appended to the warnings.
func (q *mergeQuerier) LabelNames() ([]string, Warnings, error) {
	labelNamesMap := make(map[string]struct{})
	var warnings Warnings
	for _, querier := range q.queriers {
		names, wrn, err := querier.LabelNames()
		if wrn != nil {
			warnings = append(warnings, wrn...)
		}

		if err != nil {
			// Mark the querier as failed, exactly like Select and LabelValues do,
			// so series sets it produced are skipped while merging.
			q.failedQueriers[querier] = struct{}{}
			if querier == q.primaryQuerier {
				return nil, nil, errors.Wrap(err, "LabelNames() from Querier")
			}
			// The error source isn't the primary querier: return it as a warning and continue.
			warnings = append(warnings, err)
			continue
		}

		for _, name := range names {
			labelNamesMap[name] = struct{}{}
		}
	}

	labelNames := make([]string, 0, len(labelNamesMap))
	for name := range labelNamesMap {
		labelNames = append(labelNames, name)
	}
	sort.Strings(labelNames)

	return labelNames, warnings, nil
}

// Close releases the resources of the Querier. Every underlying querier is
// closed; the last error encountered (if any) is returned.
func (q *mergeQuerier) Close() error {
	// TODO return multiple errors?
	var lastErr error
	for _, querier := range q.queriers {
		if err := querier.Close(); err != nil {
			lastErr = err
		}
	}
	return lastErr
}

// mergeSeriesSet implements SeriesSet
type mergeSeriesSet struct {
	currentLabels labels.Labels
	currentSets   []SeriesSet
	heap          seriesSetHeap
	sets          []SeriesSet

	querier *mergeQuerier
}

// NewMergeSeriesSet returns a new series set that merges (deduplicates)
// series returned by the input series sets when iterating.
//
// nil entries in sets are ignored. querier may be nil, in which case no set is
// ever treated as coming from a failed querier.
func NewMergeSeriesSet(sets []SeriesSet, querier *mergeQuerier) SeriesSet {
	// A single non-nil set needs no merging. A single nil set falls through
	// and yields an empty merge set instead of a nil SeriesSet.
	if len(sets) == 1 && sets[0] != nil {
		return sets[0]
	}

	// Sets need to be pre-advanced, so we can introspect the label of the
	// series under the cursor.
	var h seriesSetHeap
	for _, set := range sets {
		if set == nil {
			continue
		}
		if set.Next() {
			heap.Push(&h, set)
		}
	}
	return &mergeSeriesSet{
		heap:    h,
		sets:    sets,
		querier: querier,
	}
}

// Next advances to the next label set present in any input set and reports
// whether one is available.
func (c *mergeSeriesSet) Next() bool {
	// Run in a loop because the "next" series sets may not be valid anymore.
	// If a remote querier fails, we discard all series sets from that querier.
	// If, for the current label set, all the next series sets come from
	// failed remote storage sources, we want to keep trying with the next label set.
	for {
		// Firstly advance all the current series sets. If any of them have run out
		// we can drop them, otherwise they should be inserted back into the heap.
		for _, set := range c.currentSets {
			if set.Next() {
				heap.Push(&c.heap, set)
			}
		}
		if len(c.heap) == 0 {
			return false
		}

		// Now, pop items of the heap that have equal label sets.
		c.currentSets = nil
		c.currentLabels = c.heap[0].At().Labels()
		for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
			set := heap.Pop(&c.heap).(SeriesSet)
			if c.querier != nil && c.querier.IsFailedSet(set) {
				// Dropped for good: it is neither advanced nor re-pushed.
				continue
			}
			c.currentSets = append(c.currentSets, set)
		}

		// As long as the current set contains at least 1 set,
		// then it should return true.
		if len(c.currentSets) != 0 {
			return true
		}
	}
}

// At returns the series for the current label set. When several input sets
// share the label set, their series are combined in a mergeSeries.
func (c *mergeSeriesSet) At() Series {
	if len(c.currentSets) == 1 {
		return c.currentSets[0].At()
	}
	series := make([]Series, 0, len(c.currentSets))
	for _, seriesSet := range c.currentSets {
		series = append(series, seriesSet.At())
	}
	return &mergeSeries{
		labels: c.currentLabels,
		series: series,
	}
}

// Err returns the first error reported by any input set, skipping nil sets
// (which NewMergeSeriesSet accepts and ignores).
func (c *mergeSeriesSet) Err() error {
	for _, set := range c.sets {
		if set == nil {
			continue
		}
		if err := set.Err(); err != nil {
			return err
		}
	}
	return nil
}

// seriesSetHeap is a container/heap of series sets ordered by the labels of
// the series currently under each set's cursor.
type seriesSetHeap []SeriesSet

func (h seriesSetHeap) Len() int      { return len(h) }
func (h seriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h seriesSetHeap) Less(i, j int) bool {
	a, b := h[i].At().Labels(), h[j].At().Labels()
	return labels.Compare(a, b) < 0
}

func (h *seriesSetHeap) Push(x interface{}) {
	*h = append(*h, x.(SeriesSet))
}

func (h *seriesSetHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[0 : n-1]
	return x
}

// mergeSeries is a Series made of several series that share one label set.
type mergeSeries struct {
	labels labels.Labels
	series []Series
}

func (m *mergeSeries) Labels() labels.Labels {
	return m.labels
}

// Iterator returns an iterator that merges the samples of all member series.
func (m *mergeSeries) Iterator() SeriesIterator {
	iterators := make([]SeriesIterator, 0, len(m.series))
	for _, s := range m.series {
		iterators = append(iterators, s.Iterator())
	}
	return newMergeIterator(iterators)
}

// mergeIterator merges several SeriesIterators in timestamp order. When
// several iterators sit on the same timestamp, the sample is emitted once,
// taken from whichever iterator is at the top of the heap.
type mergeIterator struct {
	iterators []SeriesIterator
	// h is nil until the first Next or Seek call primes it.
	h seriesIteratorHeap
}

func newMergeIterator(iterators []SeriesIterator) SeriesIterator {
	return &mergeIterator{
		iterators: iterators,
		h:         nil,
	}
}

// Seek calls Seek(t) on every underlying iterator and rebuilds the heap from
// those that report a sample.
func (c *mergeIterator) Seek(t int64) bool {
	c.h = seriesIteratorHeap{}
	for _, iter := range c.iterators {
		if iter.Seek(t) {
			heap.Push(&c.h, iter)
		}
	}
	return len(c.h) > 0
}

// At returns the sample at the top of the heap. It panics if there is none.
func (c *mergeIterator) At() (t int64, v float64) {
	if len(c.h) == 0 {
		panic("mergeIterator.At() called after .Next() returned false.")
	}

	return c.h[0].At()
}

// Next moves past the current timestamp by advancing every iterator that
// currently sits on it.
func (c *mergeIterator) Next() bool {
	if c.h == nil {
		// First call with no preceding Seek: prime the heap. Make it non-nil
		// even if it ends up empty, so later calls don't re-advance
		// already-exhausted iterators.
		c.h = make(seriesIteratorHeap, 0, len(c.iterators))
		for _, iter := range c.iterators {
			if iter.Next() {
				heap.Push(&c.h, iter)
			}
		}

		return len(c.h) > 0
	}

	if len(c.h) == 0 {
		return false
	}

	currt, _ := c.At()
	for len(c.h) > 0 {
		nextt, _ := c.h[0].At()
		if nextt != currt {
			break
		}

		iter := heap.Pop(&c.h).(SeriesIterator)
		if iter.Next() {
			heap.Push(&c.h, iter)
		}
	}

	return len(c.h) > 0
}

// Err returns the first error reported by any underlying iterator.
func (c *mergeIterator) Err() error {
	for _, iter := range c.iterators {
		if err := iter.Err(); err != nil {
			return err
		}
	}
	return nil
}

// seriesIteratorHeap is a container/heap of iterators ordered by the
// timestamp of their current sample.
type seriesIteratorHeap []SeriesIterator

func (h seriesIteratorHeap) Len() int      { return len(h) }
func (h seriesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h seriesIteratorHeap) Less(i, j int) bool {
	at, _ := h[i].At()
	bt, _ := h[j].At()
	return at < bt
}

func (h *seriesIteratorHeap) Push(x interface{}) {
	*h = append(*h, x.(SeriesIterator))
}

func (h *seriesIteratorHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[0 : n-1]
	return x
}