1// Copyright 2017 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package index
15
16import (
17	"container/heap"
18	"encoding/binary"
19	"runtime"
20	"sort"
21	"strings"
22	"sync"
23
24	"github.com/prometheus/prometheus/pkg/labels"
25)
26
27var allPostingsKey = labels.Label{}
28
29// AllPostingsKey returns the label key that is used to store the postings list of all existing IDs.
30func AllPostingsKey() (name, value string) {
31	return allPostingsKey.Name, allPostingsKey.Value
32}
33
// MemPostings is an in-memory index from label pairs to postings lists:
// m[name][value] holds the series IDs that carry the label name=value.
// Every ID added via Add is also recorded under allPostingsKey, which backs
// All().
//
// Lists may be written to out of order. A MemPostings created with
// NewUnorderedMemPostings must have EnsureOrder called once before any reads
// are done. This allows for quick unordered batch fills on startup. One
// created with NewMemPostings keeps its lists sorted on every insert.
//
// Exported methods acquire mtx (read or write) around their accesses to m
// and ordered.
type MemPostings struct {
	mtx     sync.RWMutex
	m       map[string]map[string][]uint64 // label name -> label value -> series IDs
	ordered bool                           // whether addFor keeps each list sorted on insert
}
43
44// NewMemPostings returns a memPostings that's ready for reads and writes.
45func NewMemPostings() *MemPostings {
46	return &MemPostings{
47		m:       make(map[string]map[string][]uint64, 512),
48		ordered: true,
49	}
50}
51
52// NewUnorderedMemPostings returns a memPostings that is not safe to be read from
53// until ensureOrder was called once.
54func NewUnorderedMemPostings() *MemPostings {
55	return &MemPostings{
56		m:       make(map[string]map[string][]uint64, 512),
57		ordered: false,
58	}
59}
60
61// Symbols returns an iterator over all unique name and value strings, in order.
62func (p *MemPostings) Symbols() StringIter {
63	p.mtx.RLock()
64
65	// Add all the strings to a map to de-duplicate.
66	symbols := make(map[string]struct{}, 512)
67	for n, e := range p.m {
68		symbols[n] = struct{}{}
69		for v := range e {
70			symbols[v] = struct{}{}
71		}
72	}
73	p.mtx.RUnlock()
74
75	res := make([]string, 0, len(symbols))
76	for k := range symbols {
77		res = append(res, k)
78	}
79
80	sort.Strings(res)
81	return NewStringListIter(res)
82}
83
84// SortedKeys returns a list of sorted label keys of the postings.
85func (p *MemPostings) SortedKeys() []labels.Label {
86	p.mtx.RLock()
87	keys := make([]labels.Label, 0, len(p.m))
88
89	for n, e := range p.m {
90		for v := range e {
91			keys = append(keys, labels.Label{Name: n, Value: v})
92		}
93	}
94	p.mtx.RUnlock()
95
96	sort.Slice(keys, func(i, j int) bool {
97		if d := strings.Compare(keys[i].Name, keys[j].Name); d != 0 {
98			return d < 0
99		}
100		return keys[i].Value < keys[j].Value
101	})
102	return keys
103}
104
105// LabelNames returns all the unique label names.
106func (p *MemPostings) LabelNames() []string {
107	p.mtx.RLock()
108	defer p.mtx.RUnlock()
109	n := len(p.m)
110	if n == 0 {
111		return nil
112	}
113
114	names := make([]string, 0, n-1)
115	for name := range p.m {
116		if name != allPostingsKey.Name {
117			names = append(names, name)
118		}
119	}
120	return names
121}
122
123// LabelValues returns label values for the given name.
124func (p *MemPostings) LabelValues(name string) []string {
125	p.mtx.RLock()
126	defer p.mtx.RUnlock()
127
128	values := make([]string, 0, len(p.m[name]))
129	for v := range p.m[name] {
130		values = append(values, v)
131	}
132	return values
133}
134
// PostingsStats contains cardinality based statistics for postings, as
// computed by MemPostings.Stats. Each []Stat ranking is drained from a maxHeap
// that Stats initialises with a limit of 10. See maxHeap for exactly which
// entries are kept.
type PostingsStats struct {
	// CardinalityMetricsStats ranks the values of the label passed to Stats
	// by the number of IDs in each value's postings list.
	CardinalityMetricsStats []Stat
	// CardinalityLabelStats ranks label names by their number of distinct values.
	CardinalityLabelStats   []Stat
	// LabelValueStats ranks label names by the summed byte length of all their values.
	LabelValueStats         []Stat
	// LabelValuePairsStats ranks "name=value" pairs by the number of IDs in
	// their postings list.
	LabelValuePairsStats    []Stat
	// NumLabelPairs is the total number of distinct name/value pairs, not
	// counting the all-postings key.
	NumLabelPairs           int
}
143
144// Stats calculates the cardinality statistics from postings.
145func (p *MemPostings) Stats(label string) *PostingsStats {
146	const maxNumOfRecords = 10
147	var size uint64
148
149	p.mtx.RLock()
150
151	metrics := &maxHeap{}
152	labels := &maxHeap{}
153	labelValueLength := &maxHeap{}
154	labelValuePairs := &maxHeap{}
155	numLabelPairs := 0
156
157	metrics.init(maxNumOfRecords)
158	labels.init(maxNumOfRecords)
159	labelValueLength.init(maxNumOfRecords)
160	labelValuePairs.init(maxNumOfRecords)
161
162	for n, e := range p.m {
163		if n == "" {
164			continue
165		}
166		labels.push(Stat{Name: n, Count: uint64(len(e))})
167		numLabelPairs += len(e)
168		size = 0
169		for name, values := range e {
170			if n == label {
171				metrics.push(Stat{Name: name, Count: uint64(len(values))})
172			}
173			labelValuePairs.push(Stat{Name: n + "=" + name, Count: uint64(len(values))})
174			size += uint64(len(name))
175		}
176		labelValueLength.push(Stat{Name: n, Count: size})
177	}
178
179	p.mtx.RUnlock()
180
181	return &PostingsStats{
182		CardinalityMetricsStats: metrics.get(),
183		CardinalityLabelStats:   labels.get(),
184		LabelValueStats:         labelValueLength.get(),
185		LabelValuePairsStats:    labelValuePairs.get(),
186		NumLabelPairs:           numLabelPairs,
187	}
188}
189
190// Get returns a postings list for the given label pair.
191func (p *MemPostings) Get(name, value string) Postings {
192	var lp []uint64
193	p.mtx.RLock()
194	l := p.m[name]
195	if l != nil {
196		lp = l[value]
197	}
198	p.mtx.RUnlock()
199
200	if lp == nil {
201		return EmptyPostings()
202	}
203	return newListPostings(lp...)
204}
205
206// All returns a postings list over all documents ever added.
207func (p *MemPostings) All() Postings {
208	return p.Get(AllPostingsKey())
209}
210
211// EnsureOrder ensures that all postings lists are sorted. After it returns all further
212// calls to add and addFor will insert new IDs in a sorted manner.
213func (p *MemPostings) EnsureOrder() {
214	p.mtx.Lock()
215	defer p.mtx.Unlock()
216
217	if p.ordered {
218		return
219	}
220
221	n := runtime.GOMAXPROCS(0)
222	workc := make(chan []uint64)
223
224	var wg sync.WaitGroup
225	wg.Add(n)
226
227	for i := 0; i < n; i++ {
228		go func() {
229			for l := range workc {
230				sort.Slice(l, func(a, b int) bool { return l[a] < l[b] })
231			}
232			wg.Done()
233		}()
234	}
235
236	for _, e := range p.m {
237		for _, l := range e {
238			workc <- l
239		}
240	}
241	close(workc)
242	wg.Wait()
243
244	p.ordered = true
245}
246
247// Delete removes all ids in the given map from the postings lists.
248func (p *MemPostings) Delete(deleted map[uint64]struct{}) {
249	var keys, vals []string
250
251	// Collect all keys relevant for deletion once. New keys added afterwards
252	// can by definition not be affected by any of the given deletes.
253	p.mtx.RLock()
254	for n := range p.m {
255		keys = append(keys, n)
256	}
257	p.mtx.RUnlock()
258
259	for _, n := range keys {
260		p.mtx.RLock()
261		vals = vals[:0]
262		for v := range p.m[n] {
263			vals = append(vals, v)
264		}
265		p.mtx.RUnlock()
266
267		// For each posting we first analyse whether the postings list is affected by the deletes.
268		// If yes, we actually reallocate a new postings list.
269		for _, l := range vals {
270			// Only lock for processing one postings list so we don't block reads for too long.
271			p.mtx.Lock()
272
273			found := false
274			for _, id := range p.m[n][l] {
275				if _, ok := deleted[id]; ok {
276					found = true
277					break
278				}
279			}
280			if !found {
281				p.mtx.Unlock()
282				continue
283			}
284			repl := make([]uint64, 0, len(p.m[n][l]))
285
286			for _, id := range p.m[n][l] {
287				if _, ok := deleted[id]; !ok {
288					repl = append(repl, id)
289				}
290			}
291			if len(repl) > 0 {
292				p.m[n][l] = repl
293			} else {
294				delete(p.m[n], l)
295			}
296			p.mtx.Unlock()
297		}
298		p.mtx.Lock()
299		if len(p.m[n]) == 0 {
300			delete(p.m, n)
301		}
302		p.mtx.Unlock()
303	}
304}
305
306// Iter calls f for each postings list. It aborts if f returns an error and returns it.
307func (p *MemPostings) Iter(f func(labels.Label, Postings) error) error {
308	p.mtx.RLock()
309	defer p.mtx.RUnlock()
310
311	for n, e := range p.m {
312		for v, p := range e {
313			if err := f(labels.Label{Name: n, Value: v}, newListPostings(p...)); err != nil {
314				return err
315			}
316		}
317	}
318	return nil
319}
320
321// Add a label set to the postings index.
322func (p *MemPostings) Add(id uint64, lset labels.Labels) {
323	p.mtx.Lock()
324
325	for _, l := range lset {
326		p.addFor(id, l)
327	}
328	p.addFor(id, allPostingsKey)
329
330	p.mtx.Unlock()
331}
332
333func (p *MemPostings) addFor(id uint64, l labels.Label) {
334	nm, ok := p.m[l.Name]
335	if !ok {
336		nm = map[string][]uint64{}
337		p.m[l.Name] = nm
338	}
339	list := append(nm[l.Value], id)
340	nm[l.Value] = list
341
342	if !p.ordered {
343		return
344	}
345	// There is no guarantee that no higher ID was inserted before as they may
346	// be generated independently before adding them to postings.
347	// We repair order violations on insert. The invariant is that the first n-1
348	// items in the list are already sorted.
349	for i := len(list) - 1; i >= 1; i-- {
350		if list[i] >= list[i-1] {
351			break
352		}
353		list[i], list[i-1] = list[i-1], list[i]
354	}
355}
356
357// ExpandPostings returns the postings expanded as a slice.
358func ExpandPostings(p Postings) (res []uint64, err error) {
359	for p.Next() {
360		res = append(res, p.At())
361	}
362	return res, p.Err()
363}
364
// Postings is an iterator over series IDs. Seek's contract (advance to v or
// greater) assumes the IDs are produced in ascending order.
type Postings interface {
	// At returns the value at the current iterator position. It is only
	// meaningful after Next or Seek has returned true.
	At() uint64

	// Next moves to the following value and reports whether one exists.
	Next() bool

	// Seek moves forward to the first value that is >= v and reports whether
	// such a value exists.
	Seek(v uint64) bool

	// Err returns the last error of the iterator, if any.
	Err() error
}
380
381// errPostings is an empty iterator that always errors.
382type errPostings struct {
383	err error
384}
385
386func (e errPostings) Next() bool       { return false }
387func (e errPostings) Seek(uint64) bool { return false }
388func (e errPostings) At() uint64       { return 0 }
389func (e errPostings) Err() error       { return e.err }
390
391var emptyPostings = errPostings{}
392
393// EmptyPostings returns a postings list that's always empty.
394// NOTE: Returning EmptyPostings sentinel when index.Postings struct has no postings is recommended.
395// It triggers optimized flow in other functions like Intersect, Without etc.
396func EmptyPostings() Postings {
397	return emptyPostings
398}
399
400// ErrPostings returns new postings that immediately error.
401func ErrPostings(err error) Postings {
402	return errPostings{err}
403}
404
405// Intersect returns a new postings list over the intersection of the
406// input postings.
407func Intersect(its ...Postings) Postings {
408	if len(its) == 0 {
409		return EmptyPostings()
410	}
411	if len(its) == 1 {
412		return its[0]
413	}
414	for _, p := range its {
415		if p == EmptyPostings() {
416			return EmptyPostings()
417		}
418	}
419
420	return newIntersectPostings(its...)
421}
422
423type intersectPostings struct {
424	arr []Postings
425	cur uint64
426}
427
428func newIntersectPostings(its ...Postings) *intersectPostings {
429	return &intersectPostings{arr: its}
430}
431
432func (it *intersectPostings) At() uint64 {
433	return it.cur
434}
435
436func (it *intersectPostings) doNext() bool {
437Loop:
438	for {
439		for _, p := range it.arr {
440			if !p.Seek(it.cur) {
441				return false
442			}
443			if p.At() > it.cur {
444				it.cur = p.At()
445				continue Loop
446			}
447		}
448		return true
449	}
450}
451
452func (it *intersectPostings) Next() bool {
453	for _, p := range it.arr {
454		if !p.Next() {
455			return false
456		}
457		if p.At() > it.cur {
458			it.cur = p.At()
459		}
460	}
461	return it.doNext()
462}
463
464func (it *intersectPostings) Seek(id uint64) bool {
465	it.cur = id
466	return it.doNext()
467}
468
469func (it *intersectPostings) Err() error {
470	for _, p := range it.arr {
471		if p.Err() != nil {
472			return p.Err()
473		}
474	}
475	return nil
476}
477
478// Merge returns a new iterator over the union of the input iterators.
479func Merge(its ...Postings) Postings {
480	if len(its) == 0 {
481		return EmptyPostings()
482	}
483	if len(its) == 1 {
484		return its[0]
485	}
486
487	p, ok := newMergedPostings(its)
488	if !ok {
489		return EmptyPostings()
490	}
491	return p
492}
493
494type postingsHeap []Postings
495
496func (h postingsHeap) Len() int           { return len(h) }
497func (h postingsHeap) Less(i, j int) bool { return h[i].At() < h[j].At() }
498func (h *postingsHeap) Swap(i, j int)     { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] }
499
500func (h *postingsHeap) Push(x interface{}) {
501	*h = append(*h, x.(Postings))
502}
503
504func (h *postingsHeap) Pop() interface{} {
505	old := *h
506	n := len(old)
507	x := old[n-1]
508	*h = old[0 : n-1]
509	return x
510}
511
// mergedPostings iterates over the union of several Postings, yielding each
// distinct value once. It keeps the inputs in a min-heap keyed by their
// current value.
type mergedPostings struct {
	h           postingsHeap // inputs that still have values; h[0] has the smallest At() once heapified
	initialized bool         // set by the first Next (Seek calls Next first), which heapifies h
	cur         uint64       // value most recently returned
	err         error        // error from an input; once set, Next and Seek return false
}
518
519func newMergedPostings(p []Postings) (m *mergedPostings, nonEmpty bool) {
520	ph := make(postingsHeap, 0, len(p))
521
522	for _, it := range p {
523		// NOTE: mergedPostings struct requires the user to issue an initial Next.
524		if it.Next() {
525			ph = append(ph, it)
526		} else {
527			if it.Err() != nil {
528				return &mergedPostings{err: it.Err()}, true
529			}
530		}
531	}
532
533	if len(ph) == 0 {
534		return nil, false
535	}
536	return &mergedPostings{h: ph}, true
537}
538
539func (it *mergedPostings) Next() bool {
540	if it.h.Len() == 0 || it.err != nil {
541		return false
542	}
543
544	// The user must issue an initial Next.
545	if !it.initialized {
546		heap.Init(&it.h)
547		it.cur = it.h[0].At()
548		it.initialized = true
549		return true
550	}
551
552	for {
553		cur := it.h[0]
554		if !cur.Next() {
555			heap.Pop(&it.h)
556			if cur.Err() != nil {
557				it.err = cur.Err()
558				return false
559			}
560			if it.h.Len() == 0 {
561				return false
562			}
563		} else {
564			// Value of top of heap has changed, re-heapify.
565			heap.Fix(&it.h, 0)
566		}
567
568		if it.h[0].At() != it.cur {
569			it.cur = it.h[0].At()
570			return true
571		}
572	}
573}
574
575func (it *mergedPostings) Seek(id uint64) bool {
576	if it.h.Len() == 0 || it.err != nil {
577		return false
578	}
579	if !it.initialized {
580		if !it.Next() {
581			return false
582		}
583	}
584	for it.cur < id {
585		cur := it.h[0]
586		if !cur.Seek(id) {
587			heap.Pop(&it.h)
588			if cur.Err() != nil {
589				it.err = cur.Err()
590				return false
591			}
592			if it.h.Len() == 0 {
593				return false
594			}
595		} else {
596			// Value of top of heap has changed, re-heapify.
597			heap.Fix(&it.h, 0)
598		}
599
600		it.cur = it.h[0].At()
601	}
602	return true
603}
604
605func (it mergedPostings) At() uint64 {
606	return it.cur
607}
608
609func (it mergedPostings) Err() error {
610	return it.err
611}
612
613// Without returns a new postings list that contains all elements from the full list that
614// are not in the drop list.
615func Without(full, drop Postings) Postings {
616	if full == EmptyPostings() {
617		return EmptyPostings()
618	}
619
620	if drop == EmptyPostings() {
621		return full
622	}
623	return newRemovedPostings(full, drop)
624}
625
626type removedPostings struct {
627	full, remove Postings
628
629	cur uint64
630
631	initialized bool
632	fok, rok    bool
633}
634
635func newRemovedPostings(full, remove Postings) *removedPostings {
636	return &removedPostings{
637		full:   full,
638		remove: remove,
639	}
640}
641
642func (rp *removedPostings) At() uint64 {
643	return rp.cur
644}
645
646func (rp *removedPostings) Next() bool {
647	if !rp.initialized {
648		rp.fok = rp.full.Next()
649		rp.rok = rp.remove.Next()
650		rp.initialized = true
651	}
652	for {
653		if !rp.fok {
654			return false
655		}
656
657		if !rp.rok {
658			rp.cur = rp.full.At()
659			rp.fok = rp.full.Next()
660			return true
661		}
662
663		fcur, rcur := rp.full.At(), rp.remove.At()
664		if fcur < rcur {
665			rp.cur = fcur
666			rp.fok = rp.full.Next()
667
668			return true
669		} else if rcur < fcur {
670			// Forward the remove postings to the right position.
671			rp.rok = rp.remove.Seek(fcur)
672		} else {
673			// Skip the current posting.
674			rp.fok = rp.full.Next()
675		}
676	}
677}
678
679func (rp *removedPostings) Seek(id uint64) bool {
680	if rp.cur >= id {
681		return true
682	}
683
684	rp.fok = rp.full.Seek(id)
685	rp.rok = rp.remove.Seek(id)
686	rp.initialized = true
687
688	return rp.Next()
689}
690
691func (rp *removedPostings) Err() error {
692	if rp.full.Err() != nil {
693		return rp.full.Err()
694	}
695
696	return rp.remove.Err()
697}
698
699// ListPostings implements the Postings interface over a plain list.
700type ListPostings struct {
701	list []uint64
702	cur  uint64
703}
704
705func NewListPostings(list []uint64) Postings {
706	return newListPostings(list...)
707}
708
709func newListPostings(list ...uint64) *ListPostings {
710	return &ListPostings{list: list}
711}
712
713func (it *ListPostings) At() uint64 {
714	return it.cur
715}
716
717func (it *ListPostings) Next() bool {
718	if len(it.list) > 0 {
719		it.cur = it.list[0]
720		it.list = it.list[1:]
721		return true
722	}
723	it.cur = 0
724	return false
725}
726
727func (it *ListPostings) Seek(x uint64) bool {
728	// If the current value satisfies, then return.
729	if it.cur >= x {
730		return true
731	}
732	if len(it.list) == 0 {
733		return false
734	}
735
736	// Do binary search between current position and end.
737	i := sort.Search(len(it.list), func(i int) bool {
738		return it.list[i] >= x
739	})
740	if i < len(it.list) {
741		it.cur = it.list[i]
742		it.list = it.list[i+1:]
743		return true
744	}
745	it.list = nil
746	return false
747}
748
749func (it *ListPostings) Err() error {
750	return nil
751}
752
// bigEndianPostings implements the Postings interface over a byte stream of
// big endian uint32 numbers, expected in ascending order. A trailing partial
// value (fewer than 4 bytes) is ignored.
type bigEndianPostings struct {
	list []byte // encoded values not yet returned
	cur  uint32 // current value; 0 before the first Next and after exhaustion
}

func newBigEndianPostings(list []byte) *bigEndianPostings {
	return &bigEndianPostings{list: list}
}

// At returns the current value, or 0 if there is none.
func (it *bigEndianPostings) At() uint64 {
	return uint64(it.cur)
}

// Next decodes the following value. On exhaustion it clears cur, so that a
// later Seek cannot succeed on a stale value.
func (it *bigEndianPostings) Next() bool {
	if len(it.list) >= 4 {
		it.cur = binary.BigEndian.Uint32(it.list)
		it.list = it.list[4:]
		return true
	}
	it.cur = 0
	return false
}

// Seek advances to the first value >= x. If the current value already
// satisfies x, the iterator stays put. A fresh or exhausted iterator
// (cur == 0) reports true for Seek(0).
func (it *bigEndianPostings) Seek(x uint64) bool {
	if uint64(it.cur) >= x {
		return true
	}

	// Stored values are 32-bit, so nothing can satisfy x > MaxUint32. Checking
	// this first also keeps uint32(x) below from truncating into a false match.
	if x <= 1<<32-1 {
		target := uint32(x)
		num := len(it.list) / 4
		// Do binary search between current position and end.
		i := sort.Search(num, func(i int) bool {
			return binary.BigEndian.Uint32(it.list[i*4:]) >= target
		})
		if i < num {
			j := i * 4
			it.cur = binary.BigEndian.Uint32(it.list[j:])
			it.list = it.list[j+4:]
			return true
		}
	}

	it.list = nil
	it.cur = 0
	return false
}

// Err always returns nil: decoding an in-memory buffer cannot fail.
func (it *bigEndianPostings) Err() error {
	return nil
}
800