1// Copyright 2017 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package index
15
16import (
17	"bufio"
18	"bytes"
19	"context"
20	"encoding/binary"
21	"hash"
22	"hash/crc32"
23	"io"
24	"io/ioutil"
25	"math"
26	"os"
27	"path/filepath"
28	"sort"
29	"unsafe"
30
31	"github.com/pkg/errors"
32
33	"github.com/prometheus/prometheus/pkg/labels"
34	"github.com/prometheus/prometheus/storage"
35	"github.com/prometheus/prometheus/tsdb/chunks"
36	"github.com/prometheus/prometheus/tsdb/encoding"
37	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
38	"github.com/prometheus/prometheus/tsdb/fileutil"
39)
40
const (
	// MagicIndex is the 4-byte magic number written at the head of an index file.
	MagicIndex = 0xBAAAD700
	// HeaderLen is the number of bytes reserved at the start of the index for
	// the header: the 4-byte magic number followed by a 1-byte format version.
	HeaderLen = 5

	// FormatV1 identifies version 1 of the index format.
	FormatV1 = 1
	// FormatV2 identifies version 2 of the index format.
	FormatV2 = 2

	// indexFilename is presumably the default base name of an index file
	// inside a block directory; it is not referenced in this part of the file.
	indexFilename = "index"
)
54
// indexWriterSeries pairs the label set of a series with the metadata of its
// chunks.
type indexWriterSeries struct {
	labels labels.Labels
	chunks []chunks.Meta // series file offset of chunks
}
59
60type indexWriterSeriesSlice []*indexWriterSeries
61
62func (s indexWriterSeriesSlice) Len() int      { return len(s) }
63func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
64
65func (s indexWriterSeriesSlice) Less(i, j int) bool {
66	return labels.Compare(s[i].labels, s[j].labels) < 0
67}
68
// indexWriterStage identifies a phase of index construction.
type indexWriterStage uint8

// The stages are declared in the order in which a writer passes through them.
const (
	idxStageNone indexWriterStage = iota
	idxStageSymbols
	idxStageSeries
	idxStageDone
)

// String returns the lowercase name of the stage, or "<unknown>" for a value
// that does not correspond to a declared stage.
func (s indexWriterStage) String() string {
	names := [...]string{
		idxStageNone:    "none",
		idxStageSymbols: "symbols",
		idxStageSeries:  "series",
		idxStageDone:    "done",
	}
	if int(s) < len(names) {
		return names[s]
	}
	return "<unknown>"
}
91
// castagnoliTable is the CRC32 table for the Castagnoli polynomial.
//
// The table gets initialized with sync.Once but may still cause a race
// with any other use of the crc32 package anywhere. Thus it is built eagerly
// during package initialization.
var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)

// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the
// polynomial may be easily changed in one location at a later time, if necessary.
func newCRC32() hash.Hash32 {
	h := crc32.New(castagnoliTable)
	return h
}
106
// symbolCacheEntry caches symbol table lookups for one label name (the key of
// Writer.symbolCache): the symbol index of the name itself, plus the most
// recently seen value for that name together with the value's symbol index.
type symbolCacheEntry struct {
	index          uint32
	lastValue      string
	lastValueIndex uint32
}
112
// Writer implements the IndexWriter interface for the standard
// serialization format.
//
// The index is built by appending to the main file in stages (see
// indexWriterStage). Postings and their offset table are first staged in two
// temporary files and merged into the main file when the writer is closed.
type Writer struct {
	// Polled for cancellation on stage transitions and between postings batches.
	ctx context.Context

	// For the main index file.
	f *FileWriter

	// Temporary file for postings.
	fP *FileWriter
	// Temporary file for posting offsets table.
	// cntPO counts the entries written to it so far.
	fPO   *FileWriter
	cntPO uint64

	// toc records the start offset of each section as it is begun, stage is
	// the stage most recently entered, and postingsStart is the offset in the
	// main file at which the copied postings begin.
	toc           TOC
	stage         indexWriterStage
	postingsStart uint64 // Due to padding, can differ from TOC entry.

	// Reusable memory.
	buf1 encoding.Encbuf
	buf2 encoding.Encbuf

	// Symbol table state: the number of symbols added and the last one added
	// (used to enforce ordering), the table as re-read from the mapped index
	// file once it is complete, and a per-label-name cache of symbol lookups.
	numSymbols  int
	symbols     *Symbols
	symbolFile  *fileutil.MmapFile
	lastSymbol  string
	symbolCache map[string]symbolCacheEntry

	labelIndexes []labelIndexHashEntry // Label index offsets.
	labelNames   map[string]uint64     // Label names, and their usage.

	// Hold last series to validate that clients insert new series in order.
	lastSeries labels.Labels
	lastRef    uint64

	// Hash instance reused for every checksum the writer computes.
	crc32 hash.Hash

	// NOTE(review): Version is not assigned anywhere in the visible part of
	// this file; confirm whether callers still rely on it.
	Version int
}
152
// TOC represents index Table Of Content that states where each section of index starts.
//
// Every field is the byte offset, counted from the start of the index file, at
// which the section of the same name begins. The TOC is stored in the last
// indexTOCLen bytes of the file as six big-endian 64-bit values, in field
// order, followed by a CRC32 checksum.
type TOC struct {
	Symbols           uint64
	Series            uint64
	LabelIndices      uint64
	LabelIndicesTable uint64
	Postings          uint64
	PostingsTable     uint64
}
162
163// NewTOCFromByteSlice return parsed TOC from given index byte slice.
164func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
165	if bs.Len() < indexTOCLen {
166		return nil, encoding.ErrInvalidSize
167	}
168	b := bs.Range(bs.Len()-indexTOCLen, bs.Len())
169
170	expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
171	d := encoding.Decbuf{B: b[:len(b)-4]}
172
173	if d.Crc32(castagnoliTable) != expCRC {
174		return nil, errors.Wrap(encoding.ErrInvalidChecksum, "read TOC")
175	}
176
177	if err := d.Err(); err != nil {
178		return nil, err
179	}
180
181	return &TOC{
182		Symbols:           d.Be64(),
183		Series:            d.Be64(),
184		LabelIndices:      d.Be64(),
185		LabelIndicesTable: d.Be64(),
186		Postings:          d.Be64(),
187		PostingsTable:     d.Be64(),
188	}, nil
189}
190
191// NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
192func NewWriter(ctx context.Context, fn string) (*Writer, error) {
193	dir := filepath.Dir(fn)
194
195	df, err := fileutil.OpenDir(dir)
196	if err != nil {
197		return nil, err
198	}
199	defer df.Close() // Close for platform windows.
200
201	if err := os.RemoveAll(fn); err != nil {
202		return nil, errors.Wrap(err, "remove any existing index at path")
203	}
204
205	// Main index file we are building.
206	f, err := NewFileWriter(fn)
207	if err != nil {
208		return nil, err
209	}
210	// Temporary file for postings.
211	fP, err := NewFileWriter(fn + "_tmp_p")
212	if err != nil {
213		return nil, err
214	}
215	// Temporary file for posting offset table.
216	fPO, err := NewFileWriter(fn + "_tmp_po")
217	if err != nil {
218		return nil, err
219	}
220	if err := df.Sync(); err != nil {
221		return nil, errors.Wrap(err, "sync dir")
222	}
223
224	iw := &Writer{
225		ctx:   ctx,
226		f:     f,
227		fP:    fP,
228		fPO:   fPO,
229		stage: idxStageNone,
230
231		// Reusable memory.
232		buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
233		buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
234
235		symbolCache: make(map[string]symbolCacheEntry, 1<<8),
236		labelNames:  make(map[string]uint64, 1<<8),
237		crc32:       newCRC32(),
238	}
239	if err := iw.writeMeta(); err != nil {
240		return nil, err
241	}
242	return iw, nil
243}
244
// write appends the given buffers, in order, to the main index file.
func (w *Writer) write(bufs ...[]byte) error {
	return w.f.Write(bufs...)
}

// writeAt writes buf into the main index file at the absolute offset pos.
func (w *Writer) writeAt(buf []byte, pos uint64) error {
	return w.f.WriteAt(buf, pos)
}

// addPadding pads the main index file with zero bytes until its write
// position is a multiple of size.
func (w *Writer) addPadding(size int) error {
	return w.f.AddPadding(size)
}
256
257type FileWriter struct {
258	f    *os.File
259	fbuf *bufio.Writer
260	pos  uint64
261	name string
262}
263
264func NewFileWriter(name string) (*FileWriter, error) {
265	f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0666)
266	if err != nil {
267		return nil, err
268	}
269	return &FileWriter{
270		f:    f,
271		fbuf: bufio.NewWriterSize(f, 1<<22),
272		pos:  0,
273		name: name,
274	}, nil
275}
276
277func (fw *FileWriter) Pos() uint64 {
278	return fw.pos
279}
280
281func (fw *FileWriter) Write(bufs ...[]byte) error {
282	for _, b := range bufs {
283		n, err := fw.fbuf.Write(b)
284		fw.pos += uint64(n)
285		if err != nil {
286			return err
287		}
288		// For now the index file must not grow beyond 64GiB. Some of the fixed-sized
289		// offset references in v1 are only 4 bytes large.
290		// Once we move to compressed/varint representations in those areas, this limitation
291		// can be lifted.
292		if fw.pos > 16*math.MaxUint32 {
293			return errors.Errorf("%q exceeding max size of 64GiB", fw.name)
294		}
295	}
296	return nil
297}
298
299func (fw *FileWriter) Flush() error {
300	return fw.fbuf.Flush()
301}
302
303func (fw *FileWriter) WriteAt(buf []byte, pos uint64) error {
304	if err := fw.Flush(); err != nil {
305		return err
306	}
307	_, err := fw.f.WriteAt(buf, int64(pos))
308	return err
309}
310
311// AddPadding adds zero byte padding until the file size is a multiple size.
312func (fw *FileWriter) AddPadding(size int) error {
313	p := fw.pos % uint64(size)
314	if p == 0 {
315		return nil
316	}
317	p = uint64(size) - p
318
319	if err := fw.Write(make([]byte, p)); err != nil {
320		return errors.Wrap(err, "add padding")
321	}
322	return nil
323}
324
325func (fw *FileWriter) Close() error {
326	if err := fw.Flush(); err != nil {
327		return err
328	}
329	if err := fw.f.Sync(); err != nil {
330		return err
331	}
332	return fw.f.Close()
333}
334
335func (fw *FileWriter) Remove() error {
336	return os.Remove(fw.name)
337}
338
// ensureStage handles transitions between write stages and ensures that IndexWriter
// methods are called in an order valid for the implementation.
//
// It returns the context error if the writer's context is done. If the writer
// is already at stage s nothing happens. If one or more stages lie between the
// current stage and s, they are entered first (recursively), so their section
// start and end work still runs. Requesting a stage earlier than the current
// one is an error.
func (w *Writer) ensureStage(s indexWriterStage) error {
	// Non-blocking cancellation check.
	select {
	case <-w.ctx.Done():
		return w.ctx.Err()
	default:
	}

	if w.stage == s {
		return nil
	}
	if w.stage < s-1 {
		// A stage has been skipped.
		if err := w.ensureStage(s - 1); err != nil {
			return err
		}
	}
	if w.stage > s {
		return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
	}

	// Mark start of sections in table of contents.
	switch s {
	case idxStageSymbols:
		w.toc.Symbols = w.f.pos
		if err := w.startSymbols(); err != nil {
			return err
		}
	case idxStageSeries:
		// The symbol table must be complete (and loaded) before series,
		// which refer to symbols, can be written.
		if err := w.finishSymbols(); err != nil {
			return err
		}
		w.toc.Series = w.f.pos

	case idxStageDone:
		w.toc.LabelIndices = w.f.pos
		// LabelIndices generation depends on the posting offset
		// table produced at this stage.
		if err := w.writePostingsToTmpFiles(); err != nil {
			return err
		}
		if err := w.writeLabelIndices(); err != nil {
			return err
		}

		w.toc.Postings = w.f.pos
		if err := w.writePostings(); err != nil {
			return err
		}

		w.toc.LabelIndicesTable = w.f.pos
		if err := w.writeLabelIndexesOffsetTable(); err != nil {
			return err
		}

		w.toc.PostingsTable = w.f.pos
		if err := w.writePostingsOffsetTable(); err != nil {
			return err
		}
		// The TOC comes last so that readers can find it at the end of the file.
		if err := w.writeTOC(); err != nil {
			return err
		}
	}

	// Only record the new stage once all of its work has succeeded.
	w.stage = s
	return nil
}
407
408func (w *Writer) writeMeta() error {
409	w.buf1.Reset()
410	w.buf1.PutBE32(MagicIndex)
411	w.buf1.PutByte(FormatV2)
412
413	return w.write(w.buf1.Get())
414}
415
// AddSeries adds the series one at a time along with its chunks.
//
// Series must be added in strictly increasing label-set order, and ref must not
// be smaller than the reference of the previously added series. Every label
// name and value must already be present in the symbol table. Each series
// entry starts at a 16-byte aligned offset of the index file and consists of a
// uvarint length, the encoded labels and chunk metadata, and a checksum.
func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta) error {
	if err := w.ensureStage(idxStageSeries); err != nil {
		return err
	}
	if labels.Compare(lset, w.lastSeries) <= 0 {
		return errors.Errorf("out-of-order series added with label set %q", lset)
	}

	// The reference check is skipped for the very first series, when
	// lastSeries is still empty.
	if ref < w.lastRef && len(w.lastSeries) != 0 {
		return errors.Errorf("series with reference greater than %d already added", ref)
	}
	// We add padding to 16 bytes to increase the addressable space we get through 4 byte
	// series references.
	if err := w.addPadding(16); err != nil {
		return errors.Errorf("failed to write padding bytes: %v", err)
	}

	if w.f.pos%16 != 0 {
		return errors.Errorf("series write not 16-byte aligned at %d", w.f.pos)
	}

	// buf2 accumulates the series payload; buf1 later receives its length prefix.
	w.buf2.Reset()
	w.buf2.PutUvarint(len(lset))

	for _, l := range lset {
		var err error
		// The cache is keyed by label name and remembers the name's symbol
		// index plus the last value seen for that name.
		cacheEntry, ok := w.symbolCache[l.Name]
		nameIndex := cacheEntry.index
		if !ok {
			nameIndex, err = w.symbols.ReverseLookup(l.Name)
			if err != nil {
				return errors.Errorf("symbol entry for %q does not exist, %v", l.Name, err)
			}
		}
		// Count how many series use each label name.
		w.labelNames[l.Name]++
		w.buf2.PutUvarint32(nameIndex)

		valueIndex := cacheEntry.lastValueIndex
		if !ok || cacheEntry.lastValue != l.Value {
			valueIndex, err = w.symbols.ReverseLookup(l.Value)
			if err != nil {
				return errors.Errorf("symbol entry for %q does not exist, %v", l.Value, err)
			}
			w.symbolCache[l.Name] = symbolCacheEntry{
				index:          nameIndex,
				lastValue:      l.Value,
				lastValueIndex: valueIndex,
			}
		}
		w.buf2.PutUvarint32(valueIndex)
	}

	w.buf2.PutUvarint(len(chunks))

	if len(chunks) > 0 {
		// The first chunk is stored with its absolute min time and reference.
		c := chunks[0]
		w.buf2.PutVarint64(c.MinTime)
		w.buf2.PutUvarint64(uint64(c.MaxTime - c.MinTime))
		w.buf2.PutUvarint64(c.Ref)
		t0 := c.MaxTime
		ref0 := int64(c.Ref)

		// Every following chunk is stored as deltas: min time relative to the
		// previous chunk's max time, and reference relative to the previous
		// chunk's reference.
		for _, c := range chunks[1:] {
			w.buf2.PutUvarint64(uint64(c.MinTime - t0))
			w.buf2.PutUvarint64(uint64(c.MaxTime - c.MinTime))
			t0 = c.MaxTime

			w.buf2.PutVarint64(int64(c.Ref) - ref0)
			ref0 = int64(c.Ref)
		}
	}

	// The length prefix is taken before the checksum is appended to buf2.
	w.buf1.Reset()
	w.buf1.PutUvarint(w.buf2.Len())

	w.buf2.PutHash(w.crc32)

	if err := w.write(w.buf1.Get(), w.buf2.Get()); err != nil {
		return errors.Wrap(err, "write series data")
	}

	// Copy into the existing backing array so the caller's slice is not retained.
	w.lastSeries = append(w.lastSeries[:0], lset...)
	w.lastRef = ref

	return nil
}
503
504func (w *Writer) startSymbols() error {
505	// We are at w.toc.Symbols.
506	// Leave 4 bytes of space for the length, and another 4 for the number of symbols
507	// which will both be calculated later.
508	return w.write([]byte("alenblen"))
509}
510
511func (w *Writer) AddSymbol(sym string) error {
512	if err := w.ensureStage(idxStageSymbols); err != nil {
513		return err
514	}
515	if w.numSymbols != 0 && sym <= w.lastSymbol {
516		return errors.Errorf("symbol %q out-of-order", sym)
517	}
518	w.lastSymbol = sym
519	w.numSymbols++
520	w.buf1.Reset()
521	w.buf1.PutUvarintStr(sym)
522	return w.write(w.buf1.Get())
523}
524
// finishSymbols completes the symbol table section: it fills in the length and
// symbol count reserved by startSymbols, appends the section checksum, and
// loads the finished table from the memory-mapped index file so that later
// stages can look symbols up.
func (w *Writer) finishSymbols() error {
	// The length covers everything after the 4-byte length field itself.
	symbolTableSize := w.f.pos - w.toc.Symbols - 4
	// The symbol table's <len> part is 4 bytes. So the total symbol table size must be less than or equal to 2^32-1
	if symbolTableSize > 4294967295 {
		return errors.Errorf("symbol table size exceeds 4 bytes: %d", symbolTableSize)
	}

	// Write out the length and symbol count.
	w.buf1.Reset()
	w.buf1.PutBE32int(int(symbolTableSize))
	w.buf1.PutBE32int(int(w.numSymbols))
	if err := w.writeAt(w.buf1.Get(), w.toc.Symbols); err != nil {
		return err
	}

	hashPos := w.f.pos
	// Leave space for the hash. We can only calculate it
	// now that the number of symbols is known, so mmap and do it from there.
	if err := w.write([]byte("hash")); err != nil {
		return err
	}
	// Everything must be in the file before it is mapped.
	if err := w.f.Flush(); err != nil {
		return err
	}

	sf, err := fileutil.OpenMmapFile(w.f.name)
	if err != nil {
		return err
	}
	// Stored on the writer so that Close can release the mapping.
	w.symbolFile = sf
	// The checksum covers the section contents between the length field and
	// the checksum placeholder.
	hash := crc32.Checksum(w.symbolFile.Bytes()[w.toc.Symbols+4:hashPos], castagnoliTable)
	w.buf1.Reset()
	w.buf1.PutBE32(hash)
	if err := w.writeAt(w.buf1.Get(), hashPos); err != nil {
		return err
	}

	// Load in the symbol table efficiently for the rest of the index writing.
	w.symbols, err = NewSymbols(realByteSlice(w.symbolFile.Bytes()), FormatV2, int(w.toc.Symbols))
	if err != nil {
		return errors.Wrap(err, "read symbols")
	}
	return nil
}
569
// writeLabelIndices writes one label index per label name to the main index
// file. The label names and values are recovered by scanning the temporary
// postings offset table, whose entries are grouped by label name.
func (w *Writer) writeLabelIndices() error {
	// Make sure the temporary table is fully on disk before mapping it.
	if err := w.fPO.Flush(); err != nil {
		return err
	}

	// Find all the label values in the tmp posting offset table.
	f, err := fileutil.OpenMmapFile(w.fPO.name)
	if err != nil {
		return err
	}
	defer f.Close()

	d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos))
	cnt := w.cntPO
	// current is the label name whose values are being collected; values
	// holds the symbol indexes of those values.
	current := []byte{}
	values := []uint32{}
	for d.Err() == nil && cnt > 0 {
		cnt--
		d.Uvarint()                           // Keycount.
		name := d.UvarintBytes()              // Label name.
		value := yoloString(d.UvarintBytes()) // Label value.
		d.Uvarint64()                         // Offset.
		if len(name) == 0 {
			continue // All index is ignored.
		}

		if !bytes.Equal(name, current) && len(values) > 0 {
			// We've reached a new label name.
			if err := w.writeLabelIndex(string(current), values); err != nil {
				return err
			}
			// Reuse the backing array for the next label name.
			values = values[:0]
		}
		current = name
		sid, err := w.symbols.ReverseLookup(value)
		if err != nil {
			return err
		}
		values = append(values, sid)
	}
	if d.Err() != nil {
		return d.Err()
	}

	// Handle the last label.
	if len(values) > 0 {
		if err := w.writeLabelIndex(string(current), values); err != nil {
			return err
		}
	}
	return nil
}
622
// writeLabelIndex writes the label index for a single label name and records
// its offset in w.labelIndexes. values are the symbol indexes of the label's
// values.
//
// The entry starts at a 4-byte aligned offset and consists of a 4-byte length,
// the number of names (always 1), the number of values, the values themselves,
// and a checksum over everything after the length field.
func (w *Writer) writeLabelIndex(name string, values []uint32) error {
	// Align beginning to 4 bytes for more efficient index list scans.
	if err := w.addPadding(4); err != nil {
		return err
	}

	w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{
		keys:   []string{name},
		offset: w.f.pos,
	})

	startPos := w.f.pos
	// Leave 4 bytes of space for the length, which will be calculated later.
	if err := w.write([]byte("alen")); err != nil {
		return err
	}
	// The placeholder is not part of the checksum; hashing starts here.
	w.crc32.Reset()

	w.buf1.Reset()
	w.buf1.PutBE32int(1) // Number of names.
	w.buf1.PutBE32int(len(values))
	w.buf1.WriteToHash(w.crc32)
	if err := w.write(w.buf1.Get()); err != nil {
		return err
	}

	for _, v := range values {
		w.buf1.Reset()
		w.buf1.PutBE32(v)
		w.buf1.WriteToHash(w.crc32)
		if err := w.write(w.buf1.Get()); err != nil {
			return err
		}
	}

	// Write out the length.
	w.buf1.Reset()
	w.buf1.PutBE32int(int(w.f.pos - startPos - 4))
	if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
		return err
	}

	// Append the checksum accumulated over the entry contents.
	w.buf1.Reset()
	w.buf1.PutHashSum(w.crc32)
	return w.write(w.buf1.Get())
}
669
670// writeLabelIndexesOffsetTable writes the label indices offset table.
671func (w *Writer) writeLabelIndexesOffsetTable() error {
672	startPos := w.f.pos
673	// Leave 4 bytes of space for the length, which will be calculated later.
674	if err := w.write([]byte("alen")); err != nil {
675		return err
676	}
677	w.crc32.Reset()
678
679	w.buf1.Reset()
680	w.buf1.PutBE32int(len(w.labelIndexes))
681	w.buf1.WriteToHash(w.crc32)
682	if err := w.write(w.buf1.Get()); err != nil {
683		return err
684	}
685
686	for _, e := range w.labelIndexes {
687		w.buf1.Reset()
688		w.buf1.PutUvarint(len(e.keys))
689		for _, k := range e.keys {
690			w.buf1.PutUvarintStr(k)
691		}
692		w.buf1.PutUvarint64(e.offset)
693		w.buf1.WriteToHash(w.crc32)
694		if err := w.write(w.buf1.Get()); err != nil {
695			return err
696		}
697	}
698	// Write out the length.
699	w.buf1.Reset()
700	w.buf1.PutBE32int(int(w.f.pos - startPos - 4))
701	if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
702		return err
703	}
704
705	w.buf1.Reset()
706	w.buf1.PutHashSum(w.crc32)
707	return w.write(w.buf1.Get())
708}
709
// writePostingsOffsetTable writes the postings offset table.
//
// The entries are copied from the temporary postings offset table, with each
// offset shifted by the position at which the postings were placed in the main
// index file. Afterwards the temporary table file is closed and removed.
func (w *Writer) writePostingsOffsetTable() error {
	// Ensure everything is in the temporary file.
	if err := w.fPO.Flush(); err != nil {
		return err
	}

	startPos := w.f.pos
	// Leave 4 bytes of space for the length, which will be calculated later.
	if err := w.write([]byte("alen")); err != nil {
		return err
	}

	// Copy over the tmp posting offset table, however we need to
	// adjust the offsets.
	adjustment := w.postingsStart

	w.buf1.Reset()
	w.crc32.Reset()
	w.buf1.PutBE32int(int(w.cntPO)) // Count.
	w.buf1.WriteToHash(w.crc32)
	if err := w.write(w.buf1.Get()); err != nil {
		return err
	}

	f, err := fileutil.OpenMmapFile(w.fPO.name)
	if err != nil {
		return err
	}
	// Safety net for the error paths; f is set to nil once it has been closed
	// explicitly below.
	defer func() {
		if f != nil {
			f.Close()
		}
	}()
	d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos))
	cnt := w.cntPO
	for d.Err() == nil && cnt > 0 {
		w.buf1.Reset()
		w.buf1.PutUvarint(d.Uvarint())                     // Keycount.
		w.buf1.PutUvarintStr(yoloString(d.UvarintBytes())) // Label name.
		w.buf1.PutUvarintStr(yoloString(d.UvarintBytes())) // Label value.
		w.buf1.PutUvarint64(d.Uvarint64() + adjustment)    // Offset.
		w.buf1.WriteToHash(w.crc32)
		if err := w.write(w.buf1.Get()); err != nil {
			return err
		}
		cnt--
	}
	if d.Err() != nil {
		return d.Err()
	}

	// Cleanup temporary file.
	if err := f.Close(); err != nil {
		return err
	}
	f = nil
	if err := w.fPO.Close(); err != nil {
		return err
	}
	if err := w.fPO.Remove(); err != nil {
		return err
	}
	// Marks the temporary file as gone so that Close does not close it again.
	w.fPO = nil

	// Write out the length.
	w.buf1.Reset()
	w.buf1.PutBE32int(int(w.f.pos - startPos - 4))
	if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
		return err
	}

	// Finally write the hash.
	w.buf1.Reset()
	w.buf1.PutHashSum(w.crc32)
	return w.write(w.buf1.Get())
}
787
788const indexTOCLen = 6*8 + crc32.Size
789
790func (w *Writer) writeTOC() error {
791	w.buf1.Reset()
792
793	w.buf1.PutBE64(w.toc.Symbols)
794	w.buf1.PutBE64(w.toc.Series)
795	w.buf1.PutBE64(w.toc.LabelIndices)
796	w.buf1.PutBE64(w.toc.LabelIndicesTable)
797	w.buf1.PutBE64(w.toc.Postings)
798	w.buf1.PutBE64(w.toc.PostingsTable)
799
800	w.buf1.PutHash(w.crc32)
801
802	return w.write(w.buf1.Get())
803}
804
// writePostingsToTmpFiles builds all postings lists from the series section
// already written to the main index file and stages them, together with their
// offset table entries, in the temporary files.
//
// It first writes the special "all" postings list (empty name and value) that
// references every series. It then processes the label names in sorted order,
// in batches: for each batch the series section is scanned once and the
// postings of every (name, value) pair in the batch are collected in memory
// and written out. The context is checked for cancellation after each batch.
func (w *Writer) writePostingsToTmpFiles() error {
	names := make([]string, 0, len(w.labelNames))
	for n := range w.labelNames {
		names = append(names, n)
	}
	sort.Strings(names)

	// The series section is read back through a memory mapping, so it has to
	// be on disk first.
	if err := w.f.Flush(); err != nil {
		return err
	}
	f, err := fileutil.OpenMmapFile(w.f.name)
	if err != nil {
		return err
	}
	defer f.Close()

	// Write out the special all posting.
	offsets := []uint32{}
	// The decoder is limited to the end of the series section and then
	// advanced to its start.
	d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices))
	d.Skip(int(w.toc.Series))
	for d.Len() > 0 {
		d.ConsumePadding()
		// Absolute file offset of the series entry about to be read.
		startPos := w.toc.LabelIndices - uint64(d.Len())
		if startPos%16 != 0 {
			return errors.Errorf("series not 16-byte aligned at %d", startPos)
		}
		// Series are referenced by their offset divided by 16.
		offsets = append(offsets, uint32(startPos/16))
		// Skip to next series.
		x := d.Uvarint()
		d.Skip(x + crc32.Size)
		if err := d.Err(); err != nil {
			return err
		}
	}
	if err := w.writePosting("", "", offsets); err != nil {
		return err
	}
	maxPostings := uint64(len(offsets)) // No label name can have more postings than this.

	for len(names) > 0 {
		batchNames := []string{}
		var c uint64
		// Try to bunch up label names into one loop, but avoid
		// using more memory than a single label name can.
		for len(names) > 0 {
			if w.labelNames[names[0]]+c > maxPostings {
				break
			}
			batchNames = append(batchNames, names[0])
			c += w.labelNames[names[0]]
			names = names[1:]
		}

		// Symbol index -> label name, for the names in this batch.
		nameSymbols := map[uint32]string{}
		for _, name := range batchNames {
			sid, err := w.symbols.ReverseLookup(name)
			if err != nil {
				return err
			}
			nameSymbols[sid] = name
		}
		// Label name -> label value -> positions.
		postings := map[uint32]map[uint32][]uint32{}

		// Scan the whole series section again for this batch.
		d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices))
		d.Skip(int(w.toc.Series))
		for d.Len() > 0 {
			d.ConsumePadding()
			startPos := w.toc.LabelIndices - uint64(d.Len())
			l := d.Uvarint() // Length of this series in bytes.
			startLen := d.Len()

			// See if label names we want are in the series.
			numLabels := d.Uvarint()
			for i := 0; i < numLabels; i++ {
				lno := uint32(d.Uvarint())
				lvo := uint32(d.Uvarint())

				if _, ok := nameSymbols[lno]; ok {
					if _, ok := postings[lno]; !ok {
						postings[lno] = map[uint32][]uint32{}
					}
					postings[lno][lvo] = append(postings[lno][lvo], uint32(startPos/16))
				}
			}
			// Skip to next series.
			// startLen - d.Len() is how much of the entry has been consumed so
			// far; the rest of the entry and its checksum are skipped.
			d.Skip(l - (startLen - d.Len()) + crc32.Size)
			if err := d.Err(); err != nil {
				return err
			}
		}

		for _, name := range batchNames {
			// Write out postings for this label name.
			sid, err := w.symbols.ReverseLookup(name)
			if err != nil {
				return err
			}
			values := make([]uint32, 0, len(postings[sid]))
			for v := range postings[sid] {
				values = append(values, v)

			}
			// Symbol numbers are in order, so the strings will also be in order.
			sort.Sort(uint32slice(values))
			for _, v := range values {
				value, err := w.symbols.Lookup(v)
				if err != nil {
					return err
				}
				if err := w.writePosting(name, value, postings[sid][v]); err != nil {
					return err
				}
			}
		}
		// Give up between batches if the context has been cancelled.
		select {
		case <-w.ctx.Done():
			return w.ctx.Err()
		default:
		}

	}
	return nil
}
929
930func (w *Writer) writePosting(name, value string, offs []uint32) error {
931	// Align beginning to 4 bytes for more efficient postings list scans.
932	if err := w.fP.AddPadding(4); err != nil {
933		return err
934	}
935
936	// Write out postings offset table to temporary file as we go.
937	w.buf1.Reset()
938	w.buf1.PutUvarint(2)
939	w.buf1.PutUvarintStr(name)
940	w.buf1.PutUvarintStr(value)
941	w.buf1.PutUvarint64(w.fP.pos) // This is relative to the postings tmp file, not the final index file.
942	if err := w.fPO.Write(w.buf1.Get()); err != nil {
943		return err
944	}
945	w.cntPO++
946
947	w.buf1.Reset()
948	w.buf1.PutBE32int(len(offs))
949
950	for _, off := range offs {
951		if off > (1<<32)-1 {
952			return errors.Errorf("series offset %d exceeds 4 bytes", off)
953		}
954		w.buf1.PutBE32(off)
955	}
956
957	w.buf2.Reset()
958	w.buf2.PutBE32int(w.buf1.Len())
959	w.buf1.PutHash(w.crc32)
960	return w.fP.Write(w.buf2.Get(), w.buf1.Get())
961}
962
// writePostings copies the staged postings from the temporary postings file
// into the main index file, records where they start, and then closes and
// removes the temporary file.
func (w *Writer) writePostings() error {
	// There's padding in the tmp file, make sure it actually works.
	if err := w.f.AddPadding(4); err != nil {
		return err
	}
	// Offsets in the temporary offset table are relative to this position.
	w.postingsStart = w.f.pos

	// Copy temporary file into main index.
	if err := w.fP.Flush(); err != nil {
		return err
	}
	if _, err := w.fP.f.Seek(0, 0); err != nil {
		return err
	}
	// Don't need to calculate a checksum, so can copy directly.
	n, err := io.CopyBuffer(w.f.fbuf, w.fP.f, make([]byte, 1<<20))
	if err != nil {
		return err
	}
	if uint64(n) != w.fP.pos {
		return errors.Errorf("wrote %d bytes to posting temporary file, but only read back %d", w.fP.pos, n)
	}
	// The copy bypassed FileWriter.Write, so the position is advanced by hand.
	w.f.pos += uint64(n)

	if err := w.fP.Close(); err != nil {
		return err
	}
	if err := w.fP.Remove(); err != nil {
		return err
	}
	// Marks the temporary file as gone so that Close does not close it again.
	w.fP = nil
	return nil
}
996
// uint32slice is a slice of uint32 values that can be sorted in ascending
// order through the Len, Swap and Less methods below.
type uint32slice []uint32

// Len reports the number of elements.
func (u uint32slice) Len() int {
	return len(u)
}

// Swap exchanges the elements at positions i and j.
func (u uint32slice) Swap(i, j int) {
	u[j], u[i] = u[i], u[j]
}

// Less reports whether the element at i is smaller than the element at j.
func (u uint32slice) Less(i, j int) bool {
	return u[j] > u[i]
}
1002
// labelIndexHashEntry is one entry of the label indices offset table: the
// label name(s) of a label index and the offset in the index file at which
// that label index starts.
type labelIndexHashEntry struct {
	keys   []string
	offset uint64
}
1007
1008func (w *Writer) Close() error {
1009	// Even if this fails, we need to close all the files.
1010	ensureErr := w.ensureStage(idxStageDone)
1011
1012	if w.symbolFile != nil {
1013		if err := w.symbolFile.Close(); err != nil {
1014			return err
1015		}
1016	}
1017	if w.fP != nil {
1018		if err := w.fP.Close(); err != nil {
1019			return err
1020		}
1021	}
1022	if w.fPO != nil {
1023		if err := w.fPO.Close(); err != nil {
1024			return err
1025		}
1026	}
1027	if err := w.f.Close(); err != nil {
1028		return err
1029	}
1030	return ensureErr
1031}
1032
// StringIter iterates over a sorted list of strings.
//
// The method set suggests the usual usage: call Next until it returns false,
// read each value with At, then check Err.
type StringIter interface {
	// Next advances the iterator and returns true if another value was found.
	Next() bool

	// At returns the value at the current iterator position.
	At() string

	// Err returns the last error of the iterator.
	Err() error
}
1044
// Reader reads an index from a ByteSlice. It handles both format versions 1
// and 2.
type Reader struct {
	// The raw index data and the table of contents parsed from its end.
	b   ByteSlice
	toc *TOC

	// Close that releases the underlying resources of the byte slice.
	c io.Closer

	// Map of LabelName to a list of some LabelValues's position in the offset table.
	// The first and last values for each name are always present.
	postings map[string][]postingOffset
	// For the v1 format, labelname -> labelvalue -> offset.
	postingsV1 map[string]map[string]uint64

	symbols     *Symbols
	nameSymbols map[uint32]string // Cache of the label name symbol lookups,
	// as there are not many and they are half of all lookups.

	// Decoder configured to resolve symbols through this reader.
	dec *Decoder

	// Format version read from the index header.
	version int
}
1066
// postingOffset is a sampled entry of the postings offset table: a label value
// together with the position reported for its entry while the table was read.
type postingOffset struct {
	value string
	off   int
}
1071
// ByteSlice abstracts a byte slice.
type ByteSlice interface {
	// Len returns the total number of bytes.
	Len() int
	// Range returns the bytes in the half-open interval [start, end).
	Range(start, end int) []byte
}

// realByteSlice is the ByteSlice implementation backed directly by an
// in-memory byte slice.
type realByteSlice []byte

// Len returns the length of the underlying slice.
func (rb realByteSlice) Len() int {
	return len(rb)
}

// Range returns the sub-slice [start, end) of the underlying slice without
// copying.
func (rb realByteSlice) Range(start, end int) []byte {
	return []byte(rb[start:end])
}

// Sub returns the sub-slice [start, end) as a ByteSlice, sharing the same
// backing memory.
func (rb realByteSlice) Sub(start, end int) ByteSlice {
	return realByteSlice(rb[start:end])
}
1091
1092// NewReader returns a new index reader on the given byte slice. It automatically
1093// handles different format versions.
1094func NewReader(b ByteSlice) (*Reader, error) {
1095	return newReader(b, ioutil.NopCloser(nil))
1096}
1097
1098// NewFileReader returns a new index reader against the given index file.
1099func NewFileReader(path string) (*Reader, error) {
1100	f, err := fileutil.OpenMmapFile(path)
1101	if err != nil {
1102		return nil, err
1103	}
1104	r, err := newReader(realByteSlice(f.Bytes()), f)
1105	if err != nil {
1106		return nil, tsdb_errors.NewMulti(
1107			err,
1108			f.Close(),
1109		).Err()
1110	}
1111
1112	return r, nil
1113}
1114
// newReader builds a Reader over the index data in b; c is stored as the
// reader's closer.
//
// It validates the header (magic number and format version), parses the TOC
// and the symbol table, and loads the postings offset table. For format V1
// the whole table is kept in memory; for V2 only a sample of the label values
// of each label name is kept. Finally it caches the symbol index of every
// label name.
func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
	r := &Reader{
		b:        b,
		c:        c,
		postings: map[string][]postingOffset{},
	}

	// Verify header.
	if r.b.Len() < HeaderLen {
		return nil, errors.Wrap(encoding.ErrInvalidSize, "index header")
	}
	if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
		return nil, errors.Errorf("invalid magic number %x", m)
	}
	// The format version is the single byte following the magic number.
	r.version = int(r.b.Range(4, 5)[0])

	if r.version != FormatV1 && r.version != FormatV2 {
		return nil, errors.Errorf("unknown index file version %d", r.version)
	}

	var err error
	r.toc, err = NewTOCFromByteSlice(b)
	if err != nil {
		return nil, errors.Wrap(err, "read TOC")
	}

	r.symbols, err = NewSymbols(r.b, r.version, int(r.toc.Symbols))
	if err != nil {
		return nil, errors.Wrap(err, "read symbols")
	}

	if r.version == FormatV1 {
		// Earlier V1 formats don't have a sorted postings offset table, so
		// load the whole offset table into memory.
		r.postingsV1 = map[string]map[string]uint64{}
		if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, off uint64, _ int) error {
			if len(key) != 2 {
				return errors.Errorf("unexpected key length for posting table %d", len(key))
			}
			if _, ok := r.postingsV1[key[0]]; !ok {
				r.postingsV1[key[0]] = map[string]uint64{}
				r.postings[key[0]] = nil // Used to get a list of labelnames in places.
			}
			r.postingsV1[key[0]][key[1]] = off
			return nil
		}); err != nil {
			return nil, errors.Wrap(err, "read postings table")
		}
	} else {
		// lastKey and lastOff hold the most recent entry that was not sampled,
		// so that it can still be added if it turns out to be the last value
		// of its label name. valueCount counts the values seen for the
		// current label name.
		var lastKey []string
		lastOff := 0
		valueCount := 0
		// For the postings offset table we keep every label name but only every nth
		// label value (plus the first and last one), to save memory.
		if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, _ uint64, off int) error {
			if len(key) != 2 {
				return errors.Errorf("unexpected key length for posting table %d", len(key))
			}
			if _, ok := r.postings[key[0]]; !ok {
				// Next label name.
				r.postings[key[0]] = []postingOffset{}
				if lastKey != nil {
					// Always include last value for each label name.
					r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff})
				}
				lastKey = nil
				valueCount = 0
			}
			if valueCount%symbolFactor == 0 {
				r.postings[key[0]] = append(r.postings[key[0]], postingOffset{value: key[1], off: off})
				lastKey = nil
			} else {
				lastKey = key
				lastOff = off
			}
			valueCount++
			return nil
		}); err != nil {
			return nil, errors.Wrap(err, "read postings table")
		}
		// The last value of the final label name has no following label name
		// to trigger its insertion, so it is added here.
		if lastKey != nil {
			r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], off: lastOff})
		}
		// Trim any extra space in the slices.
		for k, v := range r.postings {
			l := make([]postingOffset, len(v))
			copy(l, v)
			r.postings[k] = l
		}
	}

	r.nameSymbols = make(map[uint32]string, len(r.postings))
	for k := range r.postings {
		// The empty name belongs to the special "all" postings entry, not to
		// a real label.
		if k == "" {
			continue
		}
		off, err := r.symbols.ReverseLookup(k)
		if err != nil {
			return nil, errors.Wrap(err, "reverse symbol lookup")
		}
		r.nameSymbols[off] = k
	}

	r.dec = &Decoder{LookupSymbol: r.lookupSymbol}

	return r, nil
}
1222
1223// Version returns the file format version of the underlying index.
1224func (r *Reader) Version() int {
1225	return r.version
1226}
1227
// Range marks a byte range. As filled in by PostingsRanges, End equals Start
// plus the length of the range.
type Range struct {
	Start, End int64
}
1232
1233// PostingsRanges returns a new map of byte range in the underlying index file
1234// for all postings lists.
1235func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) {
1236	m := map[labels.Label]Range{}
1237	if err := ReadOffsetTable(r.b, r.toc.PostingsTable, func(key []string, off uint64, _ int) error {
1238		if len(key) != 2 {
1239			return errors.Errorf("unexpected key length for posting table %d", len(key))
1240		}
1241		d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable)
1242		if d.Err() != nil {
1243			return d.Err()
1244		}
1245		m[labels.Label{Name: key[0], Value: key[1]}] = Range{
1246			Start: int64(off) + 4,
1247			End:   int64(off) + 4 + int64(d.Len()),
1248		}
1249		return nil
1250	}); err != nil {
1251		return nil, errors.Wrap(err, "read postings table")
1252	}
1253	return m, nil
1254}
1255
// Symbols provides lookups into the symbol table of an index. Only the
// position of every symbolFactor-th symbol is held in memory; the symbols
// themselves are decoded from bs on demand.
type Symbols struct {
	// bs is the byte slice the symbols are decoded from.
	bs      ByteSlice
	// version is the index format version; it selects how symbol references
	// are interpreted by Lookup and ReverseLookup.
	version int
	// off is the position of the symbol table within bs.
	off     int

	// offsets holds the position within bs of every symbolFactor-th symbol.
	offsets []int
	// seen is the number of symbols read by NewSymbols.
	seen    int
}
1264
// symbolFactor is the sampling interval of the reader: the position of one in
// every symbolFactor symbols is kept in memory, and the same interval is used
// when sampling label values of the postings offset table.
const symbolFactor = 32
1266
1267// NewSymbols returns a Symbols object for symbol lookups.
1268func NewSymbols(bs ByteSlice, version int, off int) (*Symbols, error) {
1269	s := &Symbols{
1270		bs:      bs,
1271		version: version,
1272		off:     off,
1273	}
1274	d := encoding.NewDecbufAt(bs, off, castagnoliTable)
1275	var (
1276		origLen = d.Len()
1277		cnt     = d.Be32int()
1278		basePos = off + 4
1279	)
1280	s.offsets = make([]int, 0, 1+cnt/symbolFactor)
1281	for d.Err() == nil && s.seen < cnt {
1282		if s.seen%symbolFactor == 0 {
1283			s.offsets = append(s.offsets, basePos+origLen-d.Len())
1284		}
1285		d.UvarintBytes() // The symbol.
1286		s.seen++
1287	}
1288	if d.Err() != nil {
1289		return nil, d.Err()
1290	}
1291	return s, nil
1292}
1293
1294func (s Symbols) Lookup(o uint32) (string, error) {
1295	d := encoding.Decbuf{
1296		B: s.bs.Range(0, s.bs.Len()),
1297	}
1298
1299	if s.version == FormatV2 {
1300		if int(o) >= s.seen {
1301			return "", errors.Errorf("unknown symbol offset %d", o)
1302		}
1303		d.Skip(s.offsets[int(o/symbolFactor)])
1304		// Walk until we find the one we want.
1305		for i := o - (o / symbolFactor * symbolFactor); i > 0; i-- {
1306			d.UvarintBytes()
1307		}
1308	} else {
1309		d.Skip(int(o))
1310	}
1311	sym := d.UvarintStr()
1312	if d.Err() != nil {
1313		return "", d.Err()
1314	}
1315	return sym, nil
1316}
1317
1318func (s Symbols) ReverseLookup(sym string) (uint32, error) {
1319	if len(s.offsets) == 0 {
1320		return 0, errors.Errorf("unknown symbol %q - no symbols", sym)
1321	}
1322	i := sort.Search(len(s.offsets), func(i int) bool {
1323		// Any decoding errors here will be lost, however
1324		// we already read through all of this at startup.
1325		d := encoding.Decbuf{
1326			B: s.bs.Range(0, s.bs.Len()),
1327		}
1328		d.Skip(s.offsets[i])
1329		return yoloString(d.UvarintBytes()) > sym
1330	})
1331	d := encoding.Decbuf{
1332		B: s.bs.Range(0, s.bs.Len()),
1333	}
1334	if i > 0 {
1335		i--
1336	}
1337	d.Skip(s.offsets[i])
1338	res := i * symbolFactor
1339	var lastLen int
1340	var lastSymbol string
1341	for d.Err() == nil && res <= s.seen {
1342		lastLen = d.Len()
1343		lastSymbol = yoloString(d.UvarintBytes())
1344		if lastSymbol >= sym {
1345			break
1346		}
1347		res++
1348	}
1349	if d.Err() != nil {
1350		return 0, d.Err()
1351	}
1352	if lastSymbol != sym {
1353		return 0, errors.Errorf("unknown symbol %q", sym)
1354	}
1355	if s.version == FormatV2 {
1356		return uint32(res), nil
1357	}
1358	return uint32(s.bs.Len() - lastLen), nil
1359}
1360
1361func (s Symbols) Size() int {
1362	return len(s.offsets) * 8
1363}
1364
1365func (s Symbols) Iter() StringIter {
1366	d := encoding.NewDecbufAt(s.bs, s.off, castagnoliTable)
1367	cnt := d.Be32int()
1368	return &symbolsIter{
1369		d:   d,
1370		cnt: cnt,
1371	}
1372}
1373
1374// symbolsIter implements StringIter.
1375type symbolsIter struct {
1376	d   encoding.Decbuf
1377	cnt int
1378	cur string
1379	err error
1380}
1381
1382func (s *symbolsIter) Next() bool {
1383	if s.cnt == 0 || s.err != nil {
1384		return false
1385	}
1386	s.cur = yoloString(s.d.UvarintBytes())
1387	s.cnt--
1388	if s.d.Err() != nil {
1389		s.err = s.d.Err()
1390		return false
1391	}
1392	return true
1393}
1394
1395func (s symbolsIter) At() string { return s.cur }
1396func (s symbolsIter) Err() error { return s.err }
1397
1398// ReadOffsetTable reads an offset table and at the given position calls f for each
1399// found entry. If f returns an error it stops decoding and returns the received error.
1400func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64, int) error) error {
1401	d := encoding.NewDecbufAt(bs, int(off), castagnoliTable)
1402	startLen := d.Len()
1403	cnt := d.Be32()
1404
1405	for d.Err() == nil && d.Len() > 0 && cnt > 0 {
1406		offsetPos := startLen - d.Len()
1407		keyCount := d.Uvarint()
1408		// The Postings offset table takes only 2 keys per entry (name and value of label),
1409		// and the LabelIndices offset table takes only 1 key per entry (a label name).
1410		// Hence setting the size to max of both, i.e. 2.
1411		keys := make([]string, 0, 2)
1412
1413		for i := 0; i < keyCount; i++ {
1414			keys = append(keys, d.UvarintStr())
1415		}
1416		o := d.Uvarint64()
1417		if d.Err() != nil {
1418			break
1419		}
1420		if err := f(keys, o, offsetPos); err != nil {
1421			return err
1422		}
1423		cnt--
1424	}
1425	return d.Err()
1426}
1427
1428// Close the reader and its underlying resources.
1429func (r *Reader) Close() error {
1430	return r.c.Close()
1431}
1432
1433func (r *Reader) lookupSymbol(o uint32) (string, error) {
1434	if s, ok := r.nameSymbols[o]; ok {
1435		return s, nil
1436	}
1437	return r.symbols.Lookup(o)
1438}
1439
1440// Symbols returns an iterator over the symbols that exist within the index.
1441func (r *Reader) Symbols() StringIter {
1442	return r.symbols.Iter()
1443}
1444
1445// SymbolTableSize returns the symbol table size in bytes.
1446func (r *Reader) SymbolTableSize() uint64 {
1447	return uint64(r.symbols.Size())
1448}
1449
1450// SortedLabelValues returns value tuples that exist for the given label name.
1451// It is not safe to use the return value beyond the lifetime of the byte slice
1452// passed into the Reader.
1453func (r *Reader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
1454	values, err := r.LabelValues(name, matchers...)
1455	if err == nil && r.version == FormatV1 {
1456		sort.Strings(values)
1457	}
1458	return values, err
1459}
1460
1461// LabelValues returns value tuples that exist for the given label name.
1462// It is not safe to use the return value beyond the lifetime of the byte slice
1463// passed into the Reader.
1464// TODO(replay): Support filtering by matchers
1465func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
1466	if len(matchers) > 0 {
1467		return nil, errors.Errorf("matchers parameter is not implemented: %+v", matchers)
1468	}
1469
1470	if r.version == FormatV1 {
1471		e, ok := r.postingsV1[name]
1472		if !ok {
1473			return nil, nil
1474		}
1475		values := make([]string, 0, len(e))
1476		for k := range e {
1477			values = append(values, k)
1478		}
1479		return values, nil
1480
1481	}
1482	e, ok := r.postings[name]
1483	if !ok {
1484		return nil, nil
1485	}
1486	if len(e) == 0 {
1487		return nil, nil
1488	}
1489	values := make([]string, 0, len(e)*symbolFactor)
1490
1491	d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
1492	d.Skip(e[0].off)
1493	lastVal := e[len(e)-1].value
1494
1495	skip := 0
1496	for d.Err() == nil {
1497		if skip == 0 {
1498			// These are always the same number of bytes,
1499			// and it's faster to skip than parse.
1500			skip = d.Len()
1501			d.Uvarint()      // Keycount.
1502			d.UvarintBytes() // Label name.
1503			skip -= d.Len()
1504		} else {
1505			d.Skip(skip)
1506		}
1507		s := yoloString(d.UvarintBytes()) //Label value.
1508		values = append(values, s)
1509		if s == lastVal {
1510			break
1511		}
1512		d.Uvarint64() // Offset.
1513	}
1514	if d.Err() != nil {
1515		return nil, errors.Wrap(d.Err(), "get postings offset entry")
1516	}
1517	return values, nil
1518}
1519
1520// LabelNamesFor returns all the label names for the series referred to by IDs.
1521// The names returned are sorted.
1522func (r *Reader) LabelNamesFor(ids ...uint64) ([]string, error) {
1523	// Gather offsetsMap the name offsetsMap in the symbol table first
1524	offsetsMap := make(map[uint32]struct{})
1525	for _, id := range ids {
1526		offset := id
1527		// In version 2 series IDs are no longer exact references but series are 16-byte padded
1528		// and the ID is the multiple of 16 of the actual position.
1529		if r.version == FormatV2 {
1530			offset = id * 16
1531		}
1532
1533		d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable)
1534		buf := d.Get()
1535		if d.Err() != nil {
1536			return nil, errors.Wrap(d.Err(), "get buffer for series")
1537		}
1538
1539		offsets, err := r.dec.LabelNamesOffsetsFor(buf)
1540		if err != nil {
1541			return nil, errors.Wrap(err, "get label name offsets")
1542		}
1543		for _, off := range offsets {
1544			offsetsMap[off] = struct{}{}
1545		}
1546	}
1547
1548	// Lookup the unique symbols.
1549	names := make([]string, 0, len(offsetsMap))
1550	for off := range offsetsMap {
1551		name, err := r.lookupSymbol(off)
1552		if err != nil {
1553			return nil, errors.Wrap(err, "lookup symbol in LabelNamesFor")
1554		}
1555		names = append(names, name)
1556	}
1557
1558	sort.Strings(names)
1559
1560	return names, nil
1561}
1562
1563// LabelValueFor returns label value for the given label name in the series referred to by ID.
1564func (r *Reader) LabelValueFor(id uint64, label string) (string, error) {
1565	offset := id
1566	// In version 2 series IDs are no longer exact references but series are 16-byte padded
1567	// and the ID is the multiple of 16 of the actual position.
1568	if r.version == FormatV2 {
1569		offset = id * 16
1570	}
1571	d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable)
1572	buf := d.Get()
1573	if d.Err() != nil {
1574		return "", errors.Wrap(d.Err(), "label values for")
1575	}
1576
1577	value, err := r.dec.LabelValueFor(buf, label)
1578	if err != nil {
1579		return "", storage.ErrNotFound
1580	}
1581
1582	if value == "" {
1583		return "", storage.ErrNotFound
1584	}
1585
1586	return value, nil
1587}
1588
1589// Series reads the series with the given ID and writes its labels and chunks into lbls and chks.
1590func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
1591	offset := id
1592	// In version 2 series IDs are no longer exact references but series are 16-byte padded
1593	// and the ID is the multiple of 16 of the actual position.
1594	if r.version == FormatV2 {
1595		offset = id * 16
1596	}
1597	d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable)
1598	if d.Err() != nil {
1599		return d.Err()
1600	}
1601	return errors.Wrap(r.dec.Series(d.Get(), lbls, chks), "read series")
1602}
1603
// Postings returns the merged postings lists of the given label name for all
// of the given values. An unknown label name, or an empty values list on the
// non-V1 path, yields EmptyPostings; values that are not present in the index
// contribute nothing.
//
// For FormatV1 every value is looked up in the in-memory offset map. For other
// versions only sampled entries of the postings offset table are in memory, so
// the table is decoded starting from the closest sampled entry.
//
// NOTE(review): the non-V1 path only ever moves forward through values, so it
// appears to expect values in ascending order — confirm against callers.
func (r *Reader) Postings(name string, values ...string) (Postings, error) {
	if r.version == FormatV1 {
		e, ok := r.postingsV1[name]
		if !ok {
			return EmptyPostings(), nil
		}
		res := make([]Postings, 0, len(values))
		for _, v := range values {
			postingsOff, ok := e[v]
			if !ok {
				continue
			}
			// Read from the postings table.
			d := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
			_, p, err := r.dec.Postings(d.Get())
			if err != nil {
				return nil, errors.Wrap(err, "decode postings")
			}
			res = append(res, p)
		}
		return Merge(res...), nil
	}

	e, ok := r.postings[name]
	if !ok {
		return EmptyPostings(), nil
	}

	if len(values) == 0 {
		return EmptyPostings(), nil
	}

	res := make([]Postings, 0, len(values))
	// skip caches the byte length of the key count plus label name prefix of
	// an entry; it is measured on the first entry decoded and reused after.
	skip := 0
	valueIndex := 0
	for valueIndex < len(values) && values[valueIndex] < e[0].value {
		// Discard values before the start.
		valueIndex++
	}
	for valueIndex < len(values) {
		value := values[valueIndex]

		// Find the first sampled entry whose value is not below the wanted one.
		i := sort.Search(len(e), func(i int) bool { return e[i].value >= value })
		if i == len(e) {
			// We're past the end.
			break
		}
		if i > 0 && e[i].value != value {
			// Need to look from previous entry.
			i--
		}
		// Don't Crc32 the entire postings offset table, this is very slow
		// so hope any issues were caught at startup.
		d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
		d.Skip(e[i].off)

		// Iterate on the offset table.
		var postingsOff uint64 // The offset into the postings table.
		for d.Err() == nil {
			if skip == 0 {
				// These are always the same number of bytes,
				// and it's faster to skip than parse.
				skip = d.Len()
				d.Uvarint()      // Keycount.
				d.UvarintBytes() // Label name.
				skip -= d.Len()
			} else {
				d.Skip(skip)
			}
			v := d.UvarintBytes()       // Label value.
			postingsOff = d.Uvarint64() // Offset.
			// Consume every requested value that is not above the table value
			// just read: equal ones are resolved, smaller ones are dropped.
			for string(v) >= value {
				if string(v) == value {
					// Read from the postings table.
					d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
					_, p, err := r.dec.Postings(d2.Get())
					if err != nil {
						return nil, errors.Wrap(err, "decode postings")
					}
					res = append(res, p)
				}
				valueIndex++
				if valueIndex == len(values) {
					break
				}
				value = values[valueIndex]
			}
			if i+1 == len(e) || value >= e[i+1].value || valueIndex == len(values) {
				// Need to go to a later postings offset entry, if there is one.
				break
			}
		}
		if d.Err() != nil {
			return nil, errors.Wrap(d.Err(), "get postings offset entry")
		}
	}

	return Merge(res...), nil
}
1703
1704// SortedPostings returns the given postings list reordered so that the backing series
1705// are sorted.
1706func (r *Reader) SortedPostings(p Postings) Postings {
1707	return p
1708}
1709
1710// Size returns the size of an index file.
1711func (r *Reader) Size() int64 {
1712	return int64(r.b.Len())
1713}
1714
1715// LabelNames returns all the unique label names present in the index.
1716// TODO(twilkie) implement support for matchers
1717func (r *Reader) LabelNames(matchers ...*labels.Matcher) ([]string, error) {
1718	if len(matchers) > 0 {
1719		return nil, errors.Errorf("matchers parameter is not implemented: %+v", matchers)
1720	}
1721
1722	labelNames := make([]string, 0, len(r.postings))
1723	for name := range r.postings {
1724		if name == allPostingsKey.Name {
1725			// This is not from any metric.
1726			continue
1727		}
1728		labelNames = append(labelNames, name)
1729	}
1730	sort.Strings(labelNames)
1731	return labelNames, nil
1732}
1733
1734// NewStringListIter returns a StringIter for the given sorted list of strings.
1735func NewStringListIter(s []string) StringIter {
1736	return &stringListIter{l: s}
1737}
1738
// stringListIter implements StringIter over an in-memory list of strings.
type stringListIter struct {
	l   []string
	cur string
}

// Next advances to the next string of the list. It returns false once the
// list is exhausted; the current string is left untouched in that case.
func (s *stringListIter) Next() bool {
	if len(s.l) > 0 {
		s.cur, s.l = s.l[0], s.l[1:]
		return true
	}
	return false
}

// At returns the string selected by the most recent successful call to Next.
func (s stringListIter) At() string {
	return s.cur
}

// Err always returns nil: iterating over an in-memory list cannot fail.
func (s stringListIter) Err() error {
	return nil
}
1755
// Decoder provides decoding methods for the v1 and v2 index file format.
//
// It currently does not contain decoding methods for all entry types but can be extended
// by them if there's demand.
type Decoder struct {
	// LookupSymbol resolves a symbol reference read from an entry into the
	// string it stands for. The decoding methods call it for label names and
	// label values.
	LookupSymbol func(uint32) (string, error)
}
1763
1764// Postings returns a postings list for b and its number of elements.
1765func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
1766	d := encoding.Decbuf{B: b}
1767	n := d.Be32int()
1768	l := d.Get()
1769	return n, newBigEndianPostings(l), d.Err()
1770}
1771
1772// LabelNamesOffsetsFor decodes the offsets of the name symbols for a given series.
1773// They are returned in the same order they're stored, which should be sorted lexicographically.
1774func (dec *Decoder) LabelNamesOffsetsFor(b []byte) ([]uint32, error) {
1775	d := encoding.Decbuf{B: b}
1776	k := d.Uvarint()
1777
1778	offsets := make([]uint32, k)
1779	for i := 0; i < k; i++ {
1780		offsets[i] = uint32(d.Uvarint())
1781		_ = d.Uvarint() // skip the label value
1782
1783		if d.Err() != nil {
1784			return nil, errors.Wrap(d.Err(), "read series label offsets")
1785		}
1786	}
1787
1788	return offsets, d.Err()
1789}
1790
1791// LabelValueFor decodes a label for a given series.
1792func (dec *Decoder) LabelValueFor(b []byte, label string) (string, error) {
1793	d := encoding.Decbuf{B: b}
1794	k := d.Uvarint()
1795
1796	for i := 0; i < k; i++ {
1797		lno := uint32(d.Uvarint())
1798		lvo := uint32(d.Uvarint())
1799
1800		if d.Err() != nil {
1801			return "", errors.Wrap(d.Err(), "read series label offsets")
1802		}
1803
1804		ln, err := dec.LookupSymbol(lno)
1805		if err != nil {
1806			return "", errors.Wrap(err, "lookup label name")
1807		}
1808
1809		if ln == label {
1810			lv, err := dec.LookupSymbol(lvo)
1811			if err != nil {
1812				return "", errors.Wrap(err, "lookup label value")
1813			}
1814
1815			return lv, nil
1816		}
1817	}
1818
1819	return "", d.Err()
1820}
1821
// Series decodes a series entry from the given byte slice into lbls and chks.
// Both targets are truncated to length zero first and then appended to, so
// their existing backing arrays are reused.
//
// The entry holds the number of labels followed by one (name, value) pair of
// symbol references per label, then the number of chunks followed by the
// chunk metadata, which is delta-encoded as described inline.
func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error {
	*lbls = (*lbls)[:0]
	*chks = (*chks)[:0]

	d := encoding.Decbuf{B: b}

	// Number of labels.
	k := d.Uvarint()

	for i := 0; i < k; i++ {
		// Symbol references of the label name and the label value.
		lno := uint32(d.Uvarint())
		lvo := uint32(d.Uvarint())

		if d.Err() != nil {
			return errors.Wrap(d.Err(), "read series label offsets")
		}

		ln, err := dec.LookupSymbol(lno)
		if err != nil {
			return errors.Wrap(err, "lookup label name")
		}
		lv, err := dec.LookupSymbol(lvo)
		if err != nil {
			return errors.Wrap(err, "lookup label value")
		}

		*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
	}

	// Read the chunks meta data.
	k = d.Uvarint()

	// A series without chunks ends here.
	if k == 0 {
		return d.Err()
	}

	// First chunk: absolute min time, max time as a delta from the min time,
	// and an absolute reference.
	t0 := d.Varint64()
	maxt := int64(d.Uvarint64()) + t0
	ref0 := int64(d.Uvarint64())

	*chks = append(*chks, chunks.Meta{
		Ref:     uint64(ref0),
		MinTime: t0,
		MaxTime: maxt,
	})
	t0 = maxt

	// Remaining chunks: min time as a delta from the previous chunk's max
	// time, max time as a delta from the min time, and the reference as a
	// signed delta from the previous reference.
	for i := 1; i < k; i++ {
		mint := int64(d.Uvarint64()) + t0
		maxt := int64(d.Uvarint64()) + mint

		ref0 += d.Varint64()
		t0 = maxt

		if d.Err() != nil {
			return errors.Wrapf(d.Err(), "read meta for chunk %d", i)
		}

		*chks = append(*chks, chunks.Meta{
			Ref:     uint64(ref0),
			MinTime: mint,
			MaxTime: maxt,
		})
	}
	return d.Err()
}
1888
// yoloString reinterprets b as a string without copying its bytes. The
// returned string shares memory with b, so b must not be modified for as long
// as the string is in use.
func yoloString(b []byte) string {
	p := unsafe.Pointer(&b)
	return *(*string)(p)
}
1892