1// Copyright 2017 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package index 15 16import ( 17 "bufio" 18 "bytes" 19 "context" 20 "encoding/binary" 21 "hash" 22 "hash/crc32" 23 "io" 24 "io/ioutil" 25 "math" 26 "os" 27 "path/filepath" 28 "sort" 29 "unsafe" 30 31 "github.com/pkg/errors" 32 33 "github.com/prometheus/prometheus/pkg/labels" 34 "github.com/prometheus/prometheus/storage" 35 "github.com/prometheus/prometheus/tsdb/chunks" 36 "github.com/prometheus/prometheus/tsdb/encoding" 37 tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" 38 "github.com/prometheus/prometheus/tsdb/fileutil" 39) 40 41const ( 42 // MagicIndex 4 bytes at the head of an index file. 43 MagicIndex = 0xBAAAD700 44 // HeaderLen represents number of bytes reserved of index for header. 45 HeaderLen = 5 46 47 // FormatV1 represents 1 version of index. 48 FormatV1 = 1 49 // FormatV2 represents 2 version of index. 
50 FormatV2 = 2 51 52 indexFilename = "index" 53) 54 55type indexWriterSeries struct { 56 labels labels.Labels 57 chunks []chunks.Meta // series file offset of chunks 58} 59 60type indexWriterSeriesSlice []*indexWriterSeries 61 62func (s indexWriterSeriesSlice) Len() int { return len(s) } 63func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } 64 65func (s indexWriterSeriesSlice) Less(i, j int) bool { 66 return labels.Compare(s[i].labels, s[j].labels) < 0 67} 68 69type indexWriterStage uint8 70 71const ( 72 idxStageNone indexWriterStage = iota 73 idxStageSymbols 74 idxStageSeries 75 idxStageDone 76) 77 78func (s indexWriterStage) String() string { 79 switch s { 80 case idxStageNone: 81 return "none" 82 case idxStageSymbols: 83 return "symbols" 84 case idxStageSeries: 85 return "series" 86 case idxStageDone: 87 return "done" 88 } 89 return "<unknown>" 90} 91 92// The table gets initialized with sync.Once but may still cause a race 93// with any other use of the crc32 package anywhere. Thus we initialize it 94// before. 95var castagnoliTable *crc32.Table 96 97func init() { 98 castagnoliTable = crc32.MakeTable(crc32.Castagnoli) 99} 100 101// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the 102// polynomial may be easily changed in one location at a later time, if necessary. 103func newCRC32() hash.Hash32 { 104 return crc32.New(castagnoliTable) 105} 106 107type symbolCacheEntry struct { 108 index uint32 109 lastValue string 110 lastValueIndex uint32 111} 112 113// Writer implements the IndexWriter interface for the standard 114// serialization format. 115type Writer struct { 116 ctx context.Context 117 118 // For the main index file. 119 f *FileWriter 120 121 // Temporary file for postings. 122 fP *FileWriter 123 // Temporary file for posting offsets table. 124 fPO *FileWriter 125 cntPO uint64 126 127 toc TOC 128 stage indexWriterStage 129 postingsStart uint64 // Due to padding, can differ from TOC entry. 
130 131 // Reusable memory. 132 buf1 encoding.Encbuf 133 buf2 encoding.Encbuf 134 135 numSymbols int 136 symbols *Symbols 137 symbolFile *fileutil.MmapFile 138 lastSymbol string 139 symbolCache map[string]symbolCacheEntry 140 141 labelIndexes []labelIndexHashEntry // Label index offsets. 142 labelNames map[string]uint64 // Label names, and their usage. 143 144 // Hold last series to validate that clients insert new series in order. 145 lastSeries labels.Labels 146 lastRef uint64 147 148 crc32 hash.Hash 149 150 Version int 151} 152 153// TOC represents index Table Of Content that states where each section of index starts. 154type TOC struct { 155 Symbols uint64 156 Series uint64 157 LabelIndices uint64 158 LabelIndicesTable uint64 159 Postings uint64 160 PostingsTable uint64 161} 162 163// NewTOCFromByteSlice return parsed TOC from given index byte slice. 164func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) { 165 if bs.Len() < indexTOCLen { 166 return nil, encoding.ErrInvalidSize 167 } 168 b := bs.Range(bs.Len()-indexTOCLen, bs.Len()) 169 170 expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) 171 d := encoding.Decbuf{B: b[:len(b)-4]} 172 173 if d.Crc32(castagnoliTable) != expCRC { 174 return nil, errors.Wrap(encoding.ErrInvalidChecksum, "read TOC") 175 } 176 177 if err := d.Err(); err != nil { 178 return nil, err 179 } 180 181 return &TOC{ 182 Symbols: d.Be64(), 183 Series: d.Be64(), 184 LabelIndices: d.Be64(), 185 LabelIndicesTable: d.Be64(), 186 Postings: d.Be64(), 187 PostingsTable: d.Be64(), 188 }, nil 189} 190 191// NewWriter returns a new Writer to the given filename. It serializes data in format version 2. 192func NewWriter(ctx context.Context, fn string) (*Writer, error) { 193 dir := filepath.Dir(fn) 194 195 df, err := fileutil.OpenDir(dir) 196 if err != nil { 197 return nil, err 198 } 199 defer df.Close() // Close for platform windows. 
200 201 if err := os.RemoveAll(fn); err != nil { 202 return nil, errors.Wrap(err, "remove any existing index at path") 203 } 204 205 // Main index file we are building. 206 f, err := NewFileWriter(fn) 207 if err != nil { 208 return nil, err 209 } 210 // Temporary file for postings. 211 fP, err := NewFileWriter(fn + "_tmp_p") 212 if err != nil { 213 return nil, err 214 } 215 // Temporary file for posting offset table. 216 fPO, err := NewFileWriter(fn + "_tmp_po") 217 if err != nil { 218 return nil, err 219 } 220 if err := df.Sync(); err != nil { 221 return nil, errors.Wrap(err, "sync dir") 222 } 223 224 iw := &Writer{ 225 ctx: ctx, 226 f: f, 227 fP: fP, 228 fPO: fPO, 229 stage: idxStageNone, 230 231 // Reusable memory. 232 buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, 233 buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, 234 235 symbolCache: make(map[string]symbolCacheEntry, 1<<8), 236 labelNames: make(map[string]uint64, 1<<8), 237 crc32: newCRC32(), 238 } 239 if err := iw.writeMeta(); err != nil { 240 return nil, err 241 } 242 return iw, nil 243} 244 245func (w *Writer) write(bufs ...[]byte) error { 246 return w.f.Write(bufs...) 
247} 248 249func (w *Writer) writeAt(buf []byte, pos uint64) error { 250 return w.f.WriteAt(buf, pos) 251} 252 253func (w *Writer) addPadding(size int) error { 254 return w.f.AddPadding(size) 255} 256 257type FileWriter struct { 258 f *os.File 259 fbuf *bufio.Writer 260 pos uint64 261 name string 262} 263 264func NewFileWriter(name string) (*FileWriter, error) { 265 f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0666) 266 if err != nil { 267 return nil, err 268 } 269 return &FileWriter{ 270 f: f, 271 fbuf: bufio.NewWriterSize(f, 1<<22), 272 pos: 0, 273 name: name, 274 }, nil 275} 276 277func (fw *FileWriter) Pos() uint64 { 278 return fw.pos 279} 280 281func (fw *FileWriter) Write(bufs ...[]byte) error { 282 for _, b := range bufs { 283 n, err := fw.fbuf.Write(b) 284 fw.pos += uint64(n) 285 if err != nil { 286 return err 287 } 288 // For now the index file must not grow beyond 64GiB. Some of the fixed-sized 289 // offset references in v1 are only 4 bytes large. 290 // Once we move to compressed/varint representations in those areas, this limitation 291 // can be lifted. 292 if fw.pos > 16*math.MaxUint32 { 293 return errors.Errorf("%q exceeding max size of 64GiB", fw.name) 294 } 295 } 296 return nil 297} 298 299func (fw *FileWriter) Flush() error { 300 return fw.fbuf.Flush() 301} 302 303func (fw *FileWriter) WriteAt(buf []byte, pos uint64) error { 304 if err := fw.Flush(); err != nil { 305 return err 306 } 307 _, err := fw.f.WriteAt(buf, int64(pos)) 308 return err 309} 310 311// AddPadding adds zero byte padding until the file size is a multiple size. 
312func (fw *FileWriter) AddPadding(size int) error { 313 p := fw.pos % uint64(size) 314 if p == 0 { 315 return nil 316 } 317 p = uint64(size) - p 318 319 if err := fw.Write(make([]byte, p)); err != nil { 320 return errors.Wrap(err, "add padding") 321 } 322 return nil 323} 324 325func (fw *FileWriter) Close() error { 326 if err := fw.Flush(); err != nil { 327 return err 328 } 329 if err := fw.f.Sync(); err != nil { 330 return err 331 } 332 return fw.f.Close() 333} 334 335func (fw *FileWriter) Remove() error { 336 return os.Remove(fw.name) 337} 338 339// ensureStage handles transitions between write stages and ensures that IndexWriter 340// methods are called in an order valid for the implementation. 341func (w *Writer) ensureStage(s indexWriterStage) error { 342 select { 343 case <-w.ctx.Done(): 344 return w.ctx.Err() 345 default: 346 } 347 348 if w.stage == s { 349 return nil 350 } 351 if w.stage < s-1 { 352 // A stage has been skipped. 353 if err := w.ensureStage(s - 1); err != nil { 354 return err 355 } 356 } 357 if w.stage > s { 358 return errors.Errorf("invalid stage %q, currently at %q", s, w.stage) 359 } 360 361 // Mark start of sections in table of contents. 362 switch s { 363 case idxStageSymbols: 364 w.toc.Symbols = w.f.pos 365 if err := w.startSymbols(); err != nil { 366 return err 367 } 368 case idxStageSeries: 369 if err := w.finishSymbols(); err != nil { 370 return err 371 } 372 w.toc.Series = w.f.pos 373 374 case idxStageDone: 375 w.toc.LabelIndices = w.f.pos 376 // LabelIndices generation depends on the posting offset 377 // table produced at this stage. 
378 if err := w.writePostingsToTmpFiles(); err != nil { 379 return err 380 } 381 if err := w.writeLabelIndices(); err != nil { 382 return err 383 } 384 385 w.toc.Postings = w.f.pos 386 if err := w.writePostings(); err != nil { 387 return err 388 } 389 390 w.toc.LabelIndicesTable = w.f.pos 391 if err := w.writeLabelIndexesOffsetTable(); err != nil { 392 return err 393 } 394 395 w.toc.PostingsTable = w.f.pos 396 if err := w.writePostingsOffsetTable(); err != nil { 397 return err 398 } 399 if err := w.writeTOC(); err != nil { 400 return err 401 } 402 } 403 404 w.stage = s 405 return nil 406} 407 408func (w *Writer) writeMeta() error { 409 w.buf1.Reset() 410 w.buf1.PutBE32(MagicIndex) 411 w.buf1.PutByte(FormatV2) 412 413 return w.write(w.buf1.Get()) 414} 415 416// AddSeries adds the series one at a time along with its chunks. 417func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta) error { 418 if err := w.ensureStage(idxStageSeries); err != nil { 419 return err 420 } 421 if labels.Compare(lset, w.lastSeries) <= 0 { 422 return errors.Errorf("out-of-order series added with label set %q", lset) 423 } 424 425 if ref < w.lastRef && len(w.lastSeries) != 0 { 426 return errors.Errorf("series with reference greater than %d already added", ref) 427 } 428 // We add padding to 16 bytes to increase the addressable space we get through 4 byte 429 // series references. 
430 if err := w.addPadding(16); err != nil { 431 return errors.Errorf("failed to write padding bytes: %v", err) 432 } 433 434 if w.f.pos%16 != 0 { 435 return errors.Errorf("series write not 16-byte aligned at %d", w.f.pos) 436 } 437 438 w.buf2.Reset() 439 w.buf2.PutUvarint(len(lset)) 440 441 for _, l := range lset { 442 var err error 443 cacheEntry, ok := w.symbolCache[l.Name] 444 nameIndex := cacheEntry.index 445 if !ok { 446 nameIndex, err = w.symbols.ReverseLookup(l.Name) 447 if err != nil { 448 return errors.Errorf("symbol entry for %q does not exist, %v", l.Name, err) 449 } 450 } 451 w.labelNames[l.Name]++ 452 w.buf2.PutUvarint32(nameIndex) 453 454 valueIndex := cacheEntry.lastValueIndex 455 if !ok || cacheEntry.lastValue != l.Value { 456 valueIndex, err = w.symbols.ReverseLookup(l.Value) 457 if err != nil { 458 return errors.Errorf("symbol entry for %q does not exist, %v", l.Value, err) 459 } 460 w.symbolCache[l.Name] = symbolCacheEntry{ 461 index: nameIndex, 462 lastValue: l.Value, 463 lastValueIndex: valueIndex, 464 } 465 } 466 w.buf2.PutUvarint32(valueIndex) 467 } 468 469 w.buf2.PutUvarint(len(chunks)) 470 471 if len(chunks) > 0 { 472 c := chunks[0] 473 w.buf2.PutVarint64(c.MinTime) 474 w.buf2.PutUvarint64(uint64(c.MaxTime - c.MinTime)) 475 w.buf2.PutUvarint64(c.Ref) 476 t0 := c.MaxTime 477 ref0 := int64(c.Ref) 478 479 for _, c := range chunks[1:] { 480 w.buf2.PutUvarint64(uint64(c.MinTime - t0)) 481 w.buf2.PutUvarint64(uint64(c.MaxTime - c.MinTime)) 482 t0 = c.MaxTime 483 484 w.buf2.PutVarint64(int64(c.Ref) - ref0) 485 ref0 = int64(c.Ref) 486 } 487 } 488 489 w.buf1.Reset() 490 w.buf1.PutUvarint(w.buf2.Len()) 491 492 w.buf2.PutHash(w.crc32) 493 494 if err := w.write(w.buf1.Get(), w.buf2.Get()); err != nil { 495 return errors.Wrap(err, "write series data") 496 } 497 498 w.lastSeries = append(w.lastSeries[:0], lset...) 499 w.lastRef = ref 500 501 return nil 502} 503 504func (w *Writer) startSymbols() error { 505 // We are at w.toc.Symbols. 
506 // Leave 4 bytes of space for the length, and another 4 for the number of symbols 507 // which will both be calculated later. 508 return w.write([]byte("alenblen")) 509} 510 511func (w *Writer) AddSymbol(sym string) error { 512 if err := w.ensureStage(idxStageSymbols); err != nil { 513 return err 514 } 515 if w.numSymbols != 0 && sym <= w.lastSymbol { 516 return errors.Errorf("symbol %q out-of-order", sym) 517 } 518 w.lastSymbol = sym 519 w.numSymbols++ 520 w.buf1.Reset() 521 w.buf1.PutUvarintStr(sym) 522 return w.write(w.buf1.Get()) 523} 524 525func (w *Writer) finishSymbols() error { 526 symbolTableSize := w.f.pos - w.toc.Symbols - 4 527 // The symbol table's <len> part is 4 bytes. So the total symbol table size must be less than or equal to 2^32-1 528 if symbolTableSize > 4294967295 { 529 return errors.Errorf("symbol table size exceeds 4 bytes: %d", symbolTableSize) 530 } 531 532 // Write out the length and symbol count. 533 w.buf1.Reset() 534 w.buf1.PutBE32int(int(symbolTableSize)) 535 w.buf1.PutBE32int(int(w.numSymbols)) 536 if err := w.writeAt(w.buf1.Get(), w.toc.Symbols); err != nil { 537 return err 538 } 539 540 hashPos := w.f.pos 541 // Leave space for the hash. We can only calculate it 542 // now that the number of symbols is known, so mmap and do it from there. 543 if err := w.write([]byte("hash")); err != nil { 544 return err 545 } 546 if err := w.f.Flush(); err != nil { 547 return err 548 } 549 550 sf, err := fileutil.OpenMmapFile(w.f.name) 551 if err != nil { 552 return err 553 } 554 w.symbolFile = sf 555 hash := crc32.Checksum(w.symbolFile.Bytes()[w.toc.Symbols+4:hashPos], castagnoliTable) 556 w.buf1.Reset() 557 w.buf1.PutBE32(hash) 558 if err := w.writeAt(w.buf1.Get(), hashPos); err != nil { 559 return err 560 } 561 562 // Load in the symbol table efficiently for the rest of the index writing. 
563 w.symbols, err = NewSymbols(realByteSlice(w.symbolFile.Bytes()), FormatV2, int(w.toc.Symbols)) 564 if err != nil { 565 return errors.Wrap(err, "read symbols") 566 } 567 return nil 568} 569 570func (w *Writer) writeLabelIndices() error { 571 if err := w.fPO.Flush(); err != nil { 572 return err 573 } 574 575 // Find all the label values in the tmp posting offset table. 576 f, err := fileutil.OpenMmapFile(w.fPO.name) 577 if err != nil { 578 return err 579 } 580 defer f.Close() 581 582 d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos)) 583 cnt := w.cntPO 584 current := []byte{} 585 values := []uint32{} 586 for d.Err() == nil && cnt > 0 { 587 cnt-- 588 d.Uvarint() // Keycount. 589 name := d.UvarintBytes() // Label name. 590 value := yoloString(d.UvarintBytes()) // Label value. 591 d.Uvarint64() // Offset. 592 if len(name) == 0 { 593 continue // All index is ignored. 594 } 595 596 if !bytes.Equal(name, current) && len(values) > 0 { 597 // We've reached a new label name. 598 if err := w.writeLabelIndex(string(current), values); err != nil { 599 return err 600 } 601 values = values[:0] 602 } 603 current = name 604 sid, err := w.symbols.ReverseLookup(value) 605 if err != nil { 606 return err 607 } 608 values = append(values, sid) 609 } 610 if d.Err() != nil { 611 return d.Err() 612 } 613 614 // Handle the last label. 615 if len(values) > 0 { 616 if err := w.writeLabelIndex(string(current), values); err != nil { 617 return err 618 } 619 } 620 return nil 621} 622 623func (w *Writer) writeLabelIndex(name string, values []uint32) error { 624 // Align beginning to 4 bytes for more efficient index list scans. 625 if err := w.addPadding(4); err != nil { 626 return err 627 } 628 629 w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{ 630 keys: []string{name}, 631 offset: w.f.pos, 632 }) 633 634 startPos := w.f.pos 635 // Leave 4 bytes of space for the length, which will be calculated later. 
636 if err := w.write([]byte("alen")); err != nil { 637 return err 638 } 639 w.crc32.Reset() 640 641 w.buf1.Reset() 642 w.buf1.PutBE32int(1) // Number of names. 643 w.buf1.PutBE32int(len(values)) 644 w.buf1.WriteToHash(w.crc32) 645 if err := w.write(w.buf1.Get()); err != nil { 646 return err 647 } 648 649 for _, v := range values { 650 w.buf1.Reset() 651 w.buf1.PutBE32(v) 652 w.buf1.WriteToHash(w.crc32) 653 if err := w.write(w.buf1.Get()); err != nil { 654 return err 655 } 656 } 657 658 // Write out the length. 659 w.buf1.Reset() 660 w.buf1.PutBE32int(int(w.f.pos - startPos - 4)) 661 if err := w.writeAt(w.buf1.Get(), startPos); err != nil { 662 return err 663 } 664 665 w.buf1.Reset() 666 w.buf1.PutHashSum(w.crc32) 667 return w.write(w.buf1.Get()) 668} 669 670// writeLabelIndexesOffsetTable writes the label indices offset table. 671func (w *Writer) writeLabelIndexesOffsetTable() error { 672 startPos := w.f.pos 673 // Leave 4 bytes of space for the length, which will be calculated later. 674 if err := w.write([]byte("alen")); err != nil { 675 return err 676 } 677 w.crc32.Reset() 678 679 w.buf1.Reset() 680 w.buf1.PutBE32int(len(w.labelIndexes)) 681 w.buf1.WriteToHash(w.crc32) 682 if err := w.write(w.buf1.Get()); err != nil { 683 return err 684 } 685 686 for _, e := range w.labelIndexes { 687 w.buf1.Reset() 688 w.buf1.PutUvarint(len(e.keys)) 689 for _, k := range e.keys { 690 w.buf1.PutUvarintStr(k) 691 } 692 w.buf1.PutUvarint64(e.offset) 693 w.buf1.WriteToHash(w.crc32) 694 if err := w.write(w.buf1.Get()); err != nil { 695 return err 696 } 697 } 698 // Write out the length. 699 w.buf1.Reset() 700 w.buf1.PutBE32int(int(w.f.pos - startPos - 4)) 701 if err := w.writeAt(w.buf1.Get(), startPos); err != nil { 702 return err 703 } 704 705 w.buf1.Reset() 706 w.buf1.PutHashSum(w.crc32) 707 return w.write(w.buf1.Get()) 708} 709 710// writePostingsOffsetTable writes the postings offset table. 
711func (w *Writer) writePostingsOffsetTable() error { 712 // Ensure everything is in the temporary file. 713 if err := w.fPO.Flush(); err != nil { 714 return err 715 } 716 717 startPos := w.f.pos 718 // Leave 4 bytes of space for the length, which will be calculated later. 719 if err := w.write([]byte("alen")); err != nil { 720 return err 721 } 722 723 // Copy over the tmp posting offset table, however we need to 724 // adjust the offsets. 725 adjustment := w.postingsStart 726 727 w.buf1.Reset() 728 w.crc32.Reset() 729 w.buf1.PutBE32int(int(w.cntPO)) // Count. 730 w.buf1.WriteToHash(w.crc32) 731 if err := w.write(w.buf1.Get()); err != nil { 732 return err 733 } 734 735 f, err := fileutil.OpenMmapFile(w.fPO.name) 736 if err != nil { 737 return err 738 } 739 defer func() { 740 if f != nil { 741 f.Close() 742 } 743 }() 744 d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos)) 745 cnt := w.cntPO 746 for d.Err() == nil && cnt > 0 { 747 w.buf1.Reset() 748 w.buf1.PutUvarint(d.Uvarint()) // Keycount. 749 w.buf1.PutUvarintStr(yoloString(d.UvarintBytes())) // Label name. 750 w.buf1.PutUvarintStr(yoloString(d.UvarintBytes())) // Label value. 751 w.buf1.PutUvarint64(d.Uvarint64() + adjustment) // Offset. 752 w.buf1.WriteToHash(w.crc32) 753 if err := w.write(w.buf1.Get()); err != nil { 754 return err 755 } 756 cnt-- 757 } 758 if d.Err() != nil { 759 return d.Err() 760 } 761 762 // Cleanup temporary file. 763 if err := f.Close(); err != nil { 764 return err 765 } 766 f = nil 767 if err := w.fPO.Close(); err != nil { 768 return err 769 } 770 if err := w.fPO.Remove(); err != nil { 771 return err 772 } 773 w.fPO = nil 774 775 // Write out the length. 776 w.buf1.Reset() 777 w.buf1.PutBE32int(int(w.f.pos - startPos - 4)) 778 if err := w.writeAt(w.buf1.Get(), startPos); err != nil { 779 return err 780 } 781 782 // Finally write the hash. 
783 w.buf1.Reset() 784 w.buf1.PutHashSum(w.crc32) 785 return w.write(w.buf1.Get()) 786} 787 788const indexTOCLen = 6*8 + crc32.Size 789 790func (w *Writer) writeTOC() error { 791 w.buf1.Reset() 792 793 w.buf1.PutBE64(w.toc.Symbols) 794 w.buf1.PutBE64(w.toc.Series) 795 w.buf1.PutBE64(w.toc.LabelIndices) 796 w.buf1.PutBE64(w.toc.LabelIndicesTable) 797 w.buf1.PutBE64(w.toc.Postings) 798 w.buf1.PutBE64(w.toc.PostingsTable) 799 800 w.buf1.PutHash(w.crc32) 801 802 return w.write(w.buf1.Get()) 803} 804 805func (w *Writer) writePostingsToTmpFiles() error { 806 names := make([]string, 0, len(w.labelNames)) 807 for n := range w.labelNames { 808 names = append(names, n) 809 } 810 sort.Strings(names) 811 812 if err := w.f.Flush(); err != nil { 813 return err 814 } 815 f, err := fileutil.OpenMmapFile(w.f.name) 816 if err != nil { 817 return err 818 } 819 defer f.Close() 820 821 // Write out the special all posting. 822 offsets := []uint32{} 823 d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices)) 824 d.Skip(int(w.toc.Series)) 825 for d.Len() > 0 { 826 d.ConsumePadding() 827 startPos := w.toc.LabelIndices - uint64(d.Len()) 828 if startPos%16 != 0 { 829 return errors.Errorf("series not 16-byte aligned at %d", startPos) 830 } 831 offsets = append(offsets, uint32(startPos/16)) 832 // Skip to next series. 833 x := d.Uvarint() 834 d.Skip(x + crc32.Size) 835 if err := d.Err(); err != nil { 836 return err 837 } 838 } 839 if err := w.writePosting("", "", offsets); err != nil { 840 return err 841 } 842 maxPostings := uint64(len(offsets)) // No label name can have more postings than this. 843 844 for len(names) > 0 { 845 batchNames := []string{} 846 var c uint64 847 // Try to bunch up label names into one loop, but avoid 848 // using more memory than a single label name can. 
849 for len(names) > 0 { 850 if w.labelNames[names[0]]+c > maxPostings { 851 break 852 } 853 batchNames = append(batchNames, names[0]) 854 c += w.labelNames[names[0]] 855 names = names[1:] 856 } 857 858 nameSymbols := map[uint32]string{} 859 for _, name := range batchNames { 860 sid, err := w.symbols.ReverseLookup(name) 861 if err != nil { 862 return err 863 } 864 nameSymbols[sid] = name 865 } 866 // Label name -> label value -> positions. 867 postings := map[uint32]map[uint32][]uint32{} 868 869 d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices)) 870 d.Skip(int(w.toc.Series)) 871 for d.Len() > 0 { 872 d.ConsumePadding() 873 startPos := w.toc.LabelIndices - uint64(d.Len()) 874 l := d.Uvarint() // Length of this series in bytes. 875 startLen := d.Len() 876 877 // See if label names we want are in the series. 878 numLabels := d.Uvarint() 879 for i := 0; i < numLabels; i++ { 880 lno := uint32(d.Uvarint()) 881 lvo := uint32(d.Uvarint()) 882 883 if _, ok := nameSymbols[lno]; ok { 884 if _, ok := postings[lno]; !ok { 885 postings[lno] = map[uint32][]uint32{} 886 } 887 postings[lno][lvo] = append(postings[lno][lvo], uint32(startPos/16)) 888 } 889 } 890 // Skip to next series. 891 d.Skip(l - (startLen - d.Len()) + crc32.Size) 892 if err := d.Err(); err != nil { 893 return err 894 } 895 } 896 897 for _, name := range batchNames { 898 // Write out postings for this label name. 899 sid, err := w.symbols.ReverseLookup(name) 900 if err != nil { 901 return err 902 } 903 values := make([]uint32, 0, len(postings[sid])) 904 for v := range postings[sid] { 905 values = append(values, v) 906 907 } 908 // Symbol numbers are in order, so the strings will also be in order. 
909 sort.Sort(uint32slice(values)) 910 for _, v := range values { 911 value, err := w.symbols.Lookup(v) 912 if err != nil { 913 return err 914 } 915 if err := w.writePosting(name, value, postings[sid][v]); err != nil { 916 return err 917 } 918 } 919 } 920 select { 921 case <-w.ctx.Done(): 922 return w.ctx.Err() 923 default: 924 } 925 926 } 927 return nil 928} 929 930func (w *Writer) writePosting(name, value string, offs []uint32) error { 931 // Align beginning to 4 bytes for more efficient postings list scans. 932 if err := w.fP.AddPadding(4); err != nil { 933 return err 934 } 935 936 // Write out postings offset table to temporary file as we go. 937 w.buf1.Reset() 938 w.buf1.PutUvarint(2) 939 w.buf1.PutUvarintStr(name) 940 w.buf1.PutUvarintStr(value) 941 w.buf1.PutUvarint64(w.fP.pos) // This is relative to the postings tmp file, not the final index file. 942 if err := w.fPO.Write(w.buf1.Get()); err != nil { 943 return err 944 } 945 w.cntPO++ 946 947 w.buf1.Reset() 948 w.buf1.PutBE32int(len(offs)) 949 950 for _, off := range offs { 951 if off > (1<<32)-1 { 952 return errors.Errorf("series offset %d exceeds 4 bytes", off) 953 } 954 w.buf1.PutBE32(off) 955 } 956 957 w.buf2.Reset() 958 w.buf2.PutBE32int(w.buf1.Len()) 959 w.buf1.PutHash(w.crc32) 960 return w.fP.Write(w.buf2.Get(), w.buf1.Get()) 961} 962 963func (w *Writer) writePostings() error { 964 // There's padding in the tmp file, make sure it actually works. 965 if err := w.f.AddPadding(4); err != nil { 966 return err 967 } 968 w.postingsStart = w.f.pos 969 970 // Copy temporary file into main index. 971 if err := w.fP.Flush(); err != nil { 972 return err 973 } 974 if _, err := w.fP.f.Seek(0, 0); err != nil { 975 return err 976 } 977 // Don't need to calculate a checksum, so can copy directly. 
978 n, err := io.CopyBuffer(w.f.fbuf, w.fP.f, make([]byte, 1<<20)) 979 if err != nil { 980 return err 981 } 982 if uint64(n) != w.fP.pos { 983 return errors.Errorf("wrote %d bytes to posting temporary file, but only read back %d", w.fP.pos, n) 984 } 985 w.f.pos += uint64(n) 986 987 if err := w.fP.Close(); err != nil { 988 return err 989 } 990 if err := w.fP.Remove(); err != nil { 991 return err 992 } 993 w.fP = nil 994 return nil 995} 996 997type uint32slice []uint32 998 999func (s uint32slice) Len() int { return len(s) } 1000func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } 1001func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] } 1002 1003type labelIndexHashEntry struct { 1004 keys []string 1005 offset uint64 1006} 1007 1008func (w *Writer) Close() error { 1009 // Even if this fails, we need to close all the files. 1010 ensureErr := w.ensureStage(idxStageDone) 1011 1012 if w.symbolFile != nil { 1013 if err := w.symbolFile.Close(); err != nil { 1014 return err 1015 } 1016 } 1017 if w.fP != nil { 1018 if err := w.fP.Close(); err != nil { 1019 return err 1020 } 1021 } 1022 if w.fPO != nil { 1023 if err := w.fPO.Close(); err != nil { 1024 return err 1025 } 1026 } 1027 if err := w.f.Close(); err != nil { 1028 return err 1029 } 1030 return ensureErr 1031} 1032 1033// StringIter iterates over a sorted list of strings. 1034type StringIter interface { 1035 // Next advances the iterator and returns true if another value was found. 1036 Next() bool 1037 1038 // At returns the value at the current iterator position. 1039 At() string 1040 1041 // Err returns the last error of the iterator. 1042 Err() error 1043} 1044 1045type Reader struct { 1046 b ByteSlice 1047 toc *TOC 1048 1049 // Close that releases the underlying resources of the byte slice. 1050 c io.Closer 1051 1052 // Map of LabelName to a list of some LabelValues's position in the offset table. 1053 // The first and last values for each name are always present. 
1054 postings map[string][]postingOffset 1055 // For the v1 format, labelname -> labelvalue -> offset. 1056 postingsV1 map[string]map[string]uint64 1057 1058 symbols *Symbols 1059 nameSymbols map[uint32]string // Cache of the label name symbol lookups, 1060 // as there are not many and they are half of all lookups. 1061 1062 dec *Decoder 1063 1064 version int 1065} 1066 1067type postingOffset struct { 1068 value string 1069 off int 1070} 1071 1072// ByteSlice abstracts a byte slice. 1073type ByteSlice interface { 1074 Len() int 1075 Range(start, end int) []byte 1076} 1077 1078type realByteSlice []byte 1079 1080func (b realByteSlice) Len() int { 1081 return len(b) 1082} 1083 1084func (b realByteSlice) Range(start, end int) []byte { 1085 return b[start:end] 1086} 1087 1088func (b realByteSlice) Sub(start, end int) ByteSlice { 1089 return b[start:end] 1090} 1091 1092// NewReader returns a new index reader on the given byte slice. It automatically 1093// handles different format versions. 1094func NewReader(b ByteSlice) (*Reader, error) { 1095 return newReader(b, ioutil.NopCloser(nil)) 1096} 1097 1098// NewFileReader returns a new index reader against the given index file. 1099func NewFileReader(path string) (*Reader, error) { 1100 f, err := fileutil.OpenMmapFile(path) 1101 if err != nil { 1102 return nil, err 1103 } 1104 r, err := newReader(realByteSlice(f.Bytes()), f) 1105 if err != nil { 1106 return nil, tsdb_errors.NewMulti( 1107 err, 1108 f.Close(), 1109 ).Err() 1110 } 1111 1112 return r, nil 1113} 1114 1115func newReader(b ByteSlice, c io.Closer) (*Reader, error) { 1116 r := &Reader{ 1117 b: b, 1118 c: c, 1119 postings: map[string][]postingOffset{}, 1120 } 1121 1122 // Verify header. 
1123 if r.b.Len() < HeaderLen { 1124 return nil, errors.Wrap(encoding.ErrInvalidSize, "index header") 1125 } 1126 if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex { 1127 return nil, errors.Errorf("invalid magic number %x", m) 1128 } 1129 r.version = int(r.b.Range(4, 5)[0]) 1130 1131 if r.version != FormatV1 && r.version != FormatV2 { 1132 return nil, errors.Errorf("unknown index file version %d", r.version) 1133 } 1134 1135 var err error 1136 r.toc, err = NewTOCFromByteSlice(b) 1137 if err != nil { 1138 return nil, errors.Wrap(err, "read TOC") 1139 } 1140 1141 r.symbols, err = NewSymbols(r.b, r.version, int(r.toc.Symbols)) 1142 if err != nil { 1143 return nil, errors.Wrap(err, "read symbols") 1144 } 1145 1146 if r.version == FormatV1 { 1147 // Earlier V1 formats don't have a sorted postings offset table, so 1148 // load the whole offset table into memory. 1149 r.postingsV1 = map[string]map[string]uint64{} 1150 if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, off uint64, _ int) error { 1151 if len(key) != 2 { 1152 return errors.Errorf("unexpected key length for posting table %d", len(key)) 1153 } 1154 if _, ok := r.postingsV1[key[0]]; !ok { 1155 r.postingsV1[key[0]] = map[string]uint64{} 1156 r.postings[key[0]] = nil // Used to get a list of labelnames in places. 1157 } 1158 r.postingsV1[key[0]][key[1]] = off 1159 return nil 1160 }); err != nil { 1161 return nil, errors.Wrap(err, "read postings table") 1162 } 1163 } else { 1164 var lastKey []string 1165 lastOff := 0 1166 valueCount := 0 1167 // For the postings offset table we keep every label name but only every nth 1168 // label value (plus the first and last one), to save memory. 1169 if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, _ uint64, off int) error { 1170 if len(key) != 2 { 1171 return errors.Errorf("unexpected key length for posting table %d", len(key)) 1172 } 1173 if _, ok := r.postings[key[0]]; !ok { 1174 // Next label name. 
1175 r.postings[key[0]] = []postingOffset{} 1176 if lastKey != nil { 1177 // Always include last value for each label name. 1178 r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff}) 1179 } 1180 lastKey = nil 1181 valueCount = 0 1182 } 1183 if valueCount%symbolFactor == 0 { 1184 r.postings[key[0]] = append(r.postings[key[0]], postingOffset{value: key[1], off: off}) 1185 lastKey = nil 1186 } else { 1187 lastKey = key 1188 lastOff = off 1189 } 1190 valueCount++ 1191 return nil 1192 }); err != nil { 1193 return nil, errors.Wrap(err, "read postings table") 1194 } 1195 if lastKey != nil { 1196 r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff}) 1197 } 1198 // Trim any extra space in the slices. 1199 for k, v := range r.postings { 1200 l := make([]postingOffset, len(v)) 1201 copy(l, v) 1202 r.postings[k] = l 1203 } 1204 } 1205 1206 r.nameSymbols = make(map[uint32]string, len(r.postings)) 1207 for k := range r.postings { 1208 if k == "" { 1209 continue 1210 } 1211 off, err := r.symbols.ReverseLookup(k) 1212 if err != nil { 1213 return nil, errors.Wrap(err, "reverse symbol lookup") 1214 } 1215 r.nameSymbols[off] = k 1216 } 1217 1218 r.dec = &Decoder{LookupSymbol: r.lookupSymbol} 1219 1220 return r, nil 1221} 1222 1223// Version returns the file format version of the underlying index. 1224func (r *Reader) Version() int { 1225 return r.version 1226} 1227 1228// Range marks a byte range. 1229type Range struct { 1230 Start, End int64 1231} 1232 1233// PostingsRanges returns a new map of byte range in the underlying index file 1234// for all postings lists. 
func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) {
	m := map[labels.Label]Range{}
	// Walk the postings offset table; every entry is keyed by (label name, label value)
	// and carries the position of the corresponding postings list.
	if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, off uint64, _ int) error {
		if len(key) != 2 {
			return errors.Errorf("unexpected key length for posting table %d", len(key))
		}
		// Opening a Decbuf at off validates the list and tells us its length.
		d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable)
		if d.Err() != nil {
			return d.Err()
		}
		// NOTE(review): the +4 presumably skips the length prefix consumed by
		// NewDecbufAt, so the range covers only the list contents - confirm.
		m[labels.Label{Name: key[0], Value: key[1]}] = Range{
			Start: int64(off) + 4,
			End:   int64(off) + 4 + int64(d.Len()),
		}
		return nil
	}); err != nil {
		return nil, errors.Wrap(err, "read postings table")
	}
	return m, nil
}

// Symbols provides lookups over the symbol table stored in a ByteSlice.
// Only the position of every symbolFactor-th symbol is kept in memory;
// lookups decode forward from the nearest recorded position.
type Symbols struct {
	bs      ByteSlice // Underlying bytes the symbol table is read from.
	version int       // Index format version; selects how symbol references are interpreted.
	off     int       // Position of the symbol table within bs.

	offsets []int // Position within bs of every symbolFactor-th symbol.
	seen    int   // Number of symbols read by NewSymbols.
}

// symbolFactor is the sampling interval used by NewSymbols: one symbol position
// is recorded for every symbolFactor symbols. The Reader reuses it as the
// sampling interval for the postings offset table.
const symbolFactor = 32

// NewSymbols returns a Symbols object for symbol lookups.
// It reads through the whole symbol table at off once, recording the position
// of every symbolFactor-th symbol, and returns any decoding error encountered.
func NewSymbols(bs ByteSlice, version int, off int) (*Symbols, error) {
	s := &Symbols{
		bs:      bs,
		version: version,
		off:     off,
	}
	d := encoding.NewDecbufAt(bs, off, castagnoliTable)
	var (
		origLen = d.Len()
		cnt     = d.Be32int()
		// NOTE(review): the +4 presumably accounts for the length prefix that
		// NewDecbufAt strips before handing out the buffer - confirm.
		basePos = off + 4
	)
	s.offsets = make([]int, 0, 1+cnt/symbolFactor)
	for d.Err() == nil && s.seen < cnt {
		if s.seen%symbolFactor == 0 {
			// origLen-d.Len() is the number of bytes consumed so far, which
			// turns basePos into the position of the next symbol.
			s.offsets = append(s.offsets, basePos+origLen-d.Len())
		}
		d.UvarintBytes() // The symbol.
		s.seen++
	}
	if d.Err() != nil {
		return nil, d.Err()
	}
	return s, nil
}

// Lookup returns the symbol referenced by o.
// For FormatV2, o is the ordinal of the symbol within the table; for any other
// version it is used directly as a byte position in the underlying ByteSlice.
func (s Symbols) Lookup(o uint32) (string, error) {
	d := encoding.Decbuf{
		B: s.bs.Range(0, s.bs.Len()),
	}

	if s.version == FormatV2 {
		if int(o) >= s.seen {
			return "", errors.Errorf("unknown symbol offset %d", o)
		}
		// Jump to the closest recorded position at or before the symbol.
		d.Skip(s.offsets[int(o/symbolFactor)])
		// Walk until we find the one we want.
		// The loop count is o modulo symbolFactor.
		for i := o - (o / symbolFactor * symbolFactor); i > 0; i-- {
			d.UvarintBytes()
		}
	} else {
		d.Skip(int(o))
	}
	sym := d.UvarintStr()
	if d.Err() != nil {
		return "", d.Err()
	}
	return sym, nil
}

// ReverseLookup returns the reference under which sym is stored: the symbol's
// ordinal for FormatV2, or its byte position in the underlying ByteSlice for
// any other version. An error is returned if sym is not in the table.
// The binary search and the early break both rely on the symbols being stored
// in ascending order.
func (s Symbols) ReverseLookup(sym string) (uint32, error) {
	if len(s.offsets) == 0 {
		return 0, errors.Errorf("unknown symbol %q - no symbols", sym)
	}
	// Find the first recorded symbol that sorts after sym.
	i := sort.Search(len(s.offsets), func(i int) bool {
		// Any decoding errors here will be lost, however
		// we already read through all of this at startup.
		d := encoding.Decbuf{
			B: s.bs.Range(0, s.bs.Len()),
		}
		d.Skip(s.offsets[i])
		return yoloString(d.UvarintBytes()) > sym
	})
	d := encoding.Decbuf{
		B: s.bs.Range(0, s.bs.Len()),
	}
	// Step back to the recorded position at or before sym, if there is one.
	if i > 0 {
		i--
	}
	d.Skip(s.offsets[i])
	// res tracks the ordinal of the symbol about to be decoded.
	res := i * symbolFactor
	var lastLen int
	var lastSymbol string
	for d.Err() == nil && res <= s.seen {
		// Remember the remaining length before decoding so the symbol's
		// byte position can be derived for non-V2 formats.
		lastLen = d.Len()
		lastSymbol = yoloString(d.UvarintBytes())
		if lastSymbol >= sym {
			break
		}
		res++
	}
	if d.Err() != nil {
		return 0, d.Err()
	}
	if lastSymbol != sym {
		return 0, errors.Errorf("unknown symbol %q", sym)
	}
	if s.version == FormatV2 {
		return uint32(res), nil
	}
	// d was created over the whole byte slice, so total length minus what
	// remained before decoding is the symbol's position.
	return uint32(s.bs.Len() - lastLen), nil
}

// Size returns the number of recorded symbol positions multiplied by 8,
// i.e. the size of the in-memory offsets assuming 8 bytes per entry.
func (s Symbols) Size() int {
	return len(s.offsets) * 8
}

// Iter returns a StringIter over the symbols, decoding from the start of the
// symbol table. The count read from the table bounds the iteration.
func (s Symbols) Iter() StringIter {
	d := encoding.NewDecbufAt(s.bs, s.off, castagnoliTable)
	cnt := d.Be32int()
	return &symbolsIter{
		d:   d,
		cnt: cnt,
	}
}

// symbolsIter implements StringIter.
1375type symbolsIter struct { 1376 d encoding.Decbuf 1377 cnt int 1378 cur string 1379 err error 1380} 1381 1382func (s *symbolsIter) Next() bool { 1383 if s.cnt == 0 || s.err != nil { 1384 return false 1385 } 1386 s.cur = yoloString(s.d.UvarintBytes()) 1387 s.cnt-- 1388 if s.d.Err() != nil { 1389 s.err = s.d.Err() 1390 return false 1391 } 1392 return true 1393} 1394 1395func (s symbolsIter) At() string { return s.cur } 1396func (s symbolsIter) Err() error { return s.err } 1397 1398// ReadOffsetTable reads an offset table and at the given position calls f for each 1399// found entry. If f returns an error it stops decoding and returns the received error. 1400func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64, int) error) error { 1401 d := encoding.NewDecbufAt(bs, int(off), castagnoliTable) 1402 startLen := d.Len() 1403 cnt := d.Be32() 1404 1405 for d.Err() == nil && d.Len() > 0 && cnt > 0 { 1406 offsetPos := startLen - d.Len() 1407 keyCount := d.Uvarint() 1408 // The Postings offset table takes only 2 keys per entry (name and value of label), 1409 // and the LabelIndices offset table takes only 1 key per entry (a label name). 1410 // Hence setting the size to max of both, i.e. 2. 1411 keys := make([]string, 0, 2) 1412 1413 for i := 0; i < keyCount; i++ { 1414 keys = append(keys, d.UvarintStr()) 1415 } 1416 o := d.Uvarint64() 1417 if d.Err() != nil { 1418 break 1419 } 1420 if err := f(keys, o, offsetPos); err != nil { 1421 return err 1422 } 1423 cnt-- 1424 } 1425 return d.Err() 1426} 1427 1428// Close the reader and its underlying resources. 1429func (r *Reader) Close() error { 1430 return r.c.Close() 1431} 1432 1433func (r *Reader) lookupSymbol(o uint32) (string, error) { 1434 if s, ok := r.nameSymbols[o]; ok { 1435 return s, nil 1436 } 1437 return r.symbols.Lookup(o) 1438} 1439 1440// Symbols returns an iterator over the symbols that exist within the index. 
1441func (r *Reader) Symbols() StringIter { 1442 return r.symbols.Iter() 1443} 1444 1445// SymbolTableSize returns the symbol table size in bytes. 1446func (r *Reader) SymbolTableSize() uint64 { 1447 return uint64(r.symbols.Size()) 1448} 1449 1450// SortedLabelValues returns value tuples that exist for the given label name. 1451// It is not safe to use the return value beyond the lifetime of the byte slice 1452// passed into the Reader. 1453func (r *Reader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { 1454 values, err := r.LabelValues(name, matchers...) 1455 if err == nil && r.version == FormatV1 { 1456 sort.Strings(values) 1457 } 1458 return values, err 1459} 1460 1461// LabelValues returns value tuples that exist for the given label name. 1462// It is not safe to use the return value beyond the lifetime of the byte slice 1463// passed into the Reader. 1464// TODO(replay): Support filtering by matchers 1465func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { 1466 if len(matchers) > 0 { 1467 return nil, errors.Errorf("matchers parameter is not implemented: %+v", matchers) 1468 } 1469 1470 if r.version == FormatV1 { 1471 e, ok := r.postingsV1[name] 1472 if !ok { 1473 return nil, nil 1474 } 1475 values := make([]string, 0, len(e)) 1476 for k := range e { 1477 values = append(values, k) 1478 } 1479 return values, nil 1480 1481 } 1482 e, ok := r.postings[name] 1483 if !ok { 1484 return nil, nil 1485 } 1486 if len(e) == 0 { 1487 return nil, nil 1488 } 1489 values := make([]string, 0, len(e)*symbolFactor) 1490 1491 d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil) 1492 d.Skip(e[0].off) 1493 lastVal := e[len(e)-1].value 1494 1495 skip := 0 1496 for d.Err() == nil { 1497 if skip == 0 { 1498 // These are always the same number of bytes, 1499 // and it's faster to skip than parse. 1500 skip = d.Len() 1501 d.Uvarint() // Keycount. 1502 d.UvarintBytes() // Label name. 
1503 skip -= d.Len() 1504 } else { 1505 d.Skip(skip) 1506 } 1507 s := yoloString(d.UvarintBytes()) //Label value. 1508 values = append(values, s) 1509 if s == lastVal { 1510 break 1511 } 1512 d.Uvarint64() // Offset. 1513 } 1514 if d.Err() != nil { 1515 return nil, errors.Wrap(d.Err(), "get postings offset entry") 1516 } 1517 return values, nil 1518} 1519 1520// LabelNamesFor returns all the label names for the series referred to by IDs. 1521// The names returned are sorted. 1522func (r *Reader) LabelNamesFor(ids ...uint64) ([]string, error) { 1523 // Gather offsetsMap the name offsetsMap in the symbol table first 1524 offsetsMap := make(map[uint32]struct{}) 1525 for _, id := range ids { 1526 offset := id 1527 // In version 2 series IDs are no longer exact references but series are 16-byte padded 1528 // and the ID is the multiple of 16 of the actual position. 1529 if r.version == FormatV2 { 1530 offset = id * 16 1531 } 1532 1533 d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable) 1534 buf := d.Get() 1535 if d.Err() != nil { 1536 return nil, errors.Wrap(d.Err(), "get buffer for series") 1537 } 1538 1539 offsets, err := r.dec.LabelNamesOffsetsFor(buf) 1540 if err != nil { 1541 return nil, errors.Wrap(err, "get label name offsets") 1542 } 1543 for _, off := range offsets { 1544 offsetsMap[off] = struct{}{} 1545 } 1546 } 1547 1548 // Lookup the unique symbols. 1549 names := make([]string, 0, len(offsetsMap)) 1550 for off := range offsetsMap { 1551 name, err := r.lookupSymbol(off) 1552 if err != nil { 1553 return nil, errors.Wrap(err, "lookup symbol in LabelNamesFor") 1554 } 1555 names = append(names, name) 1556 } 1557 1558 sort.Strings(names) 1559 1560 return names, nil 1561} 1562 1563// LabelValueFor returns label value for the given label name in the series referred to by ID. 
func (r *Reader) LabelValueFor(id uint64, label string) (string, error) {
	offset := id
	// In version 2 series IDs are no longer exact references: series are 16-byte padded
	// and the actual position is the ID multiplied by 16.
	if r.version == FormatV2 {
		offset = id * 16
	}
	d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable)
	buf := d.Get()
	if d.Err() != nil {
		return "", errors.Wrap(d.Err(), "label values for")
	}

	// Any decoding failure is reported to the caller as "not found".
	value, err := r.dec.LabelValueFor(buf, label)
	if err != nil {
		return "", storage.ErrNotFound
	}

	// The decoder returns an empty value when the series does not carry the label.
	if value == "" {
		return "", storage.ErrNotFound
	}

	return value, nil
}

// Series reads the series with the given ID and writes its labels and chunks into lbls and chks.
func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
	offset := id
	// In version 2 series IDs are no longer exact references: series are 16-byte padded
	// and the actual position is the ID multiplied by 16.
	if r.version == FormatV2 {
		offset = id * 16
	}
	d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable)
	if d.Err() != nil {
		return d.Err()
	}
	return errors.Wrap(r.dec.Series(d.Get(), lbls, chks), "read series")
}

// Postings returns the merged postings lists of the given values of label name.
// An unknown name, or an empty list of values on the non-V1 path, yields empty
// postings; values that do not exist are ignored.
// NOTE(review): the non-V1 path consumes values with a single forward-moving
// index, which looks like it expects values in ascending order - confirm with callers.
func (r *Reader) Postings(name string, values ...string) (Postings, error) {
	if r.version == FormatV1 {
		// V1 keeps the complete offset table in memory: a direct map lookup per value.
		e, ok := r.postingsV1[name]
		if !ok {
			return EmptyPostings(), nil
		}
		res := make([]Postings, 0, len(values))
		for _, v := range values {
			postingsOff, ok := e[v]
			if !ok {
				continue
			}
			// Read from the postings table.
			d := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
			_, p, err := r.dec.Postings(d.Get())
			if err != nil {
				return nil, errors.Wrap(err, "decode postings")
			}
			res = append(res, p)
		}
		return Merge(res...), nil
	}

	// e holds the in-memory sample of the offset table entries for this name.
	e, ok := r.postings[name]
	if !ok {
		return EmptyPostings(), nil
	}

	if len(values) == 0 {
		return EmptyPostings(), nil
	}

	res := make([]Postings, 0, len(values))
	// skip caches the encoded size of the "key count + label name" prefix of an
	// entry; it is measured once and reused for every following entry.
	skip := 0
	valueIndex := 0
	for valueIndex < len(values) && values[valueIndex] < e[0].value {
		// Discard values before the start.
		valueIndex++
	}
	for valueIndex < len(values) {
		value := values[valueIndex]

		// Locate the sampled entry from which the on-disk scan has to start.
		i := sort.Search(len(e), func(i int) bool { return e[i].value >= value })
		if i == len(e) {
			// We're past the end.
			break
		}
		if i > 0 && e[i].value != value {
			// Need to look from previous entry.
			i--
		}
		// Don't Crc32 the entire postings offset table, this is very slow
		// so hope any issues were caught at startup.
		d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
		d.Skip(e[i].off)

		// Iterate on the offset table.
		var postingsOff uint64 // The offset into the postings table.
		for d.Err() == nil {
			if skip == 0 {
				// These are always the same number of bytes,
				// and it's faster to skip than parse.
				skip = d.Len()
				d.Uvarint()      // Keycount.
				d.UvarintBytes() // Label name.
				skip -= d.Len()
			} else {
				d.Skip(skip)
			}
			v := d.UvarintBytes()       // Label value.
			postingsOff = d.Uvarint64() // Offset.
			// Consume every requested value that is not greater than the entry
			// just read: an equal one is a hit, smaller ones do not exist.
			for string(v) >= value {
				if string(v) == value {
					// Read from the postings table.
					d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
					_, p, err := r.dec.Postings(d2.Get())
					if err != nil {
						return nil, errors.Wrap(err, "decode postings")
					}
					res = append(res, p)
				}
				valueIndex++
				if valueIndex == len(values) {
					break
				}
				value = values[valueIndex]
			}
			if i+1 == len(e) || value >= e[i+1].value || valueIndex == len(values) {
				// Need to go to a later postings offset entry, if there is one.
				break
			}
		}
		if d.Err() != nil {
			return nil, errors.Wrap(d.Err(), "get postings offset entry")
		}
	}

	return Merge(res...), nil
}

// SortedPostings returns the given postings list reordered so that the backing series
// are sorted. This implementation returns p unchanged.
func (r *Reader) SortedPostings(p Postings) Postings {
	return p
}

// Size returns the size of an index file.
func (r *Reader) Size() int64 {
	return int64(r.b.Len())
}

// LabelNames returns all the unique label names present in the index, sorted.
// Passing any matcher is rejected with an error.
// TODO(twilkie) implement support for matchers
func (r *Reader) LabelNames(matchers ...*labels.Matcher) ([]string, error) {
	if len(matchers) > 0 {
		return nil, errors.Errorf("matchers parameter is not implemented: %+v", matchers)
	}

	labelNames := make([]string, 0, len(r.postings))
	for name := range r.postings {
		if name == allPostingsKey.Name {
			// This is not from any metric.
			continue
		}
		labelNames = append(labelNames, name)
	}
	sort.Strings(labelNames)
	return labelNames, nil
}

// NewStringListIter returns a StringIter for the given sorted list of strings.
func NewStringListIter(s []string) StringIter {
	return &stringListIter{l: s}
}

// stringListIter implements StringIter.
1740type stringListIter struct { 1741 l []string 1742 cur string 1743} 1744 1745func (s *stringListIter) Next() bool { 1746 if len(s.l) == 0 { 1747 return false 1748 } 1749 s.cur = s.l[0] 1750 s.l = s.l[1:] 1751 return true 1752} 1753func (s stringListIter) At() string { return s.cur } 1754func (s stringListIter) Err() error { return nil } 1755 1756// Decoder provides decoding methods for the v1 and v2 index file format. 1757// 1758// It currently does not contain decoding methods for all entry types but can be extended 1759// by them if there's demand. 1760type Decoder struct { 1761 LookupSymbol func(uint32) (string, error) 1762} 1763 1764// Postings returns a postings list for b and its number of elements. 1765func (dec *Decoder) Postings(b []byte) (int, Postings, error) { 1766 d := encoding.Decbuf{B: b} 1767 n := d.Be32int() 1768 l := d.Get() 1769 return n, newBigEndianPostings(l), d.Err() 1770} 1771 1772// LabelNamesOffsetsFor decodes the offsets of the name symbols for a given series. 1773// They are returned in the same order they're stored, which should be sorted lexicographically. 1774func (dec *Decoder) LabelNamesOffsetsFor(b []byte) ([]uint32, error) { 1775 d := encoding.Decbuf{B: b} 1776 k := d.Uvarint() 1777 1778 offsets := make([]uint32, k) 1779 for i := 0; i < k; i++ { 1780 offsets[i] = uint32(d.Uvarint()) 1781 _ = d.Uvarint() // skip the label value 1782 1783 if d.Err() != nil { 1784 return nil, errors.Wrap(d.Err(), "read series label offsets") 1785 } 1786 } 1787 1788 return offsets, d.Err() 1789} 1790 1791// LabelValueFor decodes a label for a given series. 
1792func (dec *Decoder) LabelValueFor(b []byte, label string) (string, error) { 1793 d := encoding.Decbuf{B: b} 1794 k := d.Uvarint() 1795 1796 for i := 0; i < k; i++ { 1797 lno := uint32(d.Uvarint()) 1798 lvo := uint32(d.Uvarint()) 1799 1800 if d.Err() != nil { 1801 return "", errors.Wrap(d.Err(), "read series label offsets") 1802 } 1803 1804 ln, err := dec.LookupSymbol(lno) 1805 if err != nil { 1806 return "", errors.Wrap(err, "lookup label name") 1807 } 1808 1809 if ln == label { 1810 lv, err := dec.LookupSymbol(lvo) 1811 if err != nil { 1812 return "", errors.Wrap(err, "lookup label value") 1813 } 1814 1815 return lv, nil 1816 } 1817 } 1818 1819 return "", d.Err() 1820} 1821 1822// Series decodes a series entry from the given byte slice into lset and chks. 1823func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error { 1824 *lbls = (*lbls)[:0] 1825 *chks = (*chks)[:0] 1826 1827 d := encoding.Decbuf{B: b} 1828 1829 k := d.Uvarint() 1830 1831 for i := 0; i < k; i++ { 1832 lno := uint32(d.Uvarint()) 1833 lvo := uint32(d.Uvarint()) 1834 1835 if d.Err() != nil { 1836 return errors.Wrap(d.Err(), "read series label offsets") 1837 } 1838 1839 ln, err := dec.LookupSymbol(lno) 1840 if err != nil { 1841 return errors.Wrap(err, "lookup label name") 1842 } 1843 lv, err := dec.LookupSymbol(lvo) 1844 if err != nil { 1845 return errors.Wrap(err, "lookup label value") 1846 } 1847 1848 *lbls = append(*lbls, labels.Label{Name: ln, Value: lv}) 1849 } 1850 1851 // Read the chunks meta data. 
1852 k = d.Uvarint() 1853 1854 if k == 0 { 1855 return d.Err() 1856 } 1857 1858 t0 := d.Varint64() 1859 maxt := int64(d.Uvarint64()) + t0 1860 ref0 := int64(d.Uvarint64()) 1861 1862 *chks = append(*chks, chunks.Meta{ 1863 Ref: uint64(ref0), 1864 MinTime: t0, 1865 MaxTime: maxt, 1866 }) 1867 t0 = maxt 1868 1869 for i := 1; i < k; i++ { 1870 mint := int64(d.Uvarint64()) + t0 1871 maxt := int64(d.Uvarint64()) + mint 1872 1873 ref0 += d.Varint64() 1874 t0 = maxt 1875 1876 if d.Err() != nil { 1877 return errors.Wrapf(d.Err(), "read meta for chunk %d", i) 1878 } 1879 1880 *chks = append(*chks, chunks.Meta{ 1881 Ref: uint64(ref0), 1882 MinTime: mint, 1883 MaxTime: maxt, 1884 }) 1885 } 1886 return d.Err() 1887} 1888 1889func yoloString(b []byte) string { 1890 return *((*string)(unsafe.Pointer(&b))) 1891} 1892