1package tsi1
2
3import (
4	"bytes"
5	"encoding/binary"
6	"errors"
7	"io"
8	"sort"
9	"unsafe"
10
11	"github.com/influxdata/influxdb/pkg/estimator"
12	"github.com/influxdata/influxdb/pkg/estimator/hll"
13	"github.com/influxdata/influxdb/pkg/rhh"
14	"github.com/influxdata/influxdb/tsdb"
15)
16
// MeasurementBlockVersion is the version of the measurement block.
// It is the value written as the final two bytes of the block trailer and
// the only version accepted when a trailer is read back.
const MeasurementBlockVersion = 1
19
// Measurement flag constants.
const (
	// MeasurementTombstoneFlag is the bit set in an element's flag byte when
	// the measurement has been deleted.
	MeasurementTombstoneFlag = 0x01
)
24
// Measurement field size constants. All sizes are in bytes.
const (
	// 1 byte offset for the block to ensure non-zero offsets.
	// A zero offset in the hash index therefore always means "empty slot".
	MeasurementFillSize = 1

	// Measurement trailer fields
	// (eight 8-byte offset/size values followed by a 2-byte version).
	MeasurementTrailerSize = 0 +
		2 + // version
		8 + 8 + // data offset/size
		8 + 8 + // hash index offset/size
		8 + 8 + // measurement sketch offset/size
		8 + 8 // tombstone measurement sketch offset/size

	// Measurement key block fields: MeasurementNSize is the width of the
	// bucket count that leads the hash index, and MeasurementOffsetSize is
	// the width of each element offset stored after it.
	MeasurementNSize      = 8
	MeasurementOffsetSize = 8

	// SeriesIDSize is the width of a fixed-width series id.
	SeriesIDSize = 8
)
44
// Measurement errors.
//
// ErrUnsupportedMeasurementBlockVersion describes a block encoded with a
// version that is not supported; ErrMeasurementBlockSizeMismatch describes a
// block whose size is not what was expected.
var (
	ErrUnsupportedMeasurementBlockVersion = errors.New("unsupported measurement block version")
	ErrMeasurementBlockSizeMismatch       = errors.New("measurement block size mismatch")
)
50
// MeasurementBlock represents a collection of all measurements in an index.
//
// Every byte slice it holds is a sub-slice of the buffer handed to
// UnmarshalBinary; nothing is copied.
type MeasurementBlock struct {
	// data is the data section holding the encoded measurement elements.
	// hashData is the hash index: a bucket count followed by one element
	// offset (into data) per bucket.
	data     []byte
	hashData []byte

	// Measurement sketch and tombstone sketch for cardinality estimation.
	// Both are kept in their serialized form.
	sketchData, tSketchData []byte

	version int // block version
}
61
62// bytes estimates the memory footprint of this MeasurementBlock, in bytes.
63func (blk *MeasurementBlock) bytes() int {
64	var b int
65	// Do not count contents of blk.data or blk.hashData because they reference into an external []byte
66	b += int(unsafe.Sizeof(*blk))
67	return b
68}
69
70// Version returns the encoding version parsed from the data.
71// Only valid after UnmarshalBinary() has been successfully invoked.
72func (blk *MeasurementBlock) Version() int { return blk.version }
73
74// Elem returns an element for a measurement.
75func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool) {
76	n := int64(binary.BigEndian.Uint64(blk.hashData[:MeasurementNSize]))
77	hash := rhh.HashKey(name)
78	pos := hash % n
79
80	// Track current distance
81	var d int64
82	for {
83		// Find offset of measurement.
84		offset := binary.BigEndian.Uint64(blk.hashData[MeasurementNSize+(pos*MeasurementOffsetSize):])
85		if offset == 0 {
86			return MeasurementBlockElem{}, false
87		}
88
89		// Evaluate name if offset is not empty.
90		if offset > 0 {
91			// Parse into element.
92			var e MeasurementBlockElem
93			e.UnmarshalBinary(blk.data[offset:])
94
95			// Return if name match.
96			if bytes.Equal(e.name, name) {
97				return e, true
98			}
99
100			// Check if we've exceeded the probe distance.
101			if d > rhh.Dist(rhh.HashKey(e.name), pos, n) {
102				return MeasurementBlockElem{}, false
103			}
104		}
105
106		// Move position forward.
107		pos = (pos + 1) % n
108		d++
109
110		if d > n {
111			return MeasurementBlockElem{}, false
112		}
113	}
114}
115
116// UnmarshalBinary unpacks data into the block. Block is not copied so data
117// should be retained and unchanged after being passed into this function.
118func (blk *MeasurementBlock) UnmarshalBinary(data []byte) error {
119	// Read trailer.
120	t, err := ReadMeasurementBlockTrailer(data)
121	if err != nil {
122		return err
123	}
124
125	// Save data section.
126	blk.data = data[t.Data.Offset:]
127	blk.data = blk.data[:t.Data.Size]
128
129	// Save hash index block.
130	blk.hashData = data[t.HashIndex.Offset:]
131	blk.hashData = blk.hashData[:t.HashIndex.Size]
132
133	// Initialise sketch data.
134	blk.sketchData = data[t.Sketch.Offset:][:t.Sketch.Size]
135	blk.tSketchData = data[t.TSketch.Offset:][:t.TSketch.Size]
136
137	return nil
138}
139
140// Iterator returns an iterator over all measurements.
141func (blk *MeasurementBlock) Iterator() MeasurementIterator {
142	return &blockMeasurementIterator{data: blk.data[MeasurementFillSize:]}
143}
144
145// SeriesIDIterator returns an iterator for all series ids in a measurement.
146func (blk *MeasurementBlock) SeriesIDIterator(name []byte) tsdb.SeriesIDIterator {
147	// Find measurement element.
148	e, ok := blk.Elem(name)
149	if !ok {
150		return &rawSeriesIDIterator{}
151	}
152	return &rawSeriesIDIterator{n: e.series.n, data: e.series.data}
153}
154
155// Sketches returns existence and tombstone measurement sketches.
156func (blk *MeasurementBlock) Sketches() (sketch, tSketch estimator.Sketch, err error) {
157	sketch = hll.NewDefaultPlus()
158	if err := sketch.UnmarshalBinary(blk.sketchData); err != nil {
159		return nil, nil, err
160	}
161
162	tSketch = hll.NewDefaultPlus()
163	if err := tSketch.UnmarshalBinary(blk.tSketchData); err != nil {
164		return nil, nil, err
165	}
166	return sketch, tSketch, nil
167}
168
// blockMeasurementIterator iterates over a list of measurements in a block.
type blockMeasurementIterator struct {
	// elem is the element most recently decoded by Next; it is reused for
	// every call. data is the not-yet-consumed remainder of the block's
	// data section.
	elem MeasurementBlockElem
	data []byte
}
174
175// Next returns the next measurement. Returns nil when iterator is complete.
176func (itr *blockMeasurementIterator) Next() MeasurementElem {
177	// Return nil when we run out of data.
178	if len(itr.data) == 0 {
179		return nil
180	}
181
182	// Unmarshal the element at the current position.
183	itr.elem.UnmarshalBinary(itr.data)
184
185	// Move the data forward past the record.
186	itr.data = itr.data[itr.elem.size:]
187
188	return &itr.elem
189}
190
// rawSeriesIDIterator iterates over a list of raw series data.
// The data is a sequence of uvarint-encoded deltas; each series id is the
// previous id plus the next delta.
type rawSeriesIDIterator struct {
	prev uint64 // last series id returned by Next
	n    uint64 // series count copied from the measurement element
	data []byte // remaining encoded deltas
}

// Close is a no-op and always returns nil.
func (itr *rawSeriesIDIterator) Close() error { return nil }
199
200// Next returns the next decoded series.
201func (itr *rawSeriesIDIterator) Next() (tsdb.SeriesIDElem, error) {
202	if len(itr.data) == 0 {
203		return tsdb.SeriesIDElem{}, nil
204	}
205
206	delta, n, err := uvarint(itr.data)
207	if err != nil {
208		return tsdb.SeriesIDElem{}, err
209	}
210	itr.data = itr.data[n:]
211
212	seriesID := itr.prev + uint64(delta)
213	itr.prev = seriesID
214	return tsdb.SeriesIDElem{SeriesID: seriesID}, nil
215}
216
217func (itr *rawSeriesIDIterator) SeriesIDSet() *tsdb.SeriesIDSet {
218	ss := tsdb.NewSeriesIDSet()
219	for data, prev := itr.data, uint64(0); len(data) > 0; {
220		delta, n, err := uvarint(data)
221		if err != nil {
222			break
223		}
224		data = data[n:]
225
226		seriesID := prev + uint64(delta)
227		prev = seriesID
228		ss.AddNoLock(seriesID)
229	}
230	return ss
231}
232
// MeasurementBlockTrailer represents meta data at the end of a MeasurementBlock.
// It locates each section of the block; offsets are applied relative to the
// start of the block's bytes.
type MeasurementBlockTrailer struct {
	Version int // Encoding version

	// Offset & size of data section.
	Data struct {
		Offset int64
		Size   int64
	}

	// Offset & size of hash map section.
	HashIndex struct {
		Offset int64
		Size   int64
	}

	// Offset and size of cardinality sketch for measurements.
	Sketch struct {
		Offset int64
		Size   int64
	}

	// Offset and size of cardinality sketch for tombstoned measurements.
	TSketch struct {
		Offset int64
		Size   int64
	}
}
261
262// ReadMeasurementBlockTrailer returns the block trailer from data.
263func ReadMeasurementBlockTrailer(data []byte) (MeasurementBlockTrailer, error) {
264	var t MeasurementBlockTrailer
265
266	// Read version (which is located in the last two bytes of the trailer).
267	t.Version = int(binary.BigEndian.Uint16(data[len(data)-2:]))
268	if t.Version != MeasurementBlockVersion {
269		return t, ErrUnsupportedIndexFileVersion
270	}
271
272	// Slice trailer data.
273	buf := data[len(data)-MeasurementTrailerSize:]
274
275	// Read data section info.
276	t.Data.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
277	t.Data.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
278
279	// Read measurement block info.
280	t.HashIndex.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
281	t.HashIndex.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
282
283	// Read measurement sketch info.
284	t.Sketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
285	t.Sketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
286
287	// Read tombstone measurement sketch info.
288	t.TSketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
289	t.TSketch.Size = int64(binary.BigEndian.Uint64(buf[0:8]))
290
291	return t, nil
292}
293
294// WriteTo writes the trailer to w.
295func (t *MeasurementBlockTrailer) WriteTo(w io.Writer) (n int64, err error) {
296	// Write data section info.
297	if err := writeUint64To(w, uint64(t.Data.Offset), &n); err != nil {
298		return n, err
299	} else if err := writeUint64To(w, uint64(t.Data.Size), &n); err != nil {
300		return n, err
301	}
302
303	// Write hash index section info.
304	if err := writeUint64To(w, uint64(t.HashIndex.Offset), &n); err != nil {
305		return n, err
306	} else if err := writeUint64To(w, uint64(t.HashIndex.Size), &n); err != nil {
307		return n, err
308	}
309
310	// Write measurement sketch info.
311	if err := writeUint64To(w, uint64(t.Sketch.Offset), &n); err != nil {
312		return n, err
313	} else if err := writeUint64To(w, uint64(t.Sketch.Size), &n); err != nil {
314		return n, err
315	}
316
317	// Write tombstone measurement sketch info.
318	if err := writeUint64To(w, uint64(t.TSketch.Offset), &n); err != nil {
319		return n, err
320	} else if err := writeUint64To(w, uint64(t.TSketch.Size), &n); err != nil {
321		return n, err
322	}
323
324	// Write measurement block version.
325	if err := writeUint16To(w, MeasurementBlockVersion, &n); err != nil {
326		return n, err
327	}
328
329	return n, nil
330}
331
// MeasurementBlockElem represents an internal measurement element.
// It is populated by UnmarshalBinary; name and series.data are sub-slices of
// the bytes passed to that call rather than copies.
type MeasurementBlockElem struct {
	flag byte   // flag bits; see MeasurementTombstoneFlag
	name []byte // measurement name

	// tagBlock holds the offset and size of the measurement's tag block.
	tagBlock struct {
		offset int64
		size   int64
	}

	series struct {
		n    uint64 // series count
		data []byte // serialized series data
	}

	// size in bytes, set after unmarshaling.
	size int
}
350
351// Name returns the measurement name.
352func (e *MeasurementBlockElem) Name() []byte { return e.name }
353
354// Deleted returns true if the tombstone flag is set.
355func (e *MeasurementBlockElem) Deleted() bool {
356	return (e.flag & MeasurementTombstoneFlag) != 0
357}
358
359// TagBlockOffset returns the offset of the measurement's tag block.
360func (e *MeasurementBlockElem) TagBlockOffset() int64 { return e.tagBlock.offset }
361
362// TagBlockSize returns the size of the measurement's tag block.
363func (e *MeasurementBlockElem) TagBlockSize() int64 { return e.tagBlock.size }
364
365// SeriesData returns the raw series data.
366func (e *MeasurementBlockElem) SeriesData() []byte { return e.series.data }
367
368// SeriesN returns the number of series associated with the measurement.
369func (e *MeasurementBlockElem) SeriesN() uint64 { return e.series.n }
370
371// SeriesID returns series ID at an index.
372func (e *MeasurementBlockElem) SeriesID(i int) uint64 {
373	return binary.BigEndian.Uint64(e.series.data[i*SeriesIDSize:])
374}
375
376func (e *MeasurementBlockElem) HasSeries() bool { return e.series.n > 0 }
377
378// SeriesIDs returns a list of decoded series ids.
379//
380// NOTE: This should be used for testing and diagnostics purposes only.
381// It requires loading the entire list of series in-memory.
382func (e *MeasurementBlockElem) SeriesIDs() []uint64 {
383	a := make([]uint64, 0, e.series.n)
384	e.ForEachSeriesID(func(id uint64) error {
385		a = append(a, id)
386		return nil
387	})
388	return a
389}
390
391func (e *MeasurementBlockElem) ForEachSeriesID(fn func(uint64) error) error {
392	var prev uint64
393	for data := e.series.data; len(data) > 0; {
394		delta, n, err := uvarint(data)
395		if err != nil {
396			return err
397		}
398		data = data[n:]
399
400		seriesID := prev + uint64(delta)
401		if err = fn(seriesID); err != nil {
402			return err
403		}
404		prev = seriesID
405	}
406	return nil
407}
408
409// Size returns the size of the element.
410func (e *MeasurementBlockElem) Size() int { return e.size }
411
412// UnmarshalBinary unmarshals data into e.
413func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error {
414	start := len(data)
415
416	// Parse flag data.
417	e.flag, data = data[0], data[1:]
418
419	// Parse tag block offset.
420	e.tagBlock.offset, data = int64(binary.BigEndian.Uint64(data)), data[8:]
421	e.tagBlock.size, data = int64(binary.BigEndian.Uint64(data)), data[8:]
422
423	// Parse name.
424	sz, n, err := uvarint(data)
425	if err != nil {
426		return err
427	}
428	e.name, data = data[n:n+int(sz)], data[n+int(sz):]
429
430	// Parse series data.
431	v, n, err := uvarint(data)
432	if err != nil {
433		return err
434	}
435	e.series.n, data = uint64(v), data[n:]
436	sz, n, err = uvarint(data)
437	if err != nil {
438		return err
439	}
440	data = data[n:]
441	e.series.data, data = data[:sz], data[sz:]
442
443	// Save length of elem.
444	e.size = start - len(data)
445
446	return nil
447}
448
// MeasurementBlockWriter writes a measurement block.
// Measurements are collected with Add and encoded with WriteTo.
type MeasurementBlockWriter struct {
	buf bytes.Buffer           // scratch buffer reused when encoding each measurement's series data
	mms map[string]measurement // pending measurements keyed by name

	// Measurement sketch and tombstoned measurement sketch.
	sketch, tSketch estimator.Sketch
}
457
458// NewMeasurementBlockWriter returns a new MeasurementBlockWriter.
459func NewMeasurementBlockWriter() *MeasurementBlockWriter {
460	return &MeasurementBlockWriter{
461		mms:     make(map[string]measurement),
462		sketch:  hll.NewDefaultPlus(),
463		tSketch: hll.NewDefaultPlus(),
464	}
465}
466
467// Add adds a measurement with series and tag set offset/size.
468func (mw *MeasurementBlockWriter) Add(name []byte, deleted bool, offset, size int64, seriesIDs []uint64) {
469	mm := mw.mms[string(name)]
470	mm.deleted = deleted
471	mm.tagBlock.offset = offset
472	mm.tagBlock.size = size
473	mm.seriesIDs = seriesIDs
474	mw.mms[string(name)] = mm
475
476	if deleted {
477		mw.tSketch.Add(name)
478	} else {
479		mw.sketch.Add(name)
480	}
481}
482
// WriteTo encodes the measurements to w.
//
// The block is written as four sections followed by a trailer: the data
// section (a padding byte, then every measurement in name order), the hash
// index (bucket count, then one element offset per bucket), the measurement
// sketch, and the tombstone measurement sketch. The trailer records the
// offset and size of each section; all offsets are counted from the first
// byte written by this call.
//
// n is the number of bytes written, including any written before an error.
func (mw *MeasurementBlockWriter) WriteTo(w io.Writer) (n int64, err error) {
	var t MeasurementBlockTrailer

	// The sketches must be set before calling WriteTo.
	if mw.sketch == nil {
		return 0, errors.New("measurement sketch not set")
	} else if mw.tSketch == nil {
		return 0, errors.New("measurement tombstone sketch not set")
	}

	// Sort names so measurements are written in a deterministic order
	// rather than in map iteration order.
	names := make([]string, 0, len(mw.mms))
	for name := range mw.mms {
		names = append(names, name)
	}
	sort.Strings(names)

	// Begin data section.
	t.Data.Offset = n

	// Write padding byte so no offsets are zero.
	if err := writeUint8To(w, 0, &n); err != nil {
		return n, err
	}

	// Encode key list.
	for _, name := range names {
		// Retrieve measurement and save offset. The offset is stored back
		// into the map so the hash index below can look it up.
		mm := mw.mms[name]
		mm.offset = n
		mw.mms[name] = mm

		// Write measurement
		if err := mw.writeMeasurementTo(w, []byte(name), &mm, &n); err != nil {
			return n, err
		}
	}
	t.Data.Size = n - t.Data.Offset

	// Build key hash map from measurement name to measurement (and thereby
	// to the offset recorded above).
	m := rhh.NewHashMap(rhh.Options{
		Capacity:   int64(len(names)),
		LoadFactor: LoadFactor,
	})
	for name := range mw.mms {
		mm := mw.mms[name]
		m.Put([]byte(name), &mm)
	}

	t.HashIndex.Offset = n

	// Encode hash map length.
	if err := writeUint64To(w, uint64(m.Cap()), &n); err != nil {
		return n, err
	}

	// Encode hash map offset entries, one per bucket.
	for i := int64(0); i < m.Cap(); i++ {
		_, v := m.Elem(i)

		// A bucket whose value is not a *measurement is written as offset
		// zero.
		var offset int64
		if mm, ok := v.(*measurement); ok {
			offset = mm.offset
		}

		if err := writeUint64To(w, uint64(offset), &n); err != nil {
			return n, err
		}
	}
	t.HashIndex.Size = n - t.HashIndex.Offset

	// Write the sketches out.
	t.Sketch.Offset = n
	if err := writeSketchTo(w, mw.sketch, &n); err != nil {
		return n, err
	}
	t.Sketch.Size = n - t.Sketch.Offset

	t.TSketch.Offset = n
	if err := writeSketchTo(w, mw.tSketch, &n); err != nil {
		return n, err
	}
	t.TSketch.Size = n - t.TSketch.Offset

	// Write trailer.
	nn, err := t.WriteTo(w)
	n += nn
	return n, err
}
573
574// writeMeasurementTo encodes a single measurement entry into w.
575func (mw *MeasurementBlockWriter) writeMeasurementTo(w io.Writer, name []byte, mm *measurement, n *int64) error {
576	// Write flag & tag block offset.
577	if err := writeUint8To(w, mm.flag(), n); err != nil {
578		return err
579	}
580	if err := writeUint64To(w, uint64(mm.tagBlock.offset), n); err != nil {
581		return err
582	} else if err := writeUint64To(w, uint64(mm.tagBlock.size), n); err != nil {
583		return err
584	}
585
586	// Write measurement name.
587	if err := writeUvarintTo(w, uint64(len(name)), n); err != nil {
588		return err
589	}
590	if err := writeTo(w, name, n); err != nil {
591		return err
592	}
593
594	// Write series data to buffer.
595	mw.buf.Reset()
596	var prev uint64
597	for _, seriesID := range mm.seriesIDs {
598		delta := seriesID - prev
599
600		var buf [binary.MaxVarintLen32]byte
601		i := binary.PutUvarint(buf[:], uint64(delta))
602		if _, err := mw.buf.Write(buf[:i]); err != nil {
603			return err
604		}
605
606		prev = seriesID
607	}
608
609	// Write series count.
610	if err := writeUvarintTo(w, uint64(len(mm.seriesIDs)), n); err != nil {
611		return err
612	}
613
614	// Write data size & buffer.
615	if err := writeUvarintTo(w, uint64(mw.buf.Len()), n); err != nil {
616		return err
617	}
618	nn, err := mw.buf.WriteTo(w)
619	*n += nn
620	return err
621}
622
623// writeSketchTo writes an estimator.Sketch into w, updating the number of bytes
624// written via n.
625func writeSketchTo(w io.Writer, s estimator.Sketch, n *int64) error {
626	data, err := s.MarshalBinary()
627	if err != nil {
628		return err
629	}
630
631	nn, err := w.Write(data)
632	*n += int64(nn)
633	return err
634}
635
// measurement is the writer's in-memory record of one measurement that is
// waiting to be encoded.
type measurement struct {
	deleted  bool // true when the measurement is tombstoned
	tagBlock struct {
		offset int64
		size   int64
	}
	seriesIDs []uint64 // series ids to encode for the measurement
	offset    int64    // position of the encoded entry, set while the block is written
}
645
646func (mm measurement) flag() byte {
647	var flag byte
648	if mm.deleted {
649		flag |= MeasurementTombstoneFlag
650	}
651	return flag
652}
653