1package tsi1
2
3import (
4	"bufio"
5	"bytes"
6	"encoding/binary"
7	"errors"
8	"fmt"
9	"hash/crc32"
10	"io"
11	"os"
12	"sort"
13	"sync"
14	"time"
15	"unsafe"
16
17	"github.com/influxdata/influxdb/models"
18	"github.com/influxdata/influxdb/pkg/bloom"
19	"github.com/influxdata/influxdb/pkg/estimator"
20	"github.com/influxdata/influxdb/pkg/estimator/hll"
21	"github.com/influxdata/influxdb/pkg/mmap"
22	"github.com/influxdata/influxdb/tsdb"
23)
24
// Log errors.
var (
	// ErrLogEntryChecksumMismatch is returned when a log entry's computed
	// CRC32 does not match the checksum stored with the entry, e.g. after
	// a torn or corrupted write.
	ErrLogEntryChecksumMismatch = errors.New("log entry checksum mismatch")
)
29
// Log entry flag constants. A zero flag denotes a series insertion; each
// non-zero flag marks the entry as a tombstone for the corresponding element.
const (
	LogEntrySeriesTombstoneFlag      = 0x01
	LogEntryMeasurementTombstoneFlag = 0x02
	LogEntryTagKeyTombstoneFlag      = 0x04
	LogEntryTagValueTombstoneFlag    = 0x08
)
37
// defaultLogFileBufferSize describes the size of the buffer that the LogFile's buffered
// writer uses. If the LogFile does not have an explicit buffer size set then
// this is the size of the buffer; it is equal to the default buffer size used
// by a bufio.Writer.
const defaultLogFileBufferSize = 4096

// indexFileBufferSize is the buffer size used when compacting the LogFile down
// into a .tsi file.
const indexFileBufferSize = 1 << 17 // 128K
47
// LogFile represents an on-disk write-ahead log file. Entries are appended
// through a buffered writer and replayed into an in-memory index on open.
type LogFile struct {
	mu         sync.RWMutex
	wg         sync.WaitGroup // ref count
	id         int            // file sequence identifier
	data       []byte         // mmap of previously written data, used during replay
	file       *os.File       // writer
	w          *bufio.Writer  // buffered writer
	bufferSize int            // The size of the buffer used by the buffered writer
	nosync     bool           // Disables buffer flushing and file syncing. Useful for offline tooling.
	buf        []byte         // marshaling buffer, reused across appendEntry calls
	keyBuf     []byte         // scratch buffer for building series keys

	sfile   *tsdb.SeriesFile // series lookup
	size    int64            // tracks current file size
	modTime time.Time        // tracks last time write occurred

	// In-memory series existence/tombstone sets.
	seriesIDSet, tombstoneSeriesIDSet *tsdb.SeriesIDSet

	// In-memory index.
	mms logMeasurements

	// Filepath to the log file.
	path string
}
74
75// NewLogFile returns a new instance of LogFile.
76func NewLogFile(sfile *tsdb.SeriesFile, path string) *LogFile {
77	return &LogFile{
78		sfile: sfile,
79		path:  path,
80		mms:   make(logMeasurements),
81
82		seriesIDSet:          tsdb.NewSeriesIDSet(),
83		tombstoneSeriesIDSet: tsdb.NewSeriesIDSet(),
84	}
85}
86
// bytes estimates the memory footprint of this LogFile, in bytes.
// The estimate is approximate: mmap'd data and the shared SeriesFile
// are deliberately excluded.
func (f *LogFile) bytes() int {
	var b int
	b += 24 // mu RWMutex is 24 bytes
	b += 16 // wg WaitGroup is 16 bytes
	b += int(unsafe.Sizeof(f.id))
	// Do not include f.data because it is mmap'd
	// TODO(jacobmarble): Uncomment when we are using go >= 1.10.0
	//b += int(unsafe.Sizeof(f.w)) + f.w.Size()
	b += int(unsafe.Sizeof(f.buf)) + len(f.buf)
	b += int(unsafe.Sizeof(f.keyBuf)) + len(f.keyBuf)
	// Do not count SeriesFile because it belongs to the code that constructed this Index.
	b += int(unsafe.Sizeof(f.size))
	b += int(unsafe.Sizeof(f.modTime))
	b += int(unsafe.Sizeof(f.seriesIDSet)) + f.seriesIDSet.Bytes()
	b += int(unsafe.Sizeof(f.tombstoneSeriesIDSet)) + f.tombstoneSeriesIDSet.Bytes()
	b += int(unsafe.Sizeof(f.mms)) + f.mms.bytes()
	b += int(unsafe.Sizeof(f.path)) + len(f.path)
	return b
}
107
108// Open reads the log from a file and validates all the checksums.
109func (f *LogFile) Open() error {
110	if err := f.open(); err != nil {
111		f.Close()
112		return err
113	}
114	return nil
115}
116
// open opens the underlying file for appending, memory-maps any existing
// data and replays every valid log entry into the in-memory index.
// Trailing partial or corrupt entries are truncated by seeking the write
// position back to the end of the last valid entry.
func (f *LogFile) open() error {
	f.id, _ = ParseFilename(f.path)

	// Open file for appending.
	file, err := os.OpenFile(f.Path(), os.O_WRONLY|os.O_CREATE, 0666)
	if err != nil {
		return err
	}
	f.file = file

	if f.bufferSize == 0 {
		f.bufferSize = defaultLogFileBufferSize
	}
	f.w = bufio.NewWriterSize(f.file, f.bufferSize)

	// Finish opening if file is empty.
	fi, err := file.Stat()
	if err != nil {
		return err
	} else if fi.Size() == 0 {
		return nil
	}
	f.size = fi.Size()
	f.modTime = fi.ModTime()

	// Open a read-only memory map of the existing data.
	data, err := mmap.Map(f.Path(), 0)
	if err != nil {
		return err
	}
	f.data = data

	// Read log entries from mmap.
	var n int64
	for buf := f.data; len(buf) > 0; {
		// Read next entry. Truncate partial writes.
		// A short buffer or checksum mismatch marks the end of the valid
		// region (e.g. a crash mid-write); everything after it is dropped.
		var e LogEntry
		if err := e.UnmarshalBinary(buf); err == io.ErrShortBuffer || err == ErrLogEntryChecksumMismatch {
			break
		} else if err != nil {
			return err
		}

		// Execute entry against in-memory index.
		f.execEntry(&e)

		// Move buffer forward.
		n += int64(e.Size)
		buf = buf[e.Size:]
	}

	// Move to the end of the file.
	// f.size is reset to the replayed length so truncated bytes are overwritten.
	f.size = n
	_, err = file.Seek(n, io.SeekStart)
	return err
}
173
174// Close shuts down the file handle and mmap.
175func (f *LogFile) Close() error {
176	// Wait until the file has no more references.
177	f.wg.Wait()
178
179	if f.w != nil {
180		f.w.Flush()
181		f.w = nil
182	}
183
184	if f.file != nil {
185		f.file.Close()
186		f.file = nil
187	}
188
189	if f.data != nil {
190		mmap.Unmap(f.data)
191	}
192
193	f.mms = make(logMeasurements)
194	return nil
195}
196
197// FlushAndSync flushes buffered data to disk and then fsyncs the underlying file.
198// If the LogFile has disabled flushing and syncing then FlushAndSync is a no-op.
199func (f *LogFile) FlushAndSync() error {
200	if f.nosync {
201		return nil
202	}
203
204	if f.w != nil {
205		if err := f.w.Flush(); err != nil {
206			return err
207		}
208	}
209
210	if f.file == nil {
211		return nil
212	}
213	return f.file.Sync()
214}
215
// ID returns the file sequence identifier.
func (f *LogFile) ID() int { return f.id }

// Path returns the file path.
func (f *LogFile) Path() string { return f.path }

// SetPath sets the log file's path.
func (f *LogFile) SetPath(path string) { f.path = path }

// Level returns the log level of the file. Log files always report level 0.
func (f *LogFile) Level() int { return 0 }

// Filter returns the bloom filter for the file. Log files have no bloom
// filter, so this is always nil.
func (f *LogFile) Filter() *bloom.Filter { return nil }

// Retain adds a reference count to the file.
func (f *LogFile) Retain() { f.wg.Add(1) }

// Release removes a reference count from the file.
func (f *LogFile) Release() { f.wg.Done() }
236
237// Stat returns size and last modification time of the file.
238func (f *LogFile) Stat() (int64, time.Time) {
239	f.mu.RLock()
240	size, modTime := f.size, f.modTime
241	f.mu.RUnlock()
242	return size, modTime
243}
244
// SeriesIDSet returns the series existence set. The error is always nil;
// it exists to satisfy the shared file interface.
func (f *LogFile) SeriesIDSet() (*tsdb.SeriesIDSet, error) {
	return f.seriesIDSet, nil
}

// TombstoneSeriesIDSet returns the series tombstone set. The error is
// always nil; it exists to satisfy the shared file interface.
func (f *LogFile) TombstoneSeriesIDSet() (*tsdb.SeriesIDSet, error) {
	return f.tombstoneSeriesIDSet, nil
}
254
255// Size returns the size of the file, in bytes.
256func (f *LogFile) Size() int64 {
257	f.mu.RLock()
258	v := f.size
259	f.mu.RUnlock()
260	return v
261}
262
263// Measurement returns a measurement element.
264func (f *LogFile) Measurement(name []byte) MeasurementElem {
265	f.mu.RLock()
266	defer f.mu.RUnlock()
267
268	mm, ok := f.mms[string(name)]
269	if !ok {
270		return nil
271	}
272
273	return mm
274}
275
// MeasurementHasSeries reports whether the measurement name contains at
// least one series that is also present in ss.
func (f *LogFile) MeasurementHasSeries(ss *tsdb.SeriesIDSet, name []byte) bool {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return false
	}

	// TODO(edd): if mm is using a seriesSet then this could be changed to do a fast intersection.
	for _, id := range mm.seriesIDs() {
		if ss.Contains(id) {
			return true
		}
	}
	return false
}
293
// MeasurementNames returns an ordered list of measurement names.
// It is the lock-acquiring wrapper around measurementNames.
func (f *LogFile) MeasurementNames() []string {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.measurementNames()
}
300
301func (f *LogFile) measurementNames() []string {
302	a := make([]string, 0, len(f.mms))
303	for name := range f.mms {
304		a = append(a, name)
305	}
306	sort.Strings(a)
307	return a
308}
309
// DeleteMeasurement adds a tombstone for a measurement to the log file.
// The entry is appended to disk, applied to the in-memory index, and then
// flushed and fsynced.
func (f *LogFile) DeleteMeasurement(name []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	e := LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name}
	if err := f.appendEntry(&e); err != nil {
		return err
	}
	f.execEntry(&e)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}
324
// TagKeySeriesIDIterator returns a series iterator covering every series
// that has any value for the given tag key on the given measurement.
// Returns nil if the measurement or key does not exist.
func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil
	}

	// Combine iterators across all tag values of this key.
	itrs := make([]tsdb.SeriesIDIterator, 0, len(tk.tagValues))
	for _, tv := range tk.tagValues {
		if tv.cardinality() == 0 {
			continue
		}
		if itr := tsdb.NewSeriesIDSetIterator(tv.seriesIDSet()); itr != nil {
			itrs = append(itrs, itr)
		}
	}

	return tsdb.MergeSeriesIDIterators(itrs...)
}
353
354// TagKeyIterator returns a value iterator for a measurement.
355func (f *LogFile) TagKeyIterator(name []byte) TagKeyIterator {
356	f.mu.RLock()
357	defer f.mu.RUnlock()
358
359	mm, ok := f.mms[string(name)]
360	if !ok {
361		return nil
362	}
363
364	a := make([]logTagKey, 0, len(mm.tagSet))
365	for _, k := range mm.tagSet {
366		a = append(a, k)
367	}
368	return newLogTagKeyIterator(a)
369}
370
371// TagKey returns a tag key element.
372func (f *LogFile) TagKey(name, key []byte) TagKeyElem {
373	f.mu.RLock()
374	defer f.mu.RUnlock()
375
376	mm, ok := f.mms[string(name)]
377	if !ok {
378		return nil
379	}
380
381	tk, ok := mm.tagSet[string(key)]
382	if !ok {
383		return nil
384	}
385
386	return &tk
387}
388
389// TagValue returns a tag value element.
390func (f *LogFile) TagValue(name, key, value []byte) TagValueElem {
391	f.mu.RLock()
392	defer f.mu.RUnlock()
393
394	mm, ok := f.mms[string(name)]
395	if !ok {
396		return nil
397	}
398
399	tk, ok := mm.tagSet[string(key)]
400	if !ok {
401		return nil
402	}
403
404	tv, ok := tk.tagValues[string(value)]
405	if !ok {
406		return nil
407	}
408
409	return &tv
410}
411
412// TagValueIterator returns a value iterator for a tag key.
413func (f *LogFile) TagValueIterator(name, key []byte) TagValueIterator {
414	f.mu.RLock()
415	defer f.mu.RUnlock()
416
417	mm, ok := f.mms[string(name)]
418	if !ok {
419		return nil
420	}
421
422	tk, ok := mm.tagSet[string(key)]
423	if !ok {
424		return nil
425	}
426	return tk.TagValueIterator()
427}
428
// DeleteTagKey adds a tombstone for a tag key to the log file.
// The entry is appended to disk, applied to the in-memory index, and then
// flushed and fsynced.
func (f *LogFile) DeleteTagKey(name, key []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	e := LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Key: key}
	if err := f.appendEntry(&e); err != nil {
		return err
	}
	f.execEntry(&e)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}
443
// TagValueSeriesIDSet returns the set of series ids for an exact tag value.
// A (nil, nil) result means the measurement, key, or value is absent, or
// the value has no series. The error is always nil; it exists to satisfy
// the shared file interface.
func (f *LogFile) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil, nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil, nil
	}

	tv, ok := tk.tagValues[string(value)]
	if !ok {
		return nil, nil
	} else if tv.cardinality() == 0 {
		return nil, nil
	}

	return tv.seriesIDSet(), nil
}
468
469// MeasurementN returns the total number of measurements.
470func (f *LogFile) MeasurementN() (n uint64) {
471	f.mu.RLock()
472	defer f.mu.RUnlock()
473	return uint64(len(f.mms))
474}
475
476// TagKeyN returns the total number of keys.
477func (f *LogFile) TagKeyN() (n uint64) {
478	f.mu.RLock()
479	defer f.mu.RUnlock()
480	for _, mm := range f.mms {
481		n += uint64(len(mm.tagSet))
482	}
483	return n
484}
485
486// TagValueN returns the total number of values.
487func (f *LogFile) TagValueN() (n uint64) {
488	f.mu.RLock()
489	defer f.mu.RUnlock()
490	for _, mm := range f.mms {
491		for _, k := range mm.tagSet {
492			n += uint64(len(k.tagValues))
493		}
494	}
495	return n
496}
497
// DeleteTagValue adds a tombstone for a tag value to the log file.
// The entry is appended to disk, applied to the in-memory index, and then
// flushed and fsynced.
func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	e := LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Key: key, Value: value}
	if err := f.appendEntry(&e); err != nil {
		return err
	}
	f.execEntry(&e)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}
512
// AddSeriesList adds a list of series to the log file in bulk.
// Series that already exist in seriesSet are skipped; their slot in the
// returned id slice is zeroed so the caller can tell which were new.
// The returned slice is parallel to names/tagsSlice.
func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
	seriesIDs, err := f.sfile.CreateSeriesListIfNotExists(names, tagsSlice)
	if err != nil {
		return nil, err
	}

	// First pass under a read lock: filter out series that already exist.
	var writeRequired bool
	entries := make([]LogEntry, 0, len(names))
	seriesSet.RLock()
	for i := range names {
		if seriesSet.ContainsNoLock(seriesIDs[i]) {
			// We don't need to allocate anything for this series.
			seriesIDs[i] = 0
			continue
		}
		writeRequired = true
		entries = append(entries, LogEntry{SeriesID: seriesIDs[i], name: names[i], tags: tagsSlice[i], cached: true, batchidx: i})
	}
	seriesSet.RUnlock()

	// Exit if all series already exist.
	if !writeRequired {
		return seriesIDs, nil
	}

	f.mu.Lock()
	defer f.mu.Unlock()

	seriesSet.Lock()
	defer seriesSet.Unlock()

	// Second pass under write locks: the set may have changed between the
	// RUnlock above and the Lock here, so membership is re-checked.
	for i := range entries { // NB - this doesn't evaluate all series ids returned from series file.
		entry := &entries[i]
		if seriesSet.ContainsNoLock(entry.SeriesID) {
			// We don't need to allocate anything for this series.
			seriesIDs[entry.batchidx] = 0
			continue
		}
		if err := f.appendEntry(entry); err != nil {
			return nil, err
		}
		f.execEntry(entry)
		seriesSet.AddNoLock(entry.SeriesID)
	}

	// Flush buffer and sync to disk.
	if err := f.FlushAndSync(); err != nil {
		return nil, err
	}
	return seriesIDs, nil
}
565
// DeleteSeriesID adds a tombstone for a series id.
// The entry is appended to disk, applied to the in-memory index, and then
// flushed and fsynced.
func (f *LogFile) DeleteSeriesID(id uint64) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id}
	if err := f.appendEntry(&e); err != nil {
		return err
	}
	f.execEntry(&e)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}
580
581// SeriesN returns the total number of series in the file.
582func (f *LogFile) SeriesN() (n uint64) {
583	f.mu.RLock()
584	defer f.mu.RUnlock()
585
586	for _, mm := range f.mms {
587		n += uint64(mm.cardinality())
588	}
589	return n
590}
591
// appendEntry adds a log entry to the end of the file.
// On success it updates e.Size, the tracked file size, and the modification
// time. On a partial write it attempts to rewind the file position so the
// torn bytes are overwritten by the next append; if the seek fails the file
// is closed and must be reopened.
func (f *LogFile) appendEntry(e *LogEntry) error {
	// Marshal entry to the local buffer.
	f.buf = appendLogEntry(f.buf[:0], e)

	// Save the size of the record.
	e.Size = len(f.buf)

	// Write record to file.
	n, err := f.w.Write(f.buf)
	if err != nil {
		// Move position backwards over partial entry.
		// Log should be reopened if seeking cannot be completed.
		if n > 0 {
			f.w.Reset(f.file)
			if _, err := f.file.Seek(int64(-n), io.SeekCurrent); err != nil {
				f.Close()
			}
		}
		return err
	}

	// Update in-memory file size & modification time.
	f.size += int64(n)
	f.modTime = time.Now()

	return nil
}
620
// execEntry executes a log entry against the in-memory index.
// This is done after appending and on replay of the log.
// Any flag not matching a measurement/tag-key/tag-value tombstone is
// treated as a series entry (insertion or series tombstone).
func (f *LogFile) execEntry(e *LogEntry) {
	switch e.Flag {
	case LogEntryMeasurementTombstoneFlag:
		f.execDeleteMeasurementEntry(e)
	case LogEntryTagKeyTombstoneFlag:
		f.execDeleteTagKeyEntry(e)
	case LogEntryTagValueTombstoneFlag:
		f.execDeleteTagValueEntry(e)
	default:
		f.execSeriesEntry(e)
	}
}
635
// execDeleteMeasurementEntry marks a measurement as deleted and drops its
// in-memory tag sets and series references.
func (f *LogFile) execDeleteMeasurementEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	mm.deleted = true
	mm.tagSet = make(map[string]logTagKey)
	mm.series = make(map[uint64]struct{})
	mm.seriesSet = nil
}
643
// execDeleteTagKeyEntry marks a tag key on a measurement as deleted.
func (f *LogFile) execDeleteTagKeyEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	ts := mm.createTagSetIfNotExists(e.Key)

	ts.deleted = true

	// ts is a copy of the map value; write it back to persist the flag.
	mm.tagSet[string(e.Key)] = ts
}
652
// execDeleteTagValueEntry marks a single tag value on a measurement's tag
// key as deleted.
func (f *LogFile) execDeleteTagValueEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	ts := mm.createTagSetIfNotExists(e.Key)
	tv := ts.createTagValueIfNotExists(e.Value)

	tv.deleted = true

	// ts/tv are copies of the map values; write them back to persist the flag.
	ts.tagValues[string(e.Value)] = tv
	mm.tagSet[string(e.Key)] = ts
}
663
// execSeriesEntry applies a series insertion or series tombstone to the
// in-memory index: the measurement, every tag key/value pair of the series,
// and the file-level existence/tombstone sets.
func (f *LogFile) execSeriesEntry(e *LogEntry) {
	var seriesKey []byte
	if e.cached {
		// The entry was built by AddSeriesList and carries a parsed
		// name/tags; rebuild the key locally to avoid a series-file lookup.
		sz := tsdb.SeriesKeySize(e.name, e.tags)
		if len(f.keyBuf) < sz {
			f.keyBuf = make([]byte, 0, sz)
		}
		seriesKey = tsdb.AppendSeriesKey(f.keyBuf[:0], e.name, e.tags)
	} else {
		seriesKey = f.sfile.SeriesKey(e.SeriesID)
	}

	// Series keys can be removed if the series has been deleted from
	// the entire database and the server is restarted. This would cause
	// the log to replay its insert but the key cannot be found.
	//
	// https://github.com/influxdata/influxdb/issues/9444
	if seriesKey == nil {
		return
	}

	// Check if deleted.
	deleted := e.Flag == LogEntrySeriesTombstoneFlag

	// Read key size.
	_, remainder := tsdb.ReadSeriesKeyLen(seriesKey)

	// Read measurement name.
	name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder)
	mm := f.createMeasurementIfNotExists(name)
	// An insert resurrects a previously tombstoned measurement.
	mm.deleted = false
	if !deleted {
		mm.addSeriesID(e.SeriesID)
	} else {
		mm.removeSeriesID(e.SeriesID)
	}

	// Read tag count.
	tagN, remainder := tsdb.ReadSeriesKeyTagN(remainder)

	// Save tags.
	var k, v []byte
	for i := 0; i < tagN; i++ {
		k, v, remainder = tsdb.ReadSeriesKeyTag(remainder)
		ts := mm.createTagSetIfNotExists(k)
		tv := ts.createTagValueIfNotExists(v)

		// Add/remove a reference to the series on the tag value.
		if !deleted {
			tv.addSeriesID(e.SeriesID)
		} else {
			tv.removeSeriesID(e.SeriesID)
		}

		// ts/tv are copies of the map values; write them back.
		ts.tagValues[string(v)] = tv
		mm.tagSet[string(k)] = ts
	}

	// Add/remove from appropriate series id sets.
	if !deleted {
		f.seriesIDSet.Add(e.SeriesID)
		f.tombstoneSeriesIDSet.Remove(e.SeriesID)
	} else {
		f.seriesIDSet.Remove(e.SeriesID)
		f.tombstoneSeriesIDSet.Add(e.SeriesID)
	}
}
731
// SeriesIDIterator returns an iterator over all series in the log file.
// Measurements already backed by a roaring seriesSet are merged in bulk;
// map-backed measurements are copied id-by-id.
func (f *LogFile) SeriesIDIterator() tsdb.SeriesIDIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	ss := tsdb.NewSeriesIDSet()
	allSeriesSets := make([]*tsdb.SeriesIDSet, 0, len(f.mms))

	for _, mm := range f.mms {
		if mm.seriesSet != nil {
			allSeriesSets = append(allSeriesSets, mm.seriesSet)
			continue
		}

		// measurement is not using seriesSet to store series IDs.
		mm.forEach(func(seriesID uint64) {
			ss.AddNoLock(seriesID)
		})
	}

	// Fast merge all seriesSets.
	if len(allSeriesSets) > 0 {
		ss.Merge(allSeriesSets...)
	}

	return tsdb.NewSeriesIDSetIterator(ss)
}
759
760// createMeasurementIfNotExists returns a measurement by name.
761func (f *LogFile) createMeasurementIfNotExists(name []byte) *logMeasurement {
762	mm := f.mms[string(name)]
763	if mm == nil {
764		mm = &logMeasurement{
765			name:   name,
766			tagSet: make(map[string]logTagKey),
767			series: make(map[uint64]struct{}),
768		}
769		f.mms[string(name)] = mm
770	}
771	return mm
772}
773
// MeasurementIterator returns an iterator over all the measurements in the
// file, sorted by name. Measurements are copied by value into the iterator.
func (f *LogFile) MeasurementIterator() MeasurementIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	var itr logMeasurementIterator
	for _, mm := range f.mms {
		itr.mms = append(itr.mms, *mm)
	}
	sort.Sort(logMeasurementSlice(itr.mms))
	return &itr
}
786
787// MeasurementSeriesIDIterator returns an iterator over all series for a measurement.
788func (f *LogFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator {
789	f.mu.RLock()
790	defer f.mu.RUnlock()
791
792	mm := f.mms[string(name)]
793	if mm == nil || mm.cardinality() == 0 {
794		return nil
795	}
796	return tsdb.NewSeriesIDSetIterator(mm.seriesIDSet())
797}
798
// CompactTo compacts the log file into the .tsi index format and writes it
// to w. m and k are the bloom filter sizing parameters passed through to
// the index file. The cancel channel aborts the compaction between phases.
// It returns the number of bytes written and the first error encountered.
func (f *LogFile) CompactTo(w io.Writer, m, k uint64, cancel <-chan struct{}) (n int64, err error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	// Check for cancellation.
	select {
	case <-cancel:
		return n, ErrCompactionInterrupted
	default:
	}

	// Wrap in a buffered writer with a fixed 128K buffer.
	bw := bufio.NewWriterSize(w, indexFileBufferSize) // 128K

	// Setup compaction offset tracking data.
	var t IndexFileTrailer
	info := newLogFileCompactInfo()
	info.cancel = cancel

	// Write magic number.
	if err := writeTo(bw, []byte(FileSignature), &n); err != nil {
		return n, err
	}

	// Retrieve measurement names in order.
	names := f.measurementNames()

	// Flush buffer & mmap series block.
	if err := bw.Flush(); err != nil {
		return n, err
	}

	// Write tagset blocks in measurement order.
	if err := f.writeTagsetsTo(bw, names, info, &n); err != nil {
		return n, err
	}

	// Write measurement block.
	t.MeasurementBlock.Offset = n
	if err := f.writeMeasurementBlockTo(bw, names, info, &n); err != nil {
		return n, err
	}
	t.MeasurementBlock.Size = n - t.MeasurementBlock.Offset

	// Write series set.
	t.SeriesIDSet.Offset = n
	nn, err := f.seriesIDSet.WriteTo(bw)
	if n += nn; err != nil {
		return n, err
	}
	t.SeriesIDSet.Size = n - t.SeriesIDSet.Offset

	// Write tombstone series set.
	t.TombstoneSeriesIDSet.Offset = n
	nn, err = f.tombstoneSeriesIDSet.WriteTo(bw)
	if n += nn; err != nil {
		return n, err
	}
	t.TombstoneSeriesIDSet.Size = n - t.TombstoneSeriesIDSet.Offset

	// Build series sketches.
	sSketch, sTSketch, err := f.seriesSketches()
	if err != nil {
		return n, err
	}

	// Write series sketches.
	t.SeriesSketch.Offset = n
	data, err := sSketch.MarshalBinary()
	if err != nil {
		return n, err
	} else if _, err := bw.Write(data); err != nil {
		return n, err
	}
	t.SeriesSketch.Size = int64(len(data))
	n += t.SeriesSketch.Size

	t.TombstoneSeriesSketch.Offset = n
	if data, err = sTSketch.MarshalBinary(); err != nil {
		return n, err
	} else if _, err := bw.Write(data); err != nil {
		return n, err
	}
	t.TombstoneSeriesSketch.Size = int64(len(data))
	n += t.TombstoneSeriesSketch.Size

	// Write trailer.
	nn, err = t.WriteTo(bw)
	n += nn
	if err != nil {
		return n, err
	}

	// Flush buffer.
	if err := bw.Flush(); err != nil {
		return n, err
	}

	return n, nil
}
900
901func (f *LogFile) writeTagsetsTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error {
902	for _, name := range names {
903		if err := f.writeTagsetTo(w, name, info, n); err != nil {
904			return err
905		}
906	}
907	return nil
908}
909
// writeTagsetTo writes a single tagset to w and saves the tagset offset
// and size into info for the later measurement block. n tracks the running
// byte offset within the output.
func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactInfo, n *int64) error {
	mm := f.mms[name]

	// Check for cancellation.
	select {
	case <-info.cancel:
		return ErrCompactionInterrupted
	default:
	}

	enc := NewTagBlockEncoder(w)
	var valueN int
	for _, k := range mm.keys() {
		tag := mm.tagSet[k]

		// Encode tag. Skip values if tag is deleted.
		if err := enc.EncodeKey(tag.name, tag.deleted); err != nil {
			return err
		} else if tag.deleted {
			continue
		}

		// Sort tag values.
		values := make([]string, 0, len(tag.tagValues))
		for v := range tag.tagValues {
			values = append(values, v)
		}
		sort.Strings(values)

		// Add each value.
		for _, v := range values {
			value := tag.tagValues[v]
			if err := enc.EncodeValue(value.name, value.deleted, value.seriesIDSet()); err != nil {
				return err
			}

			// Check for cancellation periodically.
			if valueN++; valueN%1000 == 0 {
				select {
				case <-info.cancel:
					return ErrCompactionInterrupted
				default:
				}
			}
		}
	}

	// Save tagset offset to measurement.
	offset := *n

	// Flush tag block.
	err := enc.Close()
	*n += enc.N()
	if err != nil {
		return err
	}

	// Compute the encoded tagset size from the advanced offset.
	size := *n - offset

	info.mms[name] = &logFileMeasurementCompactInfo{offset: offset, size: size}

	return nil
}
975
// writeMeasurementBlockTo writes the measurement block to w using the tagset
// offsets/sizes recorded in info by writeTagsetTo. n tracks the running
// byte offset within the output.
func (f *LogFile) writeMeasurementBlockTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error {
	mw := NewMeasurementBlockWriter()

	// Check for cancellation.
	select {
	case <-info.cancel:
		return ErrCompactionInterrupted
	default:
	}

	// Add measurement data.
	for _, name := range names {
		mm := f.mms[name]
		mmInfo := info.mms[name]
		// Every name must have been processed by writeTagsetTo first.
		assert(mmInfo != nil, "measurement info not found")
		mw.Add(mm.name, mm.deleted, mmInfo.offset, mmInfo.size, mm.seriesIDs())
	}

	// Flush data to writer.
	nn, err := mw.WriteTo(w)
	*n += nn
	return err
}
999
// logFileCompactInfo is a context object to track compaction position info.
type logFileCompactInfo struct {
	cancel <-chan struct{}                           // signals compaction interruption
	mms    map[string]*logFileMeasurementCompactInfo // per-measurement tagset offsets
}

// newLogFileCompactInfo returns a new instance of logFileCompactInfo.
func newLogFileCompactInfo() *logFileCompactInfo {
	return &logFileCompactInfo{
		mms: make(map[string]*logFileMeasurementCompactInfo),
	}
}

// logFileMeasurementCompactInfo records where a measurement's tagset block
// landed in the compacted output.
type logFileMeasurementCompactInfo struct {
	offset int64
	size   int64
}
1017
// MeasurementsSketches returns sketches for existing and tombstoned measurement names.
// It is the lock-acquiring wrapper around measurementsSketches.
func (f *LogFile) MeasurementsSketches() (sketch, tSketch estimator.Sketch, err error) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.measurementsSketches()
}
1024
1025func (f *LogFile) measurementsSketches() (sketch, tSketch estimator.Sketch, err error) {
1026	sketch, tSketch = hll.NewDefaultPlus(), hll.NewDefaultPlus()
1027	for _, mm := range f.mms {
1028		if mm.deleted {
1029			tSketch.Add(mm.name)
1030		} else {
1031			sketch.Add(mm.name)
1032		}
1033	}
1034	return sketch, tSketch, nil
1035}
1036
// SeriesSketches returns sketches for existing and tombstoned series.
// It is the lock-acquiring wrapper around seriesSketches.
func (f *LogFile) SeriesSketches() (sketch, tSketch estimator.Sketch, err error) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.seriesSketches()
}

// seriesSketches builds HyperLogLog++ sketches of live and tombstoned series
// keys, resolving each id through the series file. Callers must hold at
// least a read lock on f.mu. The error is always nil; it exists to satisfy
// the shared interface.
func (f *LogFile) seriesSketches() (sketch, tSketch estimator.Sketch, err error) {
	sketch = hll.NewDefaultPlus()
	f.seriesIDSet.ForEach(func(id uint64) {
		name, keys := f.sfile.Series(id)
		sketch.Add(models.MakeKey(name, keys))
	})

	tSketch = hll.NewDefaultPlus()
	f.tombstoneSeriesIDSet.ForEach(func(id uint64) {
		name, keys := f.sfile.Series(id)
		tSketch.Add(models.MakeKey(name, keys))
	})
	return sketch, tSketch, nil
}
1058
// LogEntry represents a single log entry in the write-ahead log.
type LogEntry struct {
	Flag     byte   // flag
	SeriesID uint64 // series id
	Name     []byte // measurement name
	Key      []byte // tag key
	Value    []byte // tag value
	Checksum uint32 // checksum of flag/name/tags.
	Size     int    // total size of record, in bytes.

	cached   bool        // Hint to LogFile that series data is already parsed
	name     []byte      // series name, this is a cached copy of the parsed measurement name
	tags     models.Tags // series tags, this is a cached copy of the parsed tags
	batchidx int         // position of entry in batch.
}
1074
// UnmarshalBinary unmarshals data into e.
// Wire format: 1 flag byte, uvarint series id, then three length-prefixed
// (uvarint) byte strings (name, key, value), followed by a big-endian CRC32
// of everything preceding it.
// Returns io.ErrShortBuffer if data ends mid-entry, or
// ErrLogEntryChecksumMismatch if the CRC does not match.
func (e *LogEntry) UnmarshalBinary(data []byte) error {
	var sz uint64
	var n int
	var seriesID uint64
	var err error

	// orig/start are kept so the checksum region and total size can be
	// computed after the fields are consumed.
	orig := data
	start := len(data)

	// Parse flag data.
	if len(data) < 1 {
		return io.ErrShortBuffer
	}
	e.Flag, data = data[0], data[1:]

	// Parse series id.
	if seriesID, n, err = uvarint(data); err != nil {
		return err
	}
	e.SeriesID, data = seriesID, data[n:]

	// Parse name length.
	if sz, n, err = uvarint(data); err != nil {
		return err
	}

	// Read name data. Note: the slice aliases data rather than copying.
	if len(data) < n+int(sz) {
		return io.ErrShortBuffer
	}
	e.Name, data = data[n:n+int(sz)], data[n+int(sz):]

	// Parse key length.
	if sz, n, err = uvarint(data); err != nil {
		return err
	}

	// Read key data.
	if len(data) < n+int(sz) {
		return io.ErrShortBuffer
	}
	e.Key, data = data[n:n+int(sz)], data[n+int(sz):]

	// Parse value length.
	if sz, n, err = uvarint(data); err != nil {
		return err
	}

	// Read value data.
	if len(data) < n+int(sz) {
		return io.ErrShortBuffer
	}
	e.Value, data = data[n:n+int(sz)], data[n+int(sz):]

	// Compute checksum over everything consumed so far.
	chk := crc32.ChecksumIEEE(orig[:start-len(data)])

	// Parse checksum.
	if len(data) < 4 {
		return io.ErrShortBuffer
	}
	e.Checksum, data = binary.BigEndian.Uint32(data[:4]), data[4:]

	// Verify checksum.
	if chk != e.Checksum {
		return ErrLogEntryChecksumMismatch
	}

	// Save length of elem.
	e.Size = start - len(data)

	return nil
}
1149
// appendLogEntry appends the encoded form of e to dst and returns the new
// buffer. This updates the checksum on the entry.
// The layout mirrors LogEntry.UnmarshalBinary: flag byte, uvarint series id,
// three uvarint-length-prefixed byte strings, then a big-endian CRC32 of
// everything appended before it.
func appendLogEntry(dst []byte, e *LogEntry) []byte {
	var buf [binary.MaxVarintLen64]byte
	start := len(dst)

	// Append flag.
	dst = append(dst, e.Flag)

	// Append series id.
	n := binary.PutUvarint(buf[:], uint64(e.SeriesID))
	dst = append(dst, buf[:n]...)

	// Append name.
	n = binary.PutUvarint(buf[:], uint64(len(e.Name)))
	dst = append(dst, buf[:n]...)
	dst = append(dst, e.Name...)

	// Append key.
	n = binary.PutUvarint(buf[:], uint64(len(e.Key)))
	dst = append(dst, buf[:n]...)
	dst = append(dst, e.Key...)

	// Append value.
	n = binary.PutUvarint(buf[:], uint64(len(e.Value)))
	dst = append(dst, buf[:n]...)
	dst = append(dst, e.Value...)

	// Calculate checksum over the bytes appended by this call only.
	e.Checksum = crc32.ChecksumIEEE(dst[start:])

	// Append checksum.
	binary.BigEndian.PutUint32(buf[:4], e.Checksum)
	dst = append(dst, buf[:4]...)

	return dst
}
1187
// logMeasurements represents a map of measurement names to measurements.
type logMeasurements map[string]*logMeasurement

// bytes estimates the memory footprint of this logMeasurements, in bytes,
// including each key string and the recursive cost of each measurement.
func (mms *logMeasurements) bytes() int {
	var b int
	for k, v := range *mms {
		b += len(k)
		b += v.bytes()
	}
	b += int(unsafe.Sizeof(*mms))
	return b
}
1201
// logMeasurement is the in-memory WAL state for a single measurement:
// its tag keys/values and the set of series IDs that belong to it.
type logMeasurement struct {
	name      []byte               // measurement name
	tagSet    map[string]logTagKey // tag keys, keyed by the key's string form
	deleted   bool                 // true if the measurement has been tombstoned
	series    map[uint64]struct{}  // small series ID set; nil once seriesSet is in use
	seriesSet *tsdb.SeriesIDSet    // roaring set, created when the map grows large (see addSeriesID)
}
1209
1210// bytes estimates the memory footprint of this logMeasurement, in bytes.
1211func (m *logMeasurement) bytes() int {
1212	var b int
1213	b += len(m.name)
1214	for k, v := range m.tagSet {
1215		b += len(k)
1216		b += v.bytes()
1217	}
1218	b += (int(m.cardinality()) * 8)
1219	b += int(unsafe.Sizeof(*m))
1220	return b
1221}
1222
1223func (m *logMeasurement) addSeriesID(x uint64) {
1224	if m.seriesSet != nil {
1225		m.seriesSet.AddNoLock(x)
1226		return
1227	}
1228
1229	m.series[x] = struct{}{}
1230
1231	// If the map is getting too big it can be converted into a roaring seriesSet.
1232	if len(m.series) > 25 {
1233		m.seriesSet = tsdb.NewSeriesIDSet()
1234		for id := range m.series {
1235			m.seriesSet.AddNoLock(id)
1236		}
1237		m.series = nil
1238	}
1239}
1240
1241func (m *logMeasurement) removeSeriesID(x uint64) {
1242	if m.seriesSet != nil {
1243		m.seriesSet.RemoveNoLock(x)
1244		return
1245	}
1246	delete(m.series, x)
1247}
1248
1249func (m *logMeasurement) cardinality() int64 {
1250	if m.seriesSet != nil {
1251		return int64(m.seriesSet.Cardinality())
1252	}
1253	return int64(len(m.series))
1254}
1255
1256// forEach applies fn to every series ID in the logMeasurement.
1257func (m *logMeasurement) forEach(fn func(uint64)) {
1258	if m.seriesSet != nil {
1259		m.seriesSet.ForEachNoLock(fn)
1260		return
1261	}
1262
1263	for seriesID := range m.series {
1264		fn(seriesID)
1265	}
1266}
1267
1268// seriesIDs returns a sorted set of seriesIDs.
1269func (m *logMeasurement) seriesIDs() []uint64 {
1270	a := make([]uint64, 0, m.cardinality())
1271	if m.seriesSet != nil {
1272		m.seriesSet.ForEachNoLock(func(id uint64) { a = append(a, id) })
1273		return a // IDs are already sorted.
1274	}
1275
1276	for seriesID := range m.series {
1277		a = append(a, seriesID)
1278	}
1279	sort.Sort(uint64Slice(a))
1280	return a
1281}
1282
1283// seriesIDSet returns a copy of the logMeasurement's seriesSet, or creates a new
1284// one
1285func (m *logMeasurement) seriesIDSet() *tsdb.SeriesIDSet {
1286	if m.seriesSet != nil {
1287		return m.seriesSet.CloneNoLock()
1288	}
1289
1290	ss := tsdb.NewSeriesIDSet()
1291	for seriesID := range m.series {
1292		ss.AddNoLock(seriesID)
1293	}
1294	return ss
1295}
1296
// Name returns the measurement name; Deleted reports whether the measurement
// has been tombstoned. Together they satisfy MeasurementElem.
func (m *logMeasurement) Name() []byte  { return m.name }
func (m *logMeasurement) Deleted() bool { return m.deleted }
1299
1300func (m *logMeasurement) createTagSetIfNotExists(key []byte) logTagKey {
1301	ts, ok := m.tagSet[string(key)]
1302	if !ok {
1303		ts = logTagKey{name: key, tagValues: make(map[string]logTagValue)}
1304	}
1305	return ts
1306}
1307
1308// keys returns a sorted list of tag keys.
1309func (m *logMeasurement) keys() []string {
1310	a := make([]string, 0, len(m.tagSet))
1311	for k := range m.tagSet {
1312		a = append(a, k)
1313	}
1314	sort.Strings(a)
1315	return a
1316}
1317
1318// logMeasurementSlice is a sortable list of log measurements.
1319type logMeasurementSlice []logMeasurement
1320
1321func (a logMeasurementSlice) Len() int           { return len(a) }
1322func (a logMeasurementSlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
1323func (a logMeasurementSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
1324
1325// logMeasurementIterator represents an iterator over a slice of measurements.
1326type logMeasurementIterator struct {
1327	mms []logMeasurement
1328}
1329
1330// Next returns the next element in the iterator.
1331func (itr *logMeasurementIterator) Next() (e MeasurementElem) {
1332	if len(itr.mms) == 0 {
1333		return nil
1334	}
1335	e, itr.mms = &itr.mms[0], itr.mms[1:]
1336	return e
1337}
1338
// logTagKey is the in-memory WAL state for one tag key of a measurement.
type logTagKey struct {
	name      []byte                 // tag key name
	deleted   bool                   // true if the tag key has been tombstoned
	tagValues map[string]logTagValue // tag values, keyed by the value's string form
}
1344
1345// bytes estimates the memory footprint of this logTagKey, in bytes.
1346func (tk *logTagKey) bytes() int {
1347	var b int
1348	b += len(tk.name)
1349	for k, v := range tk.tagValues {
1350		b += len(k)
1351		b += v.bytes()
1352	}
1353	b += int(unsafe.Sizeof(*tk))
1354	return b
1355}
1356
// Key returns the tag key name; Deleted reports whether the tag key has been
// tombstoned. Together they satisfy TagKeyElem.
func (tk *logTagKey) Key() []byte   { return tk.name }
func (tk *logTagKey) Deleted() bool { return tk.deleted }
1359
1360func (tk *logTagKey) TagValueIterator() TagValueIterator {
1361	a := make([]logTagValue, 0, len(tk.tagValues))
1362	for _, v := range tk.tagValues {
1363		a = append(a, v)
1364	}
1365	return newLogTagValueIterator(a)
1366}
1367
1368func (tk *logTagKey) createTagValueIfNotExists(value []byte) logTagValue {
1369	tv, ok := tk.tagValues[string(value)]
1370	if !ok {
1371		tv = logTagValue{name: value, series: make(map[uint64]struct{})}
1372	}
1373	return tv
1374}
1375
1376// logTagKey is a sortable list of log tag keys.
1377type logTagKeySlice []logTagKey
1378
1379func (a logTagKeySlice) Len() int           { return len(a) }
1380func (a logTagKeySlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
1381func (a logTagKeySlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
1382
// logTagValue is the in-memory WAL state for one tag value: the set of series
// IDs that carry this key/value pair.
type logTagValue struct {
	name      []byte              // tag value
	deleted   bool                // true if the tag value has been tombstoned
	series    map[uint64]struct{} // small series ID set; nil once seriesSet is in use
	seriesSet *tsdb.SeriesIDSet   // roaring set, created when the map grows large (see addSeriesID)
}
1389
1390// bytes estimates the memory footprint of this logTagValue, in bytes.
1391func (tv *logTagValue) bytes() int {
1392	var b int
1393	b += len(tv.name)
1394	b += int(unsafe.Sizeof(*tv))
1395	b += (int(tv.cardinality()) * 8)
1396	return b
1397}
1398
1399func (tv *logTagValue) addSeriesID(x uint64) {
1400	if tv.seriesSet != nil {
1401		tv.seriesSet.AddNoLock(x)
1402		return
1403	}
1404
1405	tv.series[x] = struct{}{}
1406
1407	// If the map is getting too big it can be converted into a roaring seriesSet.
1408	if len(tv.series) > 25 {
1409		tv.seriesSet = tsdb.NewSeriesIDSet()
1410		for id := range tv.series {
1411			tv.seriesSet.AddNoLock(id)
1412		}
1413		tv.series = nil
1414	}
1415}
1416
1417func (tv *logTagValue) removeSeriesID(x uint64) {
1418	if tv.seriesSet != nil {
1419		tv.seriesSet.RemoveNoLock(x)
1420		return
1421	}
1422	delete(tv.series, x)
1423}
1424
1425func (tv *logTagValue) cardinality() int64 {
1426	if tv.seriesSet != nil {
1427		return int64(tv.seriesSet.Cardinality())
1428	}
1429	return int64(len(tv.series))
1430}
1431
1432// seriesIDSet returns a copy of the logMeasurement's seriesSet, or creates a new
1433// one
1434func (tv *logTagValue) seriesIDSet() *tsdb.SeriesIDSet {
1435	if tv.seriesSet != nil {
1436		return tv.seriesSet.CloneNoLock()
1437	}
1438
1439	ss := tsdb.NewSeriesIDSet()
1440	for seriesID := range tv.series {
1441		ss.AddNoLock(seriesID)
1442	}
1443	return ss
1444}
1445
// Value returns the tag value; Deleted reports whether the tag value has been
// tombstoned. Together they satisfy TagValueElem.
func (tv *logTagValue) Value() []byte { return tv.name }
func (tv *logTagValue) Deleted() bool { return tv.deleted }
1448
1449// logTagValue is a sortable list of log tag values.
1450type logTagValueSlice []logTagValue
1451
1452func (a logTagValueSlice) Len() int           { return len(a) }
1453func (a logTagValueSlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
1454func (a logTagValueSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
1455
1456// logTagKeyIterator represents an iterator over a slice of tag keys.
1457type logTagKeyIterator struct {
1458	a []logTagKey
1459}
1460
1461// newLogTagKeyIterator returns a new instance of logTagKeyIterator.
1462func newLogTagKeyIterator(a []logTagKey) *logTagKeyIterator {
1463	sort.Sort(logTagKeySlice(a))
1464	return &logTagKeyIterator{a: a}
1465}
1466
1467// Next returns the next element in the iterator.
1468func (itr *logTagKeyIterator) Next() (e TagKeyElem) {
1469	if len(itr.a) == 0 {
1470		return nil
1471	}
1472	e, itr.a = &itr.a[0], itr.a[1:]
1473	return e
1474}
1475
1476// logTagValueIterator represents an iterator over a slice of tag values.
1477type logTagValueIterator struct {
1478	a []logTagValue
1479}
1480
1481// newLogTagValueIterator returns a new instance of logTagValueIterator.
1482func newLogTagValueIterator(a []logTagValue) *logTagValueIterator {
1483	sort.Sort(logTagValueSlice(a))
1484	return &logTagValueIterator{a: a}
1485}
1486
1487// Next returns the next element in the iterator.
1488func (itr *logTagValueIterator) Next() (e TagValueElem) {
1489	if len(itr.a) == 0 {
1490		return nil
1491	}
1492	e, itr.a = &itr.a[0], itr.a[1:]
1493	return e
1494}
1495
1496// FormatLogFileName generates a log filename for the given index.
1497func FormatLogFileName(id int) string {
1498	return fmt.Sprintf("L0-%08d%s", id, LogFileExt)
1499}
1500