1// Copyright 2017 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package storage
15
16import (
17	"container/heap"
18	"context"
19	"sort"
20	"strings"
21
22	"github.com/go-kit/kit/log"
23	"github.com/go-kit/kit/log/level"
24	"github.com/pkg/errors"
25	"github.com/prometheus/common/model"
26	"github.com/prometheus/prometheus/pkg/labels"
27)
28
29type fanout struct {
30	logger log.Logger
31
32	primary     Storage
33	secondaries []Storage
34}
35
36// NewFanout returns a new fan-out Storage, which proxies reads and writes
37// through to multiple underlying storages.
38func NewFanout(logger log.Logger, primary Storage, secondaries ...Storage) Storage {
39	return &fanout{
40		logger:      logger,
41		primary:     primary,
42		secondaries: secondaries,
43	}
44}
45
46// StartTime implements the Storage interface.
47func (f *fanout) StartTime() (int64, error) {
48	// StartTime of a fanout should be the earliest StartTime of all its storages,
49	// both primary and secondaries.
50	firstTime, err := f.primary.StartTime()
51	if err != nil {
52		return int64(model.Latest), err
53	}
54
55	for _, storage := range f.secondaries {
56		t, err := storage.StartTime()
57		if err != nil {
58			return int64(model.Latest), err
59		}
60		if t < firstTime {
61			firstTime = t
62		}
63	}
64	return firstTime, nil
65}
66
67func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
68	queriers := make([]Querier, 0, 1+len(f.secondaries))
69
70	// Add primary querier
71	primaryQuerier, err := f.primary.Querier(ctx, mint, maxt)
72	if err != nil {
73		return nil, err
74	}
75	queriers = append(queriers, primaryQuerier)
76
77	// Add secondary queriers
78	for _, storage := range f.secondaries {
79		querier, err := storage.Querier(ctx, mint, maxt)
80		if err != nil {
81			NewMergeQuerier(primaryQuerier, queriers).Close()
82			return nil, err
83		}
84		queriers = append(queriers, querier)
85	}
86
87	return NewMergeQuerier(primaryQuerier, queriers), nil
88}
89
90func (f *fanout) Appender() (Appender, error) {
91	primary, err := f.primary.Appender()
92	if err != nil {
93		return nil, err
94	}
95
96	secondaries := make([]Appender, 0, len(f.secondaries))
97	for _, storage := range f.secondaries {
98		appender, err := storage.Appender()
99		if err != nil {
100			return nil, err
101		}
102		secondaries = append(secondaries, appender)
103	}
104	return &fanoutAppender{
105		logger:      f.logger,
106		primary:     primary,
107		secondaries: secondaries,
108	}, nil
109}
110
111// Close closes the storage and all its underlying resources.
112func (f *fanout) Close() error {
113	if err := f.primary.Close(); err != nil {
114		return err
115	}
116
117	// TODO return multiple errors?
118	var lastErr error
119	for _, storage := range f.secondaries {
120		if err := storage.Close(); err != nil {
121			lastErr = err
122		}
123	}
124	return lastErr
125}
126
127// fanoutAppender implements Appender.
128type fanoutAppender struct {
129	logger log.Logger
130
131	primary     Appender
132	secondaries []Appender
133}
134
135func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) {
136	ref, err := f.primary.Add(l, t, v)
137	if err != nil {
138		return ref, err
139	}
140
141	for _, appender := range f.secondaries {
142		if _, err := appender.Add(l, t, v); err != nil {
143			return 0, err
144		}
145	}
146	return ref, nil
147}
148
149func (f *fanoutAppender) AddFast(l labels.Labels, ref uint64, t int64, v float64) error {
150	if err := f.primary.AddFast(l, ref, t, v); err != nil {
151		return err
152	}
153
154	for _, appender := range f.secondaries {
155		if _, err := appender.Add(l, t, v); err != nil {
156			return err
157		}
158	}
159	return nil
160}
161
162func (f *fanoutAppender) Commit() (err error) {
163	err = f.primary.Commit()
164
165	for _, appender := range f.secondaries {
166		if err == nil {
167			err = appender.Commit()
168		} else {
169			if rollbackErr := appender.Rollback(); rollbackErr != nil {
170				level.Error(f.logger).Log("msg", "Squashed rollback error on commit", "err", rollbackErr)
171			}
172		}
173	}
174	return
175}
176
177func (f *fanoutAppender) Rollback() (err error) {
178	err = f.primary.Rollback()
179
180	for _, appender := range f.secondaries {
181		rollbackErr := appender.Rollback()
182		if err == nil {
183			err = rollbackErr
184		} else if rollbackErr != nil {
185			level.Error(f.logger).Log("msg", "Squashed rollback error on rollback", "err", rollbackErr)
186		}
187	}
188	return nil
189}
190
191// mergeQuerier implements Querier.
192type mergeQuerier struct {
193	primaryQuerier Querier
194	queriers       []Querier
195
196	failedQueriers map[Querier]struct{}
197	setQuerierMap  map[SeriesSet]Querier
198}
199
200// NewMergeQuerier returns a new Querier that merges results of input queriers.
201// NB NewMergeQuerier will return NoopQuerier if no queriers are passed to it,
202// and will filter NoopQueriers from its arguments, in order to reduce overhead
203// when only one querier is passed.
204func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier {
205	filtered := make([]Querier, 0, len(queriers))
206	for _, querier := range queriers {
207		if querier != NoopQuerier() {
208			filtered = append(filtered, querier)
209		}
210	}
211
212	setQuerierMap := make(map[SeriesSet]Querier)
213	failedQueriers := make(map[Querier]struct{})
214
215	switch len(filtered) {
216	case 0:
217		return NoopQuerier()
218	case 1:
219		return filtered[0]
220	default:
221		return &mergeQuerier{
222			primaryQuerier: primaryQuerier,
223			queriers:       filtered,
224			failedQueriers: failedQueriers,
225			setQuerierMap:  setQuerierMap,
226		}
227	}
228}
229
230// Select returns a set of series that matches the given label matchers.
231func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
232	seriesSets := make([]SeriesSet, 0, len(q.queriers))
233	var warnings Warnings
234	for _, querier := range q.queriers {
235		set, wrn, err := querier.Select(params, matchers...)
236		q.setQuerierMap[set] = querier
237		if wrn != nil {
238			warnings = append(warnings, wrn...)
239		}
240		if err != nil {
241			q.failedQueriers[querier] = struct{}{}
242			// If the error source isn't the primary querier, return the error as a warning and continue.
243			if querier != q.primaryQuerier {
244				warnings = append(warnings, err)
245				continue
246			} else {
247				return nil, nil, err
248			}
249		}
250		seriesSets = append(seriesSets, set)
251	}
252	return NewMergeSeriesSet(seriesSets, q), warnings, nil
253}
254
255// LabelValues returns all potential values for a label name.
256func (q *mergeQuerier) LabelValues(name string) ([]string, Warnings, error) {
257	var results [][]string
258	var warnings Warnings
259	for _, querier := range q.queriers {
260		values, wrn, err := querier.LabelValues(name)
261
262		if wrn != nil {
263			warnings = append(warnings, wrn...)
264		}
265		if err != nil {
266			q.failedQueriers[querier] = struct{}{}
267			// If the error source isn't the primary querier, return the error as a warning and continue.
268			if querier != q.primaryQuerier {
269				warnings = append(warnings, err)
270				continue
271			} else {
272				return nil, nil, err
273			}
274		}
275		results = append(results, values)
276	}
277	return mergeStringSlices(results), warnings, nil
278}
279
280func (q *mergeQuerier) IsFailedSet(set SeriesSet) bool {
281	_, isFailedQuerier := q.failedQueriers[q.setQuerierMap[set]]
282	return isFailedQuerier
283}
284
// mergeStringSlices merges any number of string slices into one sorted slice
// by recursively merging the two halves of ss. Each input is assumed to be
// sorted. Values shared between inputs appear once in the result. An empty
// ss yields nil, and a single input is returned as-is (not copied).
func mergeStringSlices(ss [][]string) []string {
	n := len(ss)
	if n == 0 {
		return nil
	}
	if n == 1 {
		return ss[0]
	}
	mid := n / 2
	left := mergeStringSlices(ss[:mid])
	right := mergeStringSlices(ss[mid:])
	return mergeTwoStringSlices(left, right)
}

// mergeTwoStringSlices merges two sorted slices into a newly allocated sorted
// slice. A value at the head of both inputs is emitted only once.
func mergeTwoStringSlices(a, b []string) []string {
	out := make([]string, 0, len(a)+len(b))
	for len(a) > 0 && len(b) > 0 {
		switch c := strings.Compare(a[0], b[0]); {
		case c < 0:
			out = append(out, a[0])
			a = a[1:]
		case c > 0:
			out = append(out, b[0])
			b = b[1:]
		default:
			out = append(out, a[0])
			a, b = a[1:], b[1:]
		}
	}
	out = append(out, a...)
	return append(out, b...)
}
323
324// LabelNames returns all the unique label names present in the block in sorted order.
325func (q *mergeQuerier) LabelNames() ([]string, Warnings, error) {
326	labelNamesMap := make(map[string]struct{})
327	var warnings Warnings
328	for _, b := range q.queriers {
329		names, wrn, err := b.LabelNames()
330		if wrn != nil {
331			warnings = append(warnings, wrn...)
332		}
333
334		if err != nil {
335			// If the error source isn't the primary querier, return the error as a warning and continue.
336			if b != q.primaryQuerier {
337				warnings = append(warnings, err)
338				continue
339			} else {
340				return nil, nil, errors.Wrap(err, "LabelNames() from Querier")
341			}
342		}
343
344		for _, name := range names {
345			labelNamesMap[name] = struct{}{}
346		}
347	}
348
349	labelNames := make([]string, 0, len(labelNamesMap))
350	for name := range labelNamesMap {
351		labelNames = append(labelNames, name)
352	}
353	sort.Strings(labelNames)
354
355	return labelNames, warnings, nil
356}
357
358// Close releases the resources of the Querier.
359func (q *mergeQuerier) Close() error {
360	// TODO return multiple errors?
361	var lastErr error
362	for _, querier := range q.queriers {
363		if err := querier.Close(); err != nil {
364			lastErr = err
365		}
366	}
367	return lastErr
368}
369
370// mergeSeriesSet implements SeriesSet
371type mergeSeriesSet struct {
372	currentLabels labels.Labels
373	currentSets   []SeriesSet
374	heap          seriesSetHeap
375	sets          []SeriesSet
376
377	querier *mergeQuerier
378}
379
380// NewMergeSeriesSet returns a new series set that merges (deduplicates)
381// series returned by the input series sets when iterating.
382func NewMergeSeriesSet(sets []SeriesSet, querier *mergeQuerier) SeriesSet {
383	if len(sets) == 1 {
384		return sets[0]
385	}
386
387	// Sets need to be pre-advanced, so we can introspect the label of the
388	// series under the cursor.
389	var h seriesSetHeap
390	for _, set := range sets {
391		if set == nil {
392			continue
393		}
394		if set.Next() {
395			heap.Push(&h, set)
396		}
397	}
398	return &mergeSeriesSet{
399		heap:    h,
400		sets:    sets,
401		querier: querier,
402	}
403}
404
405func (c *mergeSeriesSet) Next() bool {
406	// Run in a loop because the "next" series sets may not be valid anymore.
407	// If a remote querier fails, we discard all series sets from that querier.
408	// If, for the current label set, all the next series sets come from
409	// failed remote storage sources, we want to keep trying with the next label set.
410	for {
411		// Firstly advance all the current series sets.  If any of them have run out
412		// we can drop them, otherwise they should be inserted back into the heap.
413		for _, set := range c.currentSets {
414			if set.Next() {
415				heap.Push(&c.heap, set)
416			}
417		}
418		if len(c.heap) == 0 {
419			return false
420		}
421
422		// Now, pop items of the heap that have equal label sets.
423		c.currentSets = nil
424		c.currentLabels = c.heap[0].At().Labels()
425		for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
426			set := heap.Pop(&c.heap).(SeriesSet)
427			if c.querier != nil && c.querier.IsFailedSet(set) {
428				continue
429			}
430			c.currentSets = append(c.currentSets, set)
431		}
432
433		// As long as the current set contains at least 1 set,
434		// then it should return true.
435		if len(c.currentSets) != 0 {
436			break
437		}
438	}
439	return true
440}
441
442func (c *mergeSeriesSet) At() Series {
443	if len(c.currentSets) == 1 {
444		return c.currentSets[0].At()
445	}
446	series := []Series{}
447	for _, seriesSet := range c.currentSets {
448		series = append(series, seriesSet.At())
449	}
450	return &mergeSeries{
451		labels: c.currentLabels,
452		series: series,
453	}
454}
455
456func (c *mergeSeriesSet) Err() error {
457	for _, set := range c.sets {
458		if err := set.Err(); err != nil {
459			return err
460		}
461	}
462	return nil
463}
464
465type seriesSetHeap []SeriesSet
466
467func (h seriesSetHeap) Len() int      { return len(h) }
468func (h seriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
469
470func (h seriesSetHeap) Less(i, j int) bool {
471	a, b := h[i].At().Labels(), h[j].At().Labels()
472	return labels.Compare(a, b) < 0
473}
474
475func (h *seriesSetHeap) Push(x interface{}) {
476	*h = append(*h, x.(SeriesSet))
477}
478
479func (h *seriesSetHeap) Pop() interface{} {
480	old := *h
481	n := len(old)
482	x := old[n-1]
483	*h = old[0 : n-1]
484	return x
485}
486
487type mergeSeries struct {
488	labels labels.Labels
489	series []Series
490}
491
492func (m *mergeSeries) Labels() labels.Labels {
493	return m.labels
494}
495
496func (m *mergeSeries) Iterator() SeriesIterator {
497	iterators := make([]SeriesIterator, 0, len(m.series))
498	for _, s := range m.series {
499		iterators = append(iterators, s.Iterator())
500	}
501	return newMergeIterator(iterators)
502}
503
504type mergeIterator struct {
505	iterators []SeriesIterator
506	h         seriesIteratorHeap
507}
508
509func newMergeIterator(iterators []SeriesIterator) SeriesIterator {
510	return &mergeIterator{
511		iterators: iterators,
512		h:         nil,
513	}
514}
515
516func (c *mergeIterator) Seek(t int64) bool {
517	c.h = seriesIteratorHeap{}
518	for _, iter := range c.iterators {
519		if iter.Seek(t) {
520			heap.Push(&c.h, iter)
521		}
522	}
523	return len(c.h) > 0
524}
525
526func (c *mergeIterator) At() (t int64, v float64) {
527	if len(c.h) == 0 {
528		panic("mergeIterator.At() called after .Next() returned false.")
529	}
530
531	return c.h[0].At()
532}
533
534func (c *mergeIterator) Next() bool {
535	if c.h == nil {
536		for _, iter := range c.iterators {
537			if iter.Next() {
538				heap.Push(&c.h, iter)
539			}
540		}
541
542		return len(c.h) > 0
543	}
544
545	if len(c.h) == 0 {
546		return false
547	}
548
549	currt, _ := c.At()
550	for len(c.h) > 0 {
551		nextt, _ := c.h[0].At()
552		if nextt != currt {
553			break
554		}
555
556		iter := heap.Pop(&c.h).(SeriesIterator)
557		if iter.Next() {
558			heap.Push(&c.h, iter)
559		}
560	}
561
562	return len(c.h) > 0
563}
564
565func (c *mergeIterator) Err() error {
566	for _, iter := range c.iterators {
567		if err := iter.Err(); err != nil {
568			return err
569		}
570	}
571	return nil
572}
573
574type seriesIteratorHeap []SeriesIterator
575
576func (h seriesIteratorHeap) Len() int      { return len(h) }
577func (h seriesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
578
579func (h seriesIteratorHeap) Less(i, j int) bool {
580	at, _ := h[i].At()
581	bt, _ := h[j].At()
582	return at < bt
583}
584
585func (h *seriesIteratorHeap) Push(x interface{}) {
586	*h = append(*h, x.(SeriesIterator))
587}
588
589func (h *seriesIteratorHeap) Pop() interface{} {
590	old := *h
591	n := len(old)
592	x := old[n-1]
593	*h = old[0 : n-1]
594	return x
595}
596