// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package index

import (
	"container/heap"
	"encoding/binary"
	"runtime"
	"sort"
	"strings"
	"sync"

	"github.com/prometheus/prometheus/pkg/labels"
)

// allPostingsKey is the zero-value label (empty name, empty value). Add
// registers every series ID under it in addition to the ID's own labels.
var allPostingsKey = labels.Label{}

// AllPostingsKey returns the label key that is used to store the postings list of all existing IDs.
func AllPostingsKey() (name, value string) {
	return allPostingsKey.Name, allPostingsKey.Value
}

// MemPostings holds postings lists of series IDs per label pair. They may be
// written to out of order.
// EnsureOrder must be called once before any reads are done. This allows for
// quick unordered batch fills on startup.
type MemPostings struct {
	// mtx guards m and ordered.
	mtx sync.RWMutex
	// m maps label name -> label value -> list of series IDs.
	m map[string]map[string][]uint64
	// ordered reports whether addFor keeps each list sorted on insert.
	ordered bool
}

// NewMemPostings returns a MemPostings that's ready for reads and writes.
func NewMemPostings() *MemPostings {
	return &MemPostings{
		m:       make(map[string]map[string][]uint64, 512),
		ordered: true,
	}
}

// NewUnorderedMemPostings returns a MemPostings that is not safe to be read from
// until EnsureOrder was called once.
func NewUnorderedMemPostings() *MemPostings {
	return &MemPostings{
		m:       make(map[string]map[string][]uint64, 512),
		ordered: false,
	}
}

// Symbols returns an iterator over all unique name and value strings, in order.
62func (p *MemPostings) Symbols() StringIter { 63 p.mtx.RLock() 64 65 // Add all the strings to a map to de-duplicate. 66 symbols := make(map[string]struct{}, 512) 67 for n, e := range p.m { 68 symbols[n] = struct{}{} 69 for v := range e { 70 symbols[v] = struct{}{} 71 } 72 } 73 p.mtx.RUnlock() 74 75 res := make([]string, 0, len(symbols)) 76 for k := range symbols { 77 res = append(res, k) 78 } 79 80 sort.Strings(res) 81 return NewStringListIter(res) 82} 83 84// SortedKeys returns a list of sorted label keys of the postings. 85func (p *MemPostings) SortedKeys() []labels.Label { 86 p.mtx.RLock() 87 keys := make([]labels.Label, 0, len(p.m)) 88 89 for n, e := range p.m { 90 for v := range e { 91 keys = append(keys, labels.Label{Name: n, Value: v}) 92 } 93 } 94 p.mtx.RUnlock() 95 96 sort.Slice(keys, func(i, j int) bool { 97 if d := strings.Compare(keys[i].Name, keys[j].Name); d != 0 { 98 return d < 0 99 } 100 return keys[i].Value < keys[j].Value 101 }) 102 return keys 103} 104 105// LabelNames returns all the unique label names. 106func (p *MemPostings) LabelNames() []string { 107 p.mtx.RLock() 108 defer p.mtx.RUnlock() 109 n := len(p.m) 110 if n == 0 { 111 return nil 112 } 113 114 names := make([]string, 0, n-1) 115 for name := range p.m { 116 if name != allPostingsKey.Name { 117 names = append(names, name) 118 } 119 } 120 return names 121} 122 123// LabelValues returns label values for the given name. 124func (p *MemPostings) LabelValues(name string) []string { 125 p.mtx.RLock() 126 defer p.mtx.RUnlock() 127 128 values := make([]string, 0, len(p.m[name])) 129 for v := range p.m[name] { 130 values = append(values, v) 131 } 132 return values 133} 134 135// PostingsStats contains cardinality based statistics for postings. 
136type PostingsStats struct { 137 CardinalityMetricsStats []Stat 138 CardinalityLabelStats []Stat 139 LabelValueStats []Stat 140 LabelValuePairsStats []Stat 141 NumLabelPairs int 142} 143 144// Stats calculates the cardinality statistics from postings. 145func (p *MemPostings) Stats(label string) *PostingsStats { 146 const maxNumOfRecords = 10 147 var size uint64 148 149 p.mtx.RLock() 150 151 metrics := &maxHeap{} 152 labels := &maxHeap{} 153 labelValueLength := &maxHeap{} 154 labelValuePairs := &maxHeap{} 155 numLabelPairs := 0 156 157 metrics.init(maxNumOfRecords) 158 labels.init(maxNumOfRecords) 159 labelValueLength.init(maxNumOfRecords) 160 labelValuePairs.init(maxNumOfRecords) 161 162 for n, e := range p.m { 163 if n == "" { 164 continue 165 } 166 labels.push(Stat{Name: n, Count: uint64(len(e))}) 167 numLabelPairs += len(e) 168 size = 0 169 for name, values := range e { 170 if n == label { 171 metrics.push(Stat{Name: name, Count: uint64(len(values))}) 172 } 173 labelValuePairs.push(Stat{Name: n + "=" + name, Count: uint64(len(values))}) 174 size += uint64(len(name)) 175 } 176 labelValueLength.push(Stat{Name: n, Count: size}) 177 } 178 179 p.mtx.RUnlock() 180 181 return &PostingsStats{ 182 CardinalityMetricsStats: metrics.get(), 183 CardinalityLabelStats: labels.get(), 184 LabelValueStats: labelValueLength.get(), 185 LabelValuePairsStats: labelValuePairs.get(), 186 NumLabelPairs: numLabelPairs, 187 } 188} 189 190// Get returns a postings list for the given label pair. 191func (p *MemPostings) Get(name, value string) Postings { 192 var lp []uint64 193 p.mtx.RLock() 194 l := p.m[name] 195 if l != nil { 196 lp = l[value] 197 } 198 p.mtx.RUnlock() 199 200 if lp == nil { 201 return EmptyPostings() 202 } 203 return newListPostings(lp...) 204} 205 206// All returns a postings list over all documents ever added. 207func (p *MemPostings) All() Postings { 208 return p.Get(AllPostingsKey()) 209} 210 211// EnsureOrder ensures that all postings lists are sorted. 
// After it returns, all further calls to Add and addFor will insert new IDs
// in a sorted manner. It is a no-op when the postings are already ordered.
func (p *MemPostings) EnsureOrder() {
	p.mtx.Lock()
	defer p.mtx.Unlock()

	if p.ordered {
		return
	}

	// Sort the lists in parallel with one worker per GOMAXPROCS slot.
	n := runtime.GOMAXPROCS(0)
	workc := make(chan []uint64)

	var wg sync.WaitGroup
	wg.Add(n)

	for i := 0; i < n; i++ {
		go func() {
			// Each list is sorted in place; the slice header received here
			// shares its backing array with the entry stored in p.m.
			for l := range workc {
				sort.Slice(l, func(a, b int) bool { return l[a] < l[b] })
			}
			wg.Done()
		}()
	}

	for _, e := range p.m {
		for _, l := range e {
			workc <- l
		}
	}
	// Closing the channel ends the workers' range loops; wait for all of
	// them before flagging the postings as ordered.
	close(workc)
	wg.Wait()

	p.ordered = true
}

// Delete removes all ids in the given map from the postings lists. Label
// values whose list becomes empty are removed, and so are label names left
// without any value.
func (p *MemPostings) Delete(deleted map[uint64]struct{}) {
	var keys, vals []string

	// Collect all keys relevant for deletion once. New keys added afterwards
	// can by definition not be affected by any of the given deletes.
	p.mtx.RLock()
	for n := range p.m {
		keys = append(keys, n)
	}
	p.mtx.RUnlock()

	for _, n := range keys {
		// Snapshot the values of this name under the read lock; vals is
		// reused across names to avoid reallocating.
		p.mtx.RLock()
		vals = vals[:0]
		for v := range p.m[n] {
			vals = append(vals, v)
		}
		p.mtx.RUnlock()

		// For each posting we first analyse whether the postings list is affected by the deletes.
		// If yes, we actually reallocate a new postings list.
		for _, l := range vals {
			// Only lock for processing one postings list so we don't block reads for too long.
			p.mtx.Lock()

			found := false
			for _, id := range p.m[n][l] {
				if _, ok := deleted[id]; ok {
					found = true
					break
				}
			}
			if !found {
				p.mtx.Unlock()
				continue
			}
			// Build the replacement in a fresh slice rather than filtering
			// in place, so the old backing array is left unmodified.
			repl := make([]uint64, 0, len(p.m[n][l]))

			for _, id := range p.m[n][l] {
				if _, ok := deleted[id]; !ok {
					repl = append(repl, id)
				}
			}
			if len(repl) > 0 {
				p.m[n][l] = repl
			} else {
				delete(p.m[n], l)
			}
			p.mtx.Unlock()
		}
		// Drop the label name entirely once none of its values remain.
		p.mtx.Lock()
		if len(p.m[n]) == 0 {
			delete(p.m, n)
		}
		p.mtx.Unlock()
	}
}

// Iter calls f for each postings list. It aborts if f returns an error and returns it.
// The read lock is held for the whole iteration, including the calls to f.
func (p *MemPostings) Iter(f func(labels.Label, Postings) error) error {
	p.mtx.RLock()
	defer p.mtx.RUnlock()

	for n, e := range p.m {
		// NOTE: the inner loop variable p shadows the receiver; inside the
		// loop body p is the postings list, not the MemPostings.
		for v, p := range e {
			if err := f(labels.Label{Name: n, Value: v}, newListPostings(p...)); err != nil {
				return err
			}
		}
	}
	return nil
}

// Add a label set to the postings index. The ID is registered under every
// label of lset and additionally under the all-postings key.
func (p *MemPostings) Add(id uint64, lset labels.Labels) {
	p.mtx.Lock()

	for _, l := range lset {
		p.addFor(id, l)
	}
	p.addFor(id, allPostingsKey)

	p.mtx.Unlock()
}

// addFor appends id to the postings list of label l, creating the per-name
// map on first use. It takes no lock itself; Add calls it with p.mtx held.
func (p *MemPostings) addFor(id uint64, l labels.Label) {
	nm, ok := p.m[l.Name]
	if !ok {
		nm = map[string][]uint64{}
		p.m[l.Name] = nm
	}
	list := append(nm[l.Value], id)
	nm[l.Value] = list

	// In unordered mode the list is left as appended; EnsureOrder sorts later.
	if !p.ordered {
		return
	}
	// There is no guarantee that no higher ID was inserted before as they may
	// be generated independently before adding them to postings.
	// We repair order violations on insert. The invariant is that the first n-1
	// items in the list are already sorted.
	for i := len(list) - 1; i >= 1; i-- {
		if list[i] >= list[i-1] {
			break
		}
		list[i], list[i-1] = list[i-1], list[i]
	}
}

// ExpandPostings returns the postings expanded as a slice.
func ExpandPostings(p Postings) (res []uint64, err error) {
	// Drain the iterator; the error is only inspected once Next reports false.
	for p.Next() {
		res = append(res, p.At())
	}
	err = p.Err()
	return res, err
}

// Postings is an iterator over a list of postings (uint64 IDs).
type Postings interface {
	// Next moves to the following value and reports whether one exists.
	Next() bool

	// Seek moves forward to the first value that is v or greater and
	// reports whether such a value exists.
	Seek(v uint64) bool

	// At returns the value the iterator currently points to.
	At() uint64

	// Err returns the last error encountered by the iterator.
	Err() error
}

// errPostings is an iterator that never yields a value and reports a fixed
// error from Err. With a nil error it doubles as the empty iterator.
type errPostings struct {
	err error
}

func (errPostings) Next() bool       { return false }
func (errPostings) Seek(uint64) bool { return false }
func (errPostings) At() uint64       { return 0 }
func (e errPostings) Err() error     { return e.err }

// emptyPostings is the sentinel handed out by EmptyPostings.
var emptyPostings = errPostings{}

// EmptyPostings returns a postings list that's always empty.
// NOTE: Returning the EmptyPostings sentinel when there are no postings is
// recommended: Intersect, Without and similar functions compare against it to
// take a short cut.
func EmptyPostings() Postings {
	return emptyPostings
}

// ErrPostings returns new postings that yield nothing and report err.
func ErrPostings(err error) Postings {
	return errPostings{err: err}
}

// Intersect returns a new postings list over the intersection of the input
// postings. No input yields the empty sentinel, a single input is returned
// as is, and any input equal to the empty sentinel makes the result empty.
func Intersect(its ...Postings) Postings {
	switch len(its) {
	case 0:
		return EmptyPostings()
	case 1:
		return its[0]
	}

	for _, candidate := range its {
		if candidate == EmptyPostings() {
			return EmptyPostings()
		}
	}
	return newIntersectPostings(its...)
}

// intersectPostings yields the values present in every iterator of arr.
type intersectPostings struct {
	arr []Postings
	cur uint64
}

func newIntersectPostings(its ...Postings) *intersectPostings {
	return &intersectPostings{arr: its}
}

// At returns the current common value.
func (it *intersectPostings) At() uint64 {
	return it.cur
}

// doNext seeks every iterator to it.cur. Whenever one of them lands on a
// larger value, that value becomes the new target and the pass restarts from
// the first iterator. It returns true once a full pass moved no target, and
// false as soon as any iterator is exhausted.
func (it *intersectPostings) doNext() bool {
	for {
		settled := true
		for _, sub := range it.arr {
			if !sub.Seek(it.cur) {
				return false
			}
			if at := sub.At(); at > it.cur {
				it.cur = at
				settled = false
				break
			}
		}
		if settled {
			return true
		}
	}
}

// Next advances every iterator by one, takes the largest value seen as the
// target and aligns all iterators on it.
func (it *intersectPostings) Next() bool {
	for _, sub := range it.arr {
		if !sub.Next() {
			return false
		}
		if at := sub.At(); at > it.cur {
			it.cur = at
		}
	}
	return it.doNext()
}

// Seek aligns all iterators on the first common value that is id or greater.
func (it *intersectPostings) Seek(id uint64) bool {
	it.cur = id
	return it.doNext()
}

// Err returns the first error reported by any of the underlying iterators.
func (it *intersectPostings) Err() error {
	for _, sub := range it.arr {
		if err := sub.Err(); err != nil {
			return err
		}
	}
	return nil
}

// Merge returns a new iterator over the union of the input iterators.
479func Merge(its ...Postings) Postings { 480 if len(its) == 0 { 481 return EmptyPostings() 482 } 483 if len(its) == 1 { 484 return its[0] 485 } 486 487 p, ok := newMergedPostings(its) 488 if !ok { 489 return EmptyPostings() 490 } 491 return p 492} 493 494type postingsHeap []Postings 495 496func (h postingsHeap) Len() int { return len(h) } 497func (h postingsHeap) Less(i, j int) bool { return h[i].At() < h[j].At() } 498func (h *postingsHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] } 499 500func (h *postingsHeap) Push(x interface{}) { 501 *h = append(*h, x.(Postings)) 502} 503 504func (h *postingsHeap) Pop() interface{} { 505 old := *h 506 n := len(old) 507 x := old[n-1] 508 *h = old[0 : n-1] 509 return x 510} 511 512type mergedPostings struct { 513 h postingsHeap 514 initialized bool 515 cur uint64 516 err error 517} 518 519func newMergedPostings(p []Postings) (m *mergedPostings, nonEmpty bool) { 520 ph := make(postingsHeap, 0, len(p)) 521 522 for _, it := range p { 523 // NOTE: mergedPostings struct requires the user to issue an initial Next. 524 if it.Next() { 525 ph = append(ph, it) 526 } else { 527 if it.Err() != nil { 528 return &mergedPostings{err: it.Err()}, true 529 } 530 } 531 } 532 533 if len(ph) == 0 { 534 return nil, false 535 } 536 return &mergedPostings{h: ph}, true 537} 538 539func (it *mergedPostings) Next() bool { 540 if it.h.Len() == 0 || it.err != nil { 541 return false 542 } 543 544 // The user must issue an initial Next. 545 if !it.initialized { 546 heap.Init(&it.h) 547 it.cur = it.h[0].At() 548 it.initialized = true 549 return true 550 } 551 552 for { 553 cur := it.h[0] 554 if !cur.Next() { 555 heap.Pop(&it.h) 556 if cur.Err() != nil { 557 it.err = cur.Err() 558 return false 559 } 560 if it.h.Len() == 0 { 561 return false 562 } 563 } else { 564 // Value of top of heap has changed, re-heapify. 
565 heap.Fix(&it.h, 0) 566 } 567 568 if it.h[0].At() != it.cur { 569 it.cur = it.h[0].At() 570 return true 571 } 572 } 573} 574 575func (it *mergedPostings) Seek(id uint64) bool { 576 if it.h.Len() == 0 || it.err != nil { 577 return false 578 } 579 if !it.initialized { 580 if !it.Next() { 581 return false 582 } 583 } 584 for it.cur < id { 585 cur := it.h[0] 586 if !cur.Seek(id) { 587 heap.Pop(&it.h) 588 if cur.Err() != nil { 589 it.err = cur.Err() 590 return false 591 } 592 if it.h.Len() == 0 { 593 return false 594 } 595 } else { 596 // Value of top of heap has changed, re-heapify. 597 heap.Fix(&it.h, 0) 598 } 599 600 it.cur = it.h[0].At() 601 } 602 return true 603} 604 605func (it mergedPostings) At() uint64 { 606 return it.cur 607} 608 609func (it mergedPostings) Err() error { 610 return it.err 611} 612 613// Without returns a new postings list that contains all elements from the full list that 614// are not in the drop list. 615func Without(full, drop Postings) Postings { 616 if full == EmptyPostings() { 617 return EmptyPostings() 618 } 619 620 if drop == EmptyPostings() { 621 return full 622 } 623 return newRemovedPostings(full, drop) 624} 625 626type removedPostings struct { 627 full, remove Postings 628 629 cur uint64 630 631 initialized bool 632 fok, rok bool 633} 634 635func newRemovedPostings(full, remove Postings) *removedPostings { 636 return &removedPostings{ 637 full: full, 638 remove: remove, 639 } 640} 641 642func (rp *removedPostings) At() uint64 { 643 return rp.cur 644} 645 646func (rp *removedPostings) Next() bool { 647 if !rp.initialized { 648 rp.fok = rp.full.Next() 649 rp.rok = rp.remove.Next() 650 rp.initialized = true 651 } 652 for { 653 if !rp.fok { 654 return false 655 } 656 657 if !rp.rok { 658 rp.cur = rp.full.At() 659 rp.fok = rp.full.Next() 660 return true 661 } 662 663 fcur, rcur := rp.full.At(), rp.remove.At() 664 if fcur < rcur { 665 rp.cur = fcur 666 rp.fok = rp.full.Next() 667 668 return true 669 } else if rcur < fcur { 670 // 
Forward the remove postings to the right position. 671 rp.rok = rp.remove.Seek(fcur) 672 } else { 673 // Skip the current posting. 674 rp.fok = rp.full.Next() 675 } 676 } 677} 678 679func (rp *removedPostings) Seek(id uint64) bool { 680 if rp.cur >= id { 681 return true 682 } 683 684 rp.fok = rp.full.Seek(id) 685 rp.rok = rp.remove.Seek(id) 686 rp.initialized = true 687 688 return rp.Next() 689} 690 691func (rp *removedPostings) Err() error { 692 if rp.full.Err() != nil { 693 return rp.full.Err() 694 } 695 696 return rp.remove.Err() 697} 698 699// ListPostings implements the Postings interface over a plain list. 700type ListPostings struct { 701 list []uint64 702 cur uint64 703} 704 705func NewListPostings(list []uint64) Postings { 706 return newListPostings(list...) 707} 708 709func newListPostings(list ...uint64) *ListPostings { 710 return &ListPostings{list: list} 711} 712 713func (it *ListPostings) At() uint64 { 714 return it.cur 715} 716 717func (it *ListPostings) Next() bool { 718 if len(it.list) > 0 { 719 it.cur = it.list[0] 720 it.list = it.list[1:] 721 return true 722 } 723 it.cur = 0 724 return false 725} 726 727func (it *ListPostings) Seek(x uint64) bool { 728 // If the current value satisfies, then return. 729 if it.cur >= x { 730 return true 731 } 732 if len(it.list) == 0 { 733 return false 734 } 735 736 // Do binary search between current position and end. 737 i := sort.Search(len(it.list), func(i int) bool { 738 return it.list[i] >= x 739 }) 740 if i < len(it.list) { 741 it.cur = it.list[i] 742 it.list = it.list[i+1:] 743 return true 744 } 745 it.list = nil 746 return false 747} 748 749func (it *ListPostings) Err() error { 750 return nil 751} 752 753// bigEndianPostings implements the Postings interface over a byte stream of 754// big endian numbers. 
type bigEndianPostings struct {
	// list holds the not yet consumed IDs, 4 big-endian bytes each.
	list []byte
	// cur is the most recently decoded ID.
	cur uint32
}

func newBigEndianPostings(list []byte) *bigEndianPostings {
	return &bigEndianPostings{list: list}
}

// At returns the current value widened to uint64.
func (it *bigEndianPostings) At() uint64 {
	return uint64(it.cur)
}

// Next decodes the next 4-byte value. It returns false once fewer than
// 4 bytes remain.
func (it *bigEndianPostings) Next() bool {
	if len(it.list) >= 4 {
		it.cur = binary.BigEndian.Uint32(it.list)
		it.list = it.list[4:]
		return true
	}
	return false
}

// Seek advances to the first value that is x or greater and reports whether
// one exists. The binary search relies on the encoded values being sorted in
// ascending order.
func (it *bigEndianPostings) Seek(x uint64) bool {
	if uint64(it.cur) >= x {
		return true
	}

	num := len(it.list) / 4
	// Do binary search between current position and end. The comparison is
	// done in uint64: truncating x to uint32 would wrap targets above
	// math.MaxUint32 and make them match small IDs.
	i := sort.Search(num, func(i int) bool {
		return uint64(binary.BigEndian.Uint32(it.list[i*4:])) >= x
	})
	if i < num {
		j := i * 4
		it.cur = binary.BigEndian.Uint32(it.list[j:])
		it.list = it.list[j+4:]
		return true
	}
	it.list = nil
	return false
}

// Err always returns nil; decoding from the in-memory byte slice cannot fail.
func (it *bigEndianPostings) Err() error {
	return nil
}