1package tsi1
2
3import (
4	"bytes"
5	"encoding/binary"
6	"errors"
7	"fmt"
8	"io"
9
10	"github.com/influxdata/influxdb/pkg/rhh"
11	"github.com/influxdata/influxdb/tsdb"
12)
13
// TagBlockVersion is the encoding version of the tag block.
// ReadTagBlockTrailer rejects any trailer that carries a different version.
const TagBlockVersion = 1
16
// Tag key flag constants.
const (
	// TagKeyTombstoneFlag is set in a key element's flag byte when the key
	// has been deleted (see TagBlockKeyElem.Deleted).
	TagKeyTombstoneFlag = 0x01
)
21
// Tag value flag constants.
const (
	// TagValueTombstoneFlag is set in a value element's flag byte when the
	// value has been deleted (see TagBlockValueElem.Deleted).
	TagValueTombstoneFlag   = 0x01
	// TagValueSeriesIDSetFlag is set when the element's series data is stored
	// as a serialized series ID set rather than the legacy uvarint encoding.
	TagValueSeriesIDSetFlag = 0x02
)
27
// TagBlock variable size constants.
// All sizes are in bytes and describe the fixed-width fields of the hash
// indexes, which are read and written as big-endian uint64 values.
const (
	// TagBlock key block fields.
	TagKeyNSize      = 8 // width of the key hash index slot count
	TagKeyOffsetSize = 8 // width of each key hash index offset entry

	// TagBlock value block fields.
	TagValueNSize      = 8 // width of a value hash index slot count
	TagValueOffsetSize = 8 // width of each value hash index offset entry
)
38
// TagBlock errors.
var (
	// ErrUnsupportedTagBlockVersion is returned by ReadTagBlockTrailer when
	// the trailer's version differs from TagBlockVersion.
	ErrUnsupportedTagBlockVersion = errors.New("unsupported tag block version")
	// ErrTagBlockSizeMismatch is returned by TagBlock.UnmarshalBinary when the
	// size recorded in the trailer differs from the length of the data.
	ErrTagBlockSizeMismatch       = errors.New("tag block size mismatch")
)
44
// TagBlock represents tag key/value block for a single measurement.
// All slices are views into the buffer handed to UnmarshalBinary; nothing is
// copied.
type TagBlock struct {
	data []byte // entire encoded block

	valueData []byte // value data section
	keyData   []byte // key data section
	hashData  []byte // key hash index section

	version int // tag block version
}
55
56// Version returns the encoding version parsed from the data.
57// Only valid after UnmarshalBinary() has been successfully invoked.
58func (blk *TagBlock) Version() int { return blk.version }
59
60// UnmarshalBinary unpacks data into the tag block. Tag block is not copied so data
61// should be retained and unchanged after being passed into this function.
62func (blk *TagBlock) UnmarshalBinary(data []byte) error {
63	// Read trailer.
64	t, err := ReadTagBlockTrailer(data)
65	if err != nil {
66		return err
67	}
68
69	// Verify data size is correct.
70	if int64(len(data)) != t.Size {
71		return ErrTagBlockSizeMismatch
72	}
73
74	// Save data section.
75	blk.valueData = data[t.ValueData.Offset:]
76	blk.valueData = blk.valueData[:t.ValueData.Size]
77
78	// Save key data section.
79	blk.keyData = data[t.KeyData.Offset:]
80	blk.keyData = blk.keyData[:t.KeyData.Size]
81
82	// Save hash index block.
83	blk.hashData = data[t.HashIndex.Offset:]
84	blk.hashData = blk.hashData[:t.HashIndex.Size]
85
86	// Save entire block.
87	blk.data = data
88
89	return nil
90}
91
92// TagKeyElem returns an element for a tag key.
93// Returns an element with a nil key if not found.
94func (blk *TagBlock) TagKeyElem(key []byte) TagKeyElem {
95	var elem TagBlockKeyElem
96	if !blk.DecodeTagKeyElem(key, &elem) {
97		return nil
98	}
99	return &elem
100}
101
102func (blk *TagBlock) DecodeTagKeyElem(key []byte, elem *TagBlockKeyElem) bool {
103	keyN := int64(binary.BigEndian.Uint64(blk.hashData[:TagKeyNSize]))
104	hash := rhh.HashKey(key)
105	pos := hash % keyN
106
107	// Track current distance
108	var d int64
109	for {
110		// Find offset of tag key.
111		offset := binary.BigEndian.Uint64(blk.hashData[TagKeyNSize+(pos*TagKeyOffsetSize):])
112		if offset == 0 {
113			return false
114		}
115
116		// Parse into element.
117		elem.unmarshal(blk.data[offset:], blk.data)
118
119		// Return if keys match.
120		if bytes.Equal(elem.key, key) {
121			return true
122		}
123
124		// Check if we've exceeded the probe distance.
125		if d > rhh.Dist(rhh.HashKey(elem.key), pos, keyN) {
126			return false
127		}
128
129		// Move position forward.
130		pos = (pos + 1) % keyN
131		d++
132
133		if d > keyN {
134			return false
135		}
136	}
137}
138
139// TagValueElem returns an element for a tag value.
140func (blk *TagBlock) TagValueElem(key, value []byte) TagValueElem {
141	var valueElem TagBlockValueElem
142	if !blk.DecodeTagValueElem(key, value, &valueElem) {
143		return nil
144	}
145	return &valueElem
146}
147
148// DecodeTagValueElem returns an element for a tag value.
149func (blk *TagBlock) DecodeTagValueElem(key, value []byte, valueElem *TagBlockValueElem) bool {
150	// Find key element, exit if not found.
151	var keyElem TagBlockKeyElem
152	if !blk.DecodeTagKeyElem(key, &keyElem) {
153		return false
154	}
155
156	// Slice hash index data.
157	hashData := keyElem.hashIndex.buf
158
159	valueN := int64(binary.BigEndian.Uint64(hashData[:TagValueNSize]))
160	hash := rhh.HashKey(value)
161	pos := hash % valueN
162
163	// Track current distance
164	var d int64
165	for {
166		// Find offset of tag value.
167		offset := binary.BigEndian.Uint64(hashData[TagValueNSize+(pos*TagValueOffsetSize):])
168		if offset == 0 {
169			return false
170		}
171
172		// Parse into element.
173		valueElem.unmarshal(blk.data[offset:])
174
175		// Return if values match.
176		if bytes.Equal(valueElem.value, value) {
177			return true
178		}
179
180		// Check if we've exceeded the probe distance.
181		max := rhh.Dist(rhh.HashKey(valueElem.value), pos, valueN)
182		if d > max {
183			return false
184		}
185
186		// Move position forward.
187		pos = (pos + 1) % valueN
188		d++
189
190		if d > valueN {
191			return false
192		}
193	}
194}
195
196// TagKeyIterator returns an iterator over all the keys in the block.
197func (blk *TagBlock) TagKeyIterator() TagKeyIterator {
198	return &tagBlockKeyIterator{
199		blk:     blk,
200		keyData: blk.keyData,
201	}
202}
203
// tagBlockKeyIterator represents an iterator over all keys in a TagBlock.
type tagBlockKeyIterator struct {
	blk     *TagBlock       // block being iterated; supplies the full data for offset lookups
	keyData []byte          // key data not yet consumed
	e       TagBlockKeyElem // reusable element returned by Next
}
210
211// Next returns the next element in the iterator.
212func (itr *tagBlockKeyIterator) Next() TagKeyElem {
213	// Exit when there is no data left.
214	if len(itr.keyData) == 0 {
215		return nil
216	}
217
218	// Unmarshal next element & move data forward.
219	itr.e.unmarshal(itr.keyData, itr.blk.data)
220	itr.keyData = itr.keyData[itr.e.size:]
221
222	assert(len(itr.e.Key()) > 0, "invalid zero-length tag key")
223	return &itr.e
224}
225
// tagBlockValueIterator represents an iterator over all values for a tag key.
type tagBlockValueIterator struct {
	data []byte            // value data not yet consumed
	e    TagBlockValueElem // reusable element returned by Next
}
231
232// Next returns the next element in the iterator.
233func (itr *tagBlockValueIterator) Next() TagValueElem {
234	// Exit when there is no data left.
235	if len(itr.data) == 0 {
236		return nil
237	}
238
239	// Unmarshal next element & move data forward.
240	itr.e.unmarshal(itr.data)
241	itr.data = itr.data[itr.e.size:]
242
243	assert(len(itr.e.Value()) > 0, "invalid zero-length tag value")
244	return &itr.e
245}
246
// TagBlockKeyElem represents a tag key element in a TagBlock.
// Its byte slices are views into the buffers passed to unmarshal.
type TagBlockKeyElem struct {
	flag byte   // bit flags; see TagKeyTombstoneFlag
	key  []byte // tag key name

	// Value data
	data struct {
		offset uint64 // position of the key's value data within the block
		size   uint64 // length of the value data in bytes
		buf    []byte // the block sliced to that offset and size
	}

	// Value hash index data
	hashIndex struct {
		offset uint64 // position of the key's value hash index within the block
		size   uint64 // length of the hash index in bytes
		buf    []byte // the block sliced to that offset and size
	}

	size int // number of encoded bytes this element occupies
}
268
269// Deleted returns true if the key has been tombstoned.
270func (e *TagBlockKeyElem) Deleted() bool { return (e.flag & TagKeyTombstoneFlag) != 0 }
271
272// Key returns the key name of the element.
273func (e *TagBlockKeyElem) Key() []byte { return e.key }
274
275// TagValueIterator returns an iterator over the key's values.
276func (e *TagBlockKeyElem) TagValueIterator() TagValueIterator {
277	return &tagBlockValueIterator{data: e.data.buf}
278}
279
280// unmarshal unmarshals buf into e.
281// The data argument represents the entire block data.
282func (e *TagBlockKeyElem) unmarshal(buf, data []byte) {
283	start := len(buf)
284
285	// Parse flag data.
286	e.flag, buf = buf[0], buf[1:]
287
288	// Parse data offset/size.
289	e.data.offset, buf = binary.BigEndian.Uint64(buf), buf[8:]
290	e.data.size, buf = binary.BigEndian.Uint64(buf), buf[8:]
291
292	// Slice data.
293	e.data.buf = data[e.data.offset:]
294	e.data.buf = e.data.buf[:e.data.size]
295
296	// Parse hash index offset/size.
297	e.hashIndex.offset, buf = binary.BigEndian.Uint64(buf), buf[8:]
298	e.hashIndex.size, buf = binary.BigEndian.Uint64(buf), buf[8:]
299
300	// Slice hash index data.
301	e.hashIndex.buf = data[e.hashIndex.offset:]
302	e.hashIndex.buf = e.hashIndex.buf[:e.hashIndex.size]
303
304	// Parse key.
305	n, sz := binary.Uvarint(buf)
306	e.key, buf = buf[sz:sz+int(n)], buf[int(n)+sz:]
307
308	// Save length of elem.
309	e.size = start - len(buf)
310}
311
// TagBlockValueElem represents a tag value element.
// Its byte slices are views into the buffer passed to unmarshal.
type TagBlockValueElem struct {
	flag  byte   // bit flags; see TagValueTombstoneFlag and TagValueSeriesIDSetFlag
	value []byte // tag value

	// Legacy uvarint-encoded series data.
	// Mutually exclusive with seriesIDSetData field.
	series struct {
		n    uint64 // Series count
		data []byte // Raw series data
	}

	// Roaring bitmap encoded series data.
	// Mutually exclusive with series.data field.
	seriesIDSetData []byte

	size int // number of encoded bytes this element occupies
}
330
331// Deleted returns true if the element has been tombstoned.
332func (e *TagBlockValueElem) Deleted() bool { return (e.flag & TagValueTombstoneFlag) != 0 }
333
334// Value returns the value for the element.
335func (e *TagBlockValueElem) Value() []byte { return e.value }
336
337// SeriesN returns the series count.
338func (e *TagBlockValueElem) SeriesN() uint64 { return e.series.n }
339
340// SeriesData returns the raw series data.
341func (e *TagBlockValueElem) SeriesData() []byte { return e.series.data }
342
343// SeriesID returns series ID at an index.
344func (e *TagBlockValueElem) SeriesID(i int) uint64 {
345	return binary.BigEndian.Uint64(e.series.data[i*SeriesIDSize:])
346}
347
348// SeriesIDs returns a list decoded series ids.
349func (e *TagBlockValueElem) SeriesIDs() ([]uint64, error) {
350	if e.seriesIDSetData != nil {
351		ss, err := e.SeriesIDSet()
352		if err != nil {
353			return nil, err
354		}
355		return ss.Slice(), nil
356	}
357
358	a := make([]uint64, 0, e.series.n)
359	var prev uint64
360	for data := e.series.data; len(data) > 0; {
361		delta, n, err := uvarint(data)
362		if err != nil {
363			return nil, err
364		}
365		data = data[n:]
366
367		seriesID := prev + uint64(delta)
368		a = append(a, seriesID)
369		prev = seriesID
370	}
371	return a, nil
372}
373
374// SeriesIDSet returns a set of series ids.
375func (e *TagBlockValueElem) SeriesIDSet() (*tsdb.SeriesIDSet, error) {
376	ss := tsdb.NewSeriesIDSet()
377
378	// Read bitmap data directly from mmap, if available.
379	if e.seriesIDSetData != nil {
380		if err := ss.UnmarshalBinaryUnsafe(e.seriesIDSetData); err != nil {
381			return nil, err
382		}
383		return ss, nil
384	}
385
386	// Otherwise decode series ids from uvarint encoding.
387	var prev uint64
388	for data := e.series.data; len(data) > 0; {
389		delta, n, err := uvarint(data)
390		if err != nil {
391			return nil, err
392		}
393		data = data[n:]
394
395		seriesID := prev + uint64(delta)
396		ss.AddNoLock(seriesID)
397		prev = seriesID
398	}
399	return ss, nil
400}
401
402// Size returns the size of the element.
403func (e *TagBlockValueElem) Size() int { return e.size }
404
405// unmarshal unmarshals buf into e.
406func (e *TagBlockValueElem) unmarshal(buf []byte) {
407	start := len(buf)
408
409	// Parse flag data.
410	e.flag, buf = buf[0], buf[1:]
411
412	// Parse value.
413	sz, n := binary.Uvarint(buf)
414	e.value, buf = buf[n:n+int(sz)], buf[n+int(sz):]
415
416	// Parse series count.
417	v, n := binary.Uvarint(buf)
418	e.series.n = uint64(v)
419	buf = buf[n:]
420
421	// Parse data block size.
422	sz, n = binary.Uvarint(buf)
423	buf = buf[n:]
424
425	// Parse series data (original uvarint encoded or roaring bitmap).
426	if e.flag&TagValueSeriesIDSetFlag == 0 {
427		e.series.data, buf = buf[:sz], buf[sz:]
428	} else {
429		// buf = memalign(buf)
430		e.seriesIDSetData, buf = buf, buf[sz:]
431	}
432
433	// Save length of elem.
434	e.size = start - len(buf)
435}
436
// TagBlockTrailerSize is the total size of the on-disk trailer, in bytes.
// The terms are listed in the order the fields appear in the trailer.
const TagBlockTrailerSize = 0 +
	8 + 8 + // value data offset/size
	8 + 8 + // key data offset/size
	8 + 8 + // hash index offset/size
	8 + // size
	2 // version
444
// TagBlockTrailer represents meta data at the end of a TagBlock.
// Offsets are byte positions measured from the start of the block.
type TagBlockTrailer struct {
	Version int   // Encoding version
	Size    int64 // Total size w/ trailer

	// Offset & size of value data section.
	ValueData struct {
		Offset int64
		Size   int64
	}

	// Offset & size of key data section.
	KeyData struct {
		Offset int64
		Size   int64
	}

	// Offset & size of hash map section.
	HashIndex struct {
		Offset int64
		Size   int64
	}
}
468
469// WriteTo writes the trailer to w.
470func (t *TagBlockTrailer) WriteTo(w io.Writer) (n int64, err error) {
471	// Write data info.
472	if err := writeUint64To(w, uint64(t.ValueData.Offset), &n); err != nil {
473		return n, err
474	} else if err := writeUint64To(w, uint64(t.ValueData.Size), &n); err != nil {
475		return n, err
476	}
477
478	// Write key data info.
479	if err := writeUint64To(w, uint64(t.KeyData.Offset), &n); err != nil {
480		return n, err
481	} else if err := writeUint64To(w, uint64(t.KeyData.Size), &n); err != nil {
482		return n, err
483	}
484
485	// Write hash index info.
486	if err := writeUint64To(w, uint64(t.HashIndex.Offset), &n); err != nil {
487		return n, err
488	} else if err := writeUint64To(w, uint64(t.HashIndex.Size), &n); err != nil {
489		return n, err
490	}
491
492	// Write total size & encoding version.
493	if err := writeUint64To(w, uint64(t.Size), &n); err != nil {
494		return n, err
495	} else if err := writeUint16To(w, IndexFileVersion, &n); err != nil {
496		return n, err
497	}
498
499	return n, nil
500}
501
502// ReadTagBlockTrailer returns the tag block trailer from data.
503func ReadTagBlockTrailer(data []byte) (TagBlockTrailer, error) {
504	var t TagBlockTrailer
505
506	// Read version.
507	t.Version = int(binary.BigEndian.Uint16(data[len(data)-2:]))
508	if t.Version != TagBlockVersion {
509		return t, ErrUnsupportedTagBlockVersion
510	}
511
512	// Slice trailer data.
513	buf := data[len(data)-TagBlockTrailerSize:]
514
515	// Read data section info.
516	t.ValueData.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
517	t.ValueData.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
518
519	// Read key section info.
520	t.KeyData.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
521	t.KeyData.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
522
523	// Read hash section info.
524	t.HashIndex.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
525	t.HashIndex.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:]
526
527	// Read total size.
528	t.Size = int64(binary.BigEndian.Uint64(buf[0:8]))
529
530	return t, nil
531}
532
// TagBlockEncoder encodes tag keys and values into a TagBlock section.
// Keys are added with EncodeKey, each followed by its values via EncodeValue;
// Close writes the key block, key hash index and trailer.
type TagBlockEncoder struct {
	w   io.Writer    // destination for the encoded block
	buf bytes.Buffer // scratch buffer for serialized series data

	// Track value offsets.
	offsets *rhh.HashMap

	// Track bytes written, sections.
	n       int64
	trailer TagBlockTrailer

	// Track tag keys.
	keys      []tagKeyEncodeEntry
	prevValue []byte // last value encoded for the current key; used for ordering checks
}
549
550// NewTagBlockEncoder returns a new TagBlockEncoder.
551func NewTagBlockEncoder(w io.Writer) *TagBlockEncoder {
552	return &TagBlockEncoder{
553		w:       w,
554		offsets: rhh.NewHashMap(rhh.Options{LoadFactor: LoadFactor}),
555		trailer: TagBlockTrailer{
556			Version: TagBlockVersion,
557		},
558	}
559}
560
561// N returns the number of bytes written.
562func (enc *TagBlockEncoder) N() int64 { return enc.n }
563
564// EncodeKey writes a tag key to the underlying writer.
565func (enc *TagBlockEncoder) EncodeKey(key []byte, deleted bool) error {
566	// An initial empty byte must be written.
567	if err := enc.ensureHeaderWritten(); err != nil {
568		return err
569	}
570
571	// Verify key is lexicographically after previous key.
572	if len(enc.keys) > 0 {
573		prev := enc.keys[len(enc.keys)-1].key
574		if cmp := bytes.Compare(prev, key); cmp == 1 {
575			return fmt.Errorf("tag key out of order: prev=%s, new=%s", prev, key)
576		} else if cmp == 0 {
577			return fmt.Errorf("tag key already encoded: %s", key)
578		}
579	}
580
581	// Flush values section for key.
582	if err := enc.flushValueHashIndex(); err != nil {
583		return err
584	}
585
586	// Append key on to the end of the key list.
587	entry := tagKeyEncodeEntry{
588		key:     key,
589		deleted: deleted,
590	}
591	entry.data.offset = enc.n
592
593	enc.keys = append(enc.keys, entry)
594
595	// Clear previous value.
596	enc.prevValue = nil
597
598	return nil
599}
600
601// EncodeValue writes a tag value to the underlying writer.
602// The tag key must be lexicographical sorted after the previous encoded tag key.
603func (enc *TagBlockEncoder) EncodeValue(value []byte, deleted bool, ss *tsdb.SeriesIDSet) error {
604	if len(enc.keys) == 0 {
605		return fmt.Errorf("tag key must be encoded before encoding values")
606	} else if len(value) == 0 {
607		return fmt.Errorf("zero length tag value not allowed")
608	}
609
610	// Validate that keys are in-order.
611	if cmp := bytes.Compare(enc.prevValue, value); cmp == 1 {
612		return fmt.Errorf("tag value out of order: prev=%s, new=%s", enc.prevValue, value)
613	} else if cmp == 0 {
614		return fmt.Errorf("tag value already encoded: %s", value)
615	}
616
617	// Save offset to hash map.
618	enc.offsets.Put(value, enc.n)
619
620	// Write flag.
621	if err := writeUint8To(enc.w, encodeTagValueFlag(deleted), &enc.n); err != nil {
622		return err
623	}
624
625	// Write value.
626	if err := writeUvarintTo(enc.w, uint64(len(value)), &enc.n); err != nil {
627		return err
628	} else if err := writeTo(enc.w, value, &enc.n); err != nil {
629		return err
630	}
631
632	// Build series data in buffer.
633	enc.buf.Reset()
634	if _, err := ss.WriteTo(&enc.buf); err != nil {
635		return err
636	}
637
638	// Write series count.
639	if err := writeUvarintTo(enc.w, uint64(ss.Cardinality()), &enc.n); err != nil {
640		return err
641	}
642
643	// Write data size & buffer.
644	if err := writeUvarintTo(enc.w, uint64(enc.buf.Len()), &enc.n); err != nil {
645		return err
646	}
647
648	// Word align bitmap data.
649	// if offset := (enc.n) % 8; offset != 0 {
650	// 	if err := writeTo(enc.w, make([]byte, 8-offset), &enc.n); err != nil {
651	// 		return err
652	// 	}
653	// }
654
655	nn, err := enc.buf.WriteTo(enc.w)
656	if enc.n += nn; err != nil {
657		return err
658	}
659
660	// Save previous value.
661	enc.prevValue = value
662
663	return nil
664}
665
666// Close flushes the trailer of the encoder to the writer.
667func (enc *TagBlockEncoder) Close() error {
668	// Flush last value set.
669	if err := enc.ensureHeaderWritten(); err != nil {
670		return err
671	} else if err := enc.flushValueHashIndex(); err != nil {
672		return err
673	}
674
675	// Save ending position of entire data block.
676	enc.trailer.ValueData.Size = enc.n - enc.trailer.ValueData.Offset
677
678	// Write key block to point to value blocks.
679	if err := enc.encodeTagKeyBlock(); err != nil {
680		return err
681	}
682
683	// Compute total size w/ trailer.
684	enc.trailer.Size = enc.n + TagBlockTrailerSize
685
686	// Write trailer.
687	nn, err := enc.trailer.WriteTo(enc.w)
688	enc.n += nn
689	return err
690}
691
692// ensureHeaderWritten writes a single byte to offset the rest of the block.
693func (enc *TagBlockEncoder) ensureHeaderWritten() error {
694	if enc.n > 0 {
695		return nil
696	} else if _, err := enc.w.Write([]byte{0}); err != nil {
697		return err
698	}
699
700	enc.n++
701	enc.trailer.ValueData.Offset = enc.n
702
703	return nil
704}
705
706// flushValueHashIndex builds writes the hash map at the end of a value set.
707func (enc *TagBlockEncoder) flushValueHashIndex() error {
708	// Ignore if no keys have been written.
709	if len(enc.keys) == 0 {
710		return nil
711	}
712	key := &enc.keys[len(enc.keys)-1]
713
714	// Save size of data section.
715	key.data.size = enc.n - key.data.offset
716
717	// Encode hash map length.
718	key.hashIndex.offset = enc.n
719	if err := writeUint64To(enc.w, uint64(enc.offsets.Cap()), &enc.n); err != nil {
720		return err
721	}
722
723	// Encode hash map offset entries.
724	for i := int64(0); i < enc.offsets.Cap(); i++ {
725		_, v := enc.offsets.Elem(i)
726		offset, _ := v.(int64)
727		if err := writeUint64To(enc.w, uint64(offset), &enc.n); err != nil {
728			return err
729		}
730	}
731	key.hashIndex.size = enc.n - key.hashIndex.offset
732
733	// Clear offsets.
734	enc.offsets = rhh.NewHashMap(rhh.Options{LoadFactor: LoadFactor})
735
736	return nil
737}
738
739// encodeTagKeyBlock encodes the keys section to the writer.
740func (enc *TagBlockEncoder) encodeTagKeyBlock() error {
741	offsets := rhh.NewHashMap(rhh.Options{Capacity: int64(len(enc.keys)), LoadFactor: LoadFactor})
742
743	// Encode key list in sorted order.
744	enc.trailer.KeyData.Offset = enc.n
745	for i := range enc.keys {
746		entry := &enc.keys[i]
747
748		// Save current offset so we can use it in the hash index.
749		offsets.Put(entry.key, enc.n)
750
751		if err := writeUint8To(enc.w, encodeTagKeyFlag(entry.deleted), &enc.n); err != nil {
752			return err
753		}
754
755		// Write value data offset & size.
756		if err := writeUint64To(enc.w, uint64(entry.data.offset), &enc.n); err != nil {
757			return err
758		} else if err := writeUint64To(enc.w, uint64(entry.data.size), &enc.n); err != nil {
759			return err
760		}
761
762		// Write value hash index offset & size.
763		if err := writeUint64To(enc.w, uint64(entry.hashIndex.offset), &enc.n); err != nil {
764			return err
765		} else if err := writeUint64To(enc.w, uint64(entry.hashIndex.size), &enc.n); err != nil {
766			return err
767		}
768
769		// Write key length and data.
770		if err := writeUvarintTo(enc.w, uint64(len(entry.key)), &enc.n); err != nil {
771			return err
772		} else if err := writeTo(enc.w, entry.key, &enc.n); err != nil {
773			return err
774		}
775	}
776	enc.trailer.KeyData.Size = enc.n - enc.trailer.KeyData.Offset
777
778	// Encode hash map length.
779	enc.trailer.HashIndex.Offset = enc.n
780	if err := writeUint64To(enc.w, uint64(offsets.Cap()), &enc.n); err != nil {
781		return err
782	}
783
784	// Encode hash map offset entries.
785	for i := int64(0); i < offsets.Cap(); i++ {
786		_, v := offsets.Elem(i)
787		offset, _ := v.(int64)
788		if err := writeUint64To(enc.w, uint64(offset), &enc.n); err != nil {
789			return err
790		}
791	}
792	enc.trailer.HashIndex.Size = enc.n - enc.trailer.HashIndex.Offset
793
794	return nil
795}
796
// tagKeyEncodeEntry records what the encoder needs to emit one key element
// when the key block is written.
type tagKeyEncodeEntry struct {
	key     []byte // tag key name
	deleted bool   // whether the key is written with its tombstone flag set

	// Location of the key's value data within the block.
	data struct {
		offset int64
		size   int64
	}
	// Location of the key's value hash index within the block.
	hashIndex struct {
		offset int64
		size   int64
	}
}
810
811func encodeTagKeyFlag(deleted bool) byte {
812	var flag byte
813	if deleted {
814		flag |= TagKeyTombstoneFlag
815	}
816	return flag
817}
818
819func encodeTagValueFlag(deleted bool) byte {
820	flag := byte(TagValueSeriesIDSetFlag)
821	if deleted {
822		flag |= TagValueTombstoneFlag
823	}
824	return flag
825}
826