1package tsi1
2
3import (
4	"bytes"
5	"encoding/binary"
6	"errors"
7	"io"
8	"sort"
9	"unsafe"
10
11	"github.com/influxdata/influxdb/pkg/estimator"
12	"github.com/influxdata/influxdb/pkg/estimator/hll"
13	"github.com/influxdata/influxdb/pkg/rhh"
14	"github.com/influxdata/influxdb/tsdb"
15)
16
// MeasurementBlockVersion identifies the encoding written by
// MeasurementBlockWriter and accepted by ReadMeasurementBlockTrailer.
const MeasurementBlockVersion = 1

// Bit flags stored in the leading flag byte of each measurement entry.
const (
	// MeasurementTombstoneFlag marks a measurement as deleted.
	MeasurementTombstoneFlag = 1 << 0

	// MeasurementSeriesIDSetFlag indicates that the entry's series payload is a
	// serialized tsdb.SeriesIDSet rather than uvarint-encoded deltas.
	MeasurementSeriesIDSetFlag = 1 << 1
)
25
// Measurement field size constants.
const (
	// MeasurementFillSize is the single padding byte written at the start of the
	// data section so that no entry is stored at offset zero; a zero offset
	// marks an empty slot in the hash index.
	MeasurementFillSize = 1

	// MeasurementTrailerSize is the fixed trailer length: four (offset, size)
	// pairs of 8-byte integers for the data section, hash index, measurement
	// sketch and tombstone sketch, followed by a 2-byte version.
	MeasurementTrailerSize = 4*(8+8) + 2

	// Widths of the hash index header (slot count) and of each slot.
	MeasurementNSize      = 8
	MeasurementOffsetSize = 8

	// SeriesIDSize is the width of a fixed-size series ID.
	SeriesIDSize = 8
)
45
// Measurement errors.
var (
	// ErrUnsupportedMeasurementBlockVersion reports a measurement block whose
	// version is not MeasurementBlockVersion.
	ErrUnsupportedMeasurementBlockVersion = errors.New("unsupported measurement block version")

	// ErrMeasurementBlockSizeMismatch reports a measurement block whose length
	// is inconsistent with the sections it describes.
	ErrMeasurementBlockSizeMismatch = errors.New("measurement block size mismatch")
)
51
// MeasurementBlock is a view over an encoded measurement block: the sorted
// measurement entries, a hash index over them, and two HLL sketches. The byte
// slices reference the buffer passed to UnmarshalBinary and are not copied.
type MeasurementBlock struct {
	data     []byte // measurement entries (data section)
	hashData []byte // hash index: slot count followed by entry offsets

	// Encoded cardinality sketches for live and tombstoned measurements.
	sketchData  []byte
	tSketchData []byte

	version int // encoding version; see Version
}
62
63// bytes estimates the memory footprint of this MeasurementBlock, in bytes.
64func (blk *MeasurementBlock) bytes() int {
65	var b int
66	// Do not count contents of blk.data or blk.hashData because they reference into an external []byte
67	b += int(unsafe.Sizeof(*blk))
68	return b
69}
70
71// Version returns the encoding version parsed from the data.
72// Only valid after UnmarshalBinary() has been successfully invoked.
73func (blk *MeasurementBlock) Version() int { return blk.version }
74
75// Elem returns an element for a measurement.
76func (blk *MeasurementBlock) Elem(name []byte) (e MeasurementBlockElem, ok bool) {
77	n := int64(binary.BigEndian.Uint64(blk.hashData[:MeasurementNSize]))
78	hash := rhh.HashKey(name)
79	pos := hash % n
80
81	// Track current distance
82	var d int64
83	for {
84		// Find offset of measurement.
85		offset := binary.BigEndian.Uint64(blk.hashData[MeasurementNSize+(pos*MeasurementOffsetSize):])
86		if offset == 0 {
87			return MeasurementBlockElem{}, false
88		}
89
90		// Evaluate name if offset is not empty.
91		if offset > 0 {
92			// Parse into element.
93			var e MeasurementBlockElem
94			e.UnmarshalBinary(blk.data[offset:])
95
96			// Return if name match.
97			if bytes.Equal(e.name, name) {
98				return e, true
99			}
100
101			// Check if we've exceeded the probe distance.
102			if d > rhh.Dist(rhh.HashKey(e.name), pos, n) {
103				return MeasurementBlockElem{}, false
104			}
105		}
106
107		// Move position forward.
108		pos = (pos + 1) % n
109		d++
110
111		if d > n {
112			return MeasurementBlockElem{}, false
113		}
114	}
115}
116
117// UnmarshalBinary unpacks data into the block. Block is not copied so data
118// should be retained and unchanged after being passed into this function.
119func (blk *MeasurementBlock) UnmarshalBinary(data []byte) error {
120	// Read trailer.
121	t, err := ReadMeasurementBlockTrailer(data)
122	if err != nil {
123		return err
124	}
125
126	// Save data section.
127	blk.data = data[t.Data.Offset:]
128	blk.data = blk.data[:t.Data.Size]
129
130	// Save hash index block.
131	blk.hashData = data[t.HashIndex.Offset:]
132	blk.hashData = blk.hashData[:t.HashIndex.Size]
133
134	// Initialise sketch data.
135	blk.sketchData = data[t.Sketch.Offset:][:t.Sketch.Size]
136	blk.tSketchData = data[t.TSketch.Offset:][:t.TSketch.Size]
137
138	return nil
139}
140
141// Iterator returns an iterator over all measurements.
142func (blk *MeasurementBlock) Iterator() MeasurementIterator {
143	return &blockMeasurementIterator{data: blk.data[MeasurementFillSize:]}
144}
145
146// SeriesIDIterator returns an iterator for all series ids in a measurement.
147func (blk *MeasurementBlock) SeriesIDIterator(name []byte) tsdb.SeriesIDIterator {
148	// Find measurement element.
149	e, ok := blk.Elem(name)
150	if !ok {
151		return &rawSeriesIDIterator{}
152	}
153	if e.seriesIDSet != nil {
154		return tsdb.NewSeriesIDSetIterator(e.seriesIDSet)
155	}
156	return &rawSeriesIDIterator{n: e.series.n, data: e.series.data}
157}
158
159// Sketches returns existence and tombstone measurement sketches.
160func (blk *MeasurementBlock) Sketches() (sketch, tSketch estimator.Sketch, err error) {
161	sketch = hll.NewDefaultPlus()
162	if err := sketch.UnmarshalBinary(blk.sketchData); err != nil {
163		return nil, nil, err
164	}
165
166	tSketch = hll.NewDefaultPlus()
167	if err := tSketch.UnmarshalBinary(blk.tSketchData); err != nil {
168		return nil, nil, err
169	}
170	return sketch, tSketch, nil
171}
172
173// blockMeasurementIterator iterates over a list measurements in a block.
174type blockMeasurementIterator struct {
175	elem MeasurementBlockElem
176	data []byte
177}
178
179// Next returns the next measurement. Returns nil when iterator is complete.
180func (itr *blockMeasurementIterator) Next() MeasurementElem {
181	// Return nil when we run out of data.
182	if len(itr.data) == 0 {
183		return nil
184	}
185
186	// Unmarshal the element at the current position.
187	itr.elem.UnmarshalBinary(itr.data)
188
189	// Move the data forward past the record.
190	itr.data = itr.data[itr.elem.size:]
191
192	return &itr.elem
193}
194
195// rawSeriesIterator iterates over a list of raw series data.
196type rawSeriesIDIterator struct {
197	prev uint64
198	n    uint64
199	data []byte
200}
201
202func (itr *rawSeriesIDIterator) Close() error { return nil }
203
204// Next returns the next decoded series.
205func (itr *rawSeriesIDIterator) Next() (tsdb.SeriesIDElem, error) {
206	if len(itr.data) == 0 {
207		return tsdb.SeriesIDElem{}, nil
208	}
209
210	delta, n, err := uvarint(itr.data)
211	if err != nil {
212		return tsdb.SeriesIDElem{}, err
213	}
214	itr.data = itr.data[n:]
215
216	seriesID := itr.prev + uint64(delta)
217	itr.prev = seriesID
218	return tsdb.SeriesIDElem{SeriesID: seriesID}, nil
219}
220
221func (itr *rawSeriesIDIterator) SeriesIDSet() *tsdb.SeriesIDSet {
222	ss := tsdb.NewSeriesIDSet()
223	for data, prev := itr.data, uint64(0); len(data) > 0; {
224		delta, n, err := uvarint(data)
225		if err != nil {
226			break
227		}
228		data = data[n:]
229
230		seriesID := prev + uint64(delta)
231		prev = seriesID
232		ss.AddNoLock(seriesID)
233	}
234	return ss
235}
236
// MeasurementBlockTrailer is the fixed-size footer stored at the end of a
// measurement block. Each section is located by its offset from the start of
// the block and its length in bytes.
type MeasurementBlockTrailer struct {
	Version int // encoding version; stored in the final two bytes

	// Data locates the padding byte and the measurement entries.
	Data struct {
		Offset int64
		Size   int64
	}

	// HashIndex locates the hash index over the measurement entries.
	HashIndex struct {
		Offset int64
		Size   int64
	}

	// Sketch locates the encoded cardinality sketch of live measurements.
	Sketch struct {
		Offset int64
		Size   int64
	}

	// TSketch locates the encoded cardinality sketch of tombstoned measurements.
	TSketch struct {
		Offset int64
		Size   int64
	}
}
265
266// ReadMeasurementBlockTrailer returns the block trailer from data.
267func ReadMeasurementBlockTrailer(data []byte) (MeasurementBlockTrailer, error) {
268	var t MeasurementBlockTrailer
269
270	// Read version (which is located in the last two bytes of the trailer).
271	t.Version = int(binary.BigEndian.Uint16(data[len(data)-2:]))
272	if t.Version != MeasurementBlockVersion {
273		return t, ErrUnsupportedIndexFileVersion
274	}
275
276	// Slice trailer data.
277	buf := data[len(data)-MeasurementTrailerSize:]
278
279	// Read data section info.
280	t.Data.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
281	t.Data.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
282
283	// Read measurement block info.
284	t.HashIndex.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
285	t.HashIndex.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
286
287	// Read measurement sketch info.
288	t.Sketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
289	t.Sketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
290
291	// Read tombstone measurement sketch info.
292	t.TSketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
293	t.TSketch.Size = int64(binary.BigEndian.Uint64(buf[0:8]))
294
295	return t, nil
296}
297
298// WriteTo writes the trailer to w.
299func (t *MeasurementBlockTrailer) WriteTo(w io.Writer) (n int64, err error) {
300	// Write data section info.
301	if err := writeUint64To(w, uint64(t.Data.Offset), &n); err != nil {
302		return n, err
303	} else if err := writeUint64To(w, uint64(t.Data.Size), &n); err != nil {
304		return n, err
305	}
306
307	// Write hash index section info.
308	if err := writeUint64To(w, uint64(t.HashIndex.Offset), &n); err != nil {
309		return n, err
310	} else if err := writeUint64To(w, uint64(t.HashIndex.Size), &n); err != nil {
311		return n, err
312	}
313
314	// Write measurement sketch info.
315	if err := writeUint64To(w, uint64(t.Sketch.Offset), &n); err != nil {
316		return n, err
317	} else if err := writeUint64To(w, uint64(t.Sketch.Size), &n); err != nil {
318		return n, err
319	}
320
321	// Write tombstone measurement sketch info.
322	if err := writeUint64To(w, uint64(t.TSketch.Offset), &n); err != nil {
323		return n, err
324	} else if err := writeUint64To(w, uint64(t.TSketch.Size), &n); err != nil {
325		return n, err
326	}
327
328	// Write measurement block version.
329	if err := writeUint16To(w, MeasurementBlockVersion, &n); err != nil {
330		return n, err
331	}
332
333	return n, nil
334}
335
336// MeasurementBlockElem represents an internal measurement element.
337type MeasurementBlockElem struct {
338	flag byte   // flag
339	name []byte // measurement name
340
341	tagBlock struct {
342		offset int64
343		size   int64
344	}
345
346	series struct {
347		n    uint64 // series count
348		data []byte // serialized series data
349	}
350
351	seriesIDSet *tsdb.SeriesIDSet
352
353	// size in bytes, set after unmarshaling.
354	size int
355}
356
357// Name returns the measurement name.
358func (e *MeasurementBlockElem) Name() []byte { return e.name }
359
360// Deleted returns true if the tombstone flag is set.
361func (e *MeasurementBlockElem) Deleted() bool {
362	return (e.flag & MeasurementTombstoneFlag) != 0
363}
364
365// TagBlockOffset returns the offset of the measurement's tag block.
366func (e *MeasurementBlockElem) TagBlockOffset() int64 { return e.tagBlock.offset }
367
368// TagBlockSize returns the size of the measurement's tag block.
369func (e *MeasurementBlockElem) TagBlockSize() int64 { return e.tagBlock.size }
370
371// SeriesData returns the raw series data.
372func (e *MeasurementBlockElem) SeriesData() []byte { return e.series.data }
373
374// SeriesN returns the number of series associated with the measurement.
375func (e *MeasurementBlockElem) SeriesN() uint64 { return e.series.n }
376
377// SeriesID returns series ID at an index.
378func (e *MeasurementBlockElem) SeriesID(i int) uint64 {
379	return binary.BigEndian.Uint64(e.series.data[i*SeriesIDSize:])
380}
381
382func (e *MeasurementBlockElem) HasSeries() bool { return e.series.n > 0 }
383
384// SeriesIDs returns a list of decoded series ids.
385//
386// NOTE: This should be used for testing and diagnostics purposes only.
387// It requires loading the entire list of series in-memory.
388func (e *MeasurementBlockElem) SeriesIDs() []uint64 {
389	a := make([]uint64, 0, e.series.n)
390	e.ForEachSeriesID(func(id uint64) error {
391		a = append(a, id)
392		return nil
393	})
394	return a
395}
396
397func (e *MeasurementBlockElem) ForEachSeriesID(fn func(uint64) error) error {
398	// Read from roaring, if available.
399	if e.seriesIDSet != nil {
400		itr := e.seriesIDSet.Iterator()
401		for itr.HasNext() {
402			if err := fn(uint64(itr.Next())); err != nil {
403				return err
404			}
405		}
406	}
407
408	// Read from uvarint encoded data, if available.
409	var prev uint64
410	for data := e.series.data; len(data) > 0; {
411		delta, n, err := uvarint(data)
412		if err != nil {
413			return err
414		}
415		data = data[n:]
416
417		seriesID := prev + uint64(delta)
418		if err = fn(seriesID); err != nil {
419			return err
420		}
421		prev = seriesID
422	}
423	return nil
424}
425
426// Size returns the size of the element.
427func (e *MeasurementBlockElem) Size() int { return e.size }
428
429// UnmarshalBinary unmarshals data into e.
430func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error {
431	start := len(data)
432
433	// Parse flag data.
434	e.flag, data = data[0], data[1:]
435
436	// Parse tag block offset.
437	e.tagBlock.offset, data = int64(binary.BigEndian.Uint64(data)), data[8:]
438	e.tagBlock.size, data = int64(binary.BigEndian.Uint64(data)), data[8:]
439
440	// Parse name.
441	sz, n, err := uvarint(data)
442	if err != nil {
443		return err
444	}
445	e.name, data = data[n:n+int(sz)], data[n+int(sz):]
446
447	// Parse series count.
448	v, n, err := uvarint(data)
449	if err != nil {
450		return err
451	}
452	e.series.n, data = uint64(v), data[n:]
453
454	// Parse series data size.
455	sz, n, err = uvarint(data)
456	if err != nil {
457		return err
458	}
459	data = data[n:]
460
461	// Parse series data (original uvarint encoded or roaring bitmap).
462	if e.flag&MeasurementSeriesIDSetFlag == 0 {
463		e.series.data, data = data[:sz], data[sz:]
464	} else {
465		// data = memalign(data)
466		e.seriesIDSet = tsdb.NewSeriesIDSet()
467		if err = e.seriesIDSet.UnmarshalBinaryUnsafe(data[:sz]); err != nil {
468			return err
469		}
470		data = data[sz:]
471	}
472
473	// Save length of elem.
474	e.size = start - len(data)
475
476	return nil
477}
478
479// MeasurementBlockWriter writes a measurement block.
480type MeasurementBlockWriter struct {
481	buf bytes.Buffer
482	mms map[string]measurement
483
484	// Measurement sketch and tombstoned measurement sketch.
485	sketch, tSketch estimator.Sketch
486}
487
488// NewMeasurementBlockWriter returns a new MeasurementBlockWriter.
489func NewMeasurementBlockWriter() *MeasurementBlockWriter {
490	return &MeasurementBlockWriter{
491		mms:     make(map[string]measurement),
492		sketch:  hll.NewDefaultPlus(),
493		tSketch: hll.NewDefaultPlus(),
494	}
495}
496
497// Add adds a measurement with series and tag set offset/size.
498func (mw *MeasurementBlockWriter) Add(name []byte, deleted bool, offset, size int64, seriesIDs []uint64) {
499	mm := mw.mms[string(name)]
500	mm.deleted = deleted
501	mm.tagBlock.offset = offset
502	mm.tagBlock.size = size
503
504	if mm.seriesIDSet == nil {
505		mm.seriesIDSet = tsdb.NewSeriesIDSet()
506	}
507	for _, seriesID := range seriesIDs {
508		mm.seriesIDSet.AddNoLock(seriesID)
509	}
510
511	mw.mms[string(name)] = mm
512
513	if deleted {
514		mw.tSketch.Add(name)
515	} else {
516		mw.sketch.Add(name)
517	}
518}
519
520// WriteTo encodes the measurements to w.
521func (mw *MeasurementBlockWriter) WriteTo(w io.Writer) (n int64, err error) {
522	var t MeasurementBlockTrailer
523
524	// The sketches must be set before calling WriteTo.
525	if mw.sketch == nil {
526		return 0, errors.New("measurement sketch not set")
527	} else if mw.tSketch == nil {
528		return 0, errors.New("measurement tombstone sketch not set")
529	}
530
531	// Sort names.
532	names := make([]string, 0, len(mw.mms))
533	for name := range mw.mms {
534		names = append(names, name)
535	}
536	sort.Strings(names)
537
538	// Begin data section.
539	t.Data.Offset = n
540
541	// Write padding byte so no offsets are zero.
542	if err := writeUint8To(w, 0, &n); err != nil {
543		return n, err
544	}
545
546	// Encode key list.
547	for _, name := range names {
548		// Retrieve measurement and save offset.
549		mm := mw.mms[name]
550		mm.offset = n
551		mw.mms[name] = mm
552
553		// Write measurement
554		if err := mw.writeMeasurementTo(w, []byte(name), &mm, &n); err != nil {
555			return n, err
556		}
557	}
558	t.Data.Size = n - t.Data.Offset
559
560	// Build key hash map
561	m := rhh.NewHashMap(rhh.Options{
562		Capacity:   int64(len(names)),
563		LoadFactor: LoadFactor,
564	})
565	for name := range mw.mms {
566		mm := mw.mms[name]
567		m.Put([]byte(name), &mm)
568	}
569
570	t.HashIndex.Offset = n
571
572	// Encode hash map length.
573	if err := writeUint64To(w, uint64(m.Cap()), &n); err != nil {
574		return n, err
575	}
576
577	// Encode hash map offset entries.
578	for i := int64(0); i < m.Cap(); i++ {
579		_, v := m.Elem(i)
580
581		var offset int64
582		if mm, ok := v.(*measurement); ok {
583			offset = mm.offset
584		}
585
586		if err := writeUint64To(w, uint64(offset), &n); err != nil {
587			return n, err
588		}
589	}
590	t.HashIndex.Size = n - t.HashIndex.Offset
591
592	// Write the sketches out.
593	t.Sketch.Offset = n
594	if err := writeSketchTo(w, mw.sketch, &n); err != nil {
595		return n, err
596	}
597	t.Sketch.Size = n - t.Sketch.Offset
598
599	t.TSketch.Offset = n
600	if err := writeSketchTo(w, mw.tSketch, &n); err != nil {
601		return n, err
602	}
603	t.TSketch.Size = n - t.TSketch.Offset
604
605	// Write trailer.
606	nn, err := t.WriteTo(w)
607	n += nn
608	return n, err
609}
610
611// writeMeasurementTo encodes a single measurement entry into w.
612func (mw *MeasurementBlockWriter) writeMeasurementTo(w io.Writer, name []byte, mm *measurement, n *int64) error {
613	// Write flag & tag block offset.
614	if err := writeUint8To(w, mm.flag(), n); err != nil {
615		return err
616	}
617	if err := writeUint64To(w, uint64(mm.tagBlock.offset), n); err != nil {
618		return err
619	} else if err := writeUint64To(w, uint64(mm.tagBlock.size), n); err != nil {
620		return err
621	}
622
623	// Write measurement name.
624	if err := writeUvarintTo(w, uint64(len(name)), n); err != nil {
625		return err
626	}
627	if err := writeTo(w, name, n); err != nil {
628		return err
629	}
630
631	// Write series data to buffer.
632	mw.buf.Reset()
633	if _, err := mm.seriesIDSet.WriteTo(&mw.buf); err != nil {
634		return err
635	}
636
637	// Write series count.
638	if err := writeUvarintTo(w, mm.seriesIDSet.Cardinality(), n); err != nil {
639		return err
640	}
641
642	// Write data size & buffer.
643	if err := writeUvarintTo(w, uint64(mw.buf.Len()), n); err != nil {
644		return err
645	}
646
647	// Word align bitmap data.
648	// if offset := (*n) % 8; offset != 0 {
649	// 	if err := writeTo(w, make([]byte, 8-offset), n); err != nil {
650	// 		return err
651	// 	}
652	// }
653
654	nn, err := mw.buf.WriteTo(w)
655	*n += nn
656	return err
657}
658
659// writeSketchTo writes an estimator.Sketch into w, updating the number of bytes
660// written via n.
661func writeSketchTo(w io.Writer, s estimator.Sketch, n *int64) error {
662	data, err := s.MarshalBinary()
663	if err != nil {
664		return err
665	}
666
667	nn, err := w.Write(data)
668	*n += int64(nn)
669	return err
670}
671
672type measurement struct {
673	deleted  bool
674	tagBlock struct {
675		offset int64
676		size   int64
677	}
678	seriesIDSet *tsdb.SeriesIDSet
679	offset      int64
680}
681
682func (mm measurement) flag() byte {
683	flag := byte(MeasurementSeriesIDSetFlag)
684	if mm.deleted {
685		flag |= MeasurementTombstoneFlag
686	}
687	return flag
688}
689