1package tsi1
2
3import (
4	"bufio"
5	"bytes"
6	"encoding/binary"
7	"errors"
8	"fmt"
9	"hash/crc32"
10	"io"
11	"os"
12	"sort"
13	"sync"
14	"time"
15	"unsafe"
16
17	"github.com/influxdata/influxdb/models"
18	"github.com/influxdata/influxdb/pkg/bloom"
19	"github.com/influxdata/influxdb/pkg/estimator"
20	"github.com/influxdata/influxdb/pkg/estimator/hll"
21	"github.com/influxdata/influxdb/pkg/mmap"
22	"github.com/influxdata/influxdb/tsdb"
23)
24
// Log errors.
var (
	// ErrLogEntryChecksumMismatch is returned when a log entry's CRC32 does not
	// match its payload. During replay it is treated as a truncated/partial write.
	ErrLogEntryChecksumMismatch = errors.New("log entry checksum mismatch")
)

// Log entry flag constants.
const (
	LogEntrySeriesTombstoneFlag      = 0x01 // entry deletes a series
	LogEntryMeasurementTombstoneFlag = 0x02 // entry deletes a measurement
	LogEntryTagKeyTombstoneFlag      = 0x04 // entry deletes a tag key
	LogEntryTagValueTombstoneFlag    = 0x08 // entry deletes a tag value
)

// defaultLogFileBufferSize describes the size of the buffer that the LogFile's buffered
// writer uses. If the LogFile does not have an explicit buffer size set then
// this is the size of the buffer; it is equal to the default buffer size used
// by a bufio.Writer.
const defaultLogFileBufferSize = 4096

// indexFileBufferSize is the buffer size used when compacting the LogFile down
// into a .tsi file.
const indexFileBufferSize = 1 << 17 // 128K
47
// LogFile represents an on-disk write-ahead log file.
// Entries are appended through a buffered writer while existing data is read
// back through a read-only mmap; an in-memory index mirrors the log contents.
type LogFile struct {
	mu         sync.RWMutex
	wg         sync.WaitGroup // ref count
	id         int            // file sequence identifier
	data       []byte         // mmap
	file       *os.File       // writer
	w          *bufio.Writer  // buffered writer
	bufferSize int            // The size of the buffer used by the buffered writer
	nosync     bool           // Disables buffer flushing and file syncing. Useful for offline tooling.
	buf        []byte         // marshaling buffer
	keyBuf     []byte         // scratch buffer reused when building series keys

	sfile   *tsdb.SeriesFile // series lookup
	size    int64            // tracks current file size
	modTime time.Time        // tracks last time write occurred

	// In-memory series existence/tombstone sets.
	seriesIDSet, tombstoneSeriesIDSet *tsdb.SeriesIDSet

	// In-memory index.
	mms logMeasurements

	// Filepath to the log file.
	path string
}
74
75// NewLogFile returns a new instance of LogFile.
76func NewLogFile(sfile *tsdb.SeriesFile, path string) *LogFile {
77	return &LogFile{
78		sfile: sfile,
79		path:  path,
80		mms:   make(logMeasurements),
81
82		seriesIDSet:          tsdb.NewSeriesIDSet(),
83		tombstoneSeriesIDSet: tsdb.NewSeriesIDSet(),
84	}
85}
86
// bytes estimates the memory footprint of this LogFile, in bytes.
// The estimate is approximate: fixed-size fields contribute unsafe.Sizeof and
// variable-size fields add their current lengths.
func (f *LogFile) bytes() int {
	var b int
	b += 24 // mu RWMutex is 24 bytes
	b += 16 // wg WaitGroup is 16 bytes
	b += int(unsafe.Sizeof(f.id))
	// Do not include f.data because it is mmap'd
	// TODO(jacobmarble): Uncomment when we are using go >= 1.10.0
	//b += int(unsafe.Sizeof(f.w)) + f.w.Size()
	b += int(unsafe.Sizeof(f.buf)) + len(f.buf)
	b += int(unsafe.Sizeof(f.keyBuf)) + len(f.keyBuf)
	// Do not count SeriesFile because it belongs to the code that constructed this Index.
	b += int(unsafe.Sizeof(f.size))
	b += int(unsafe.Sizeof(f.modTime))
	b += int(unsafe.Sizeof(f.seriesIDSet)) + f.seriesIDSet.Bytes()
	b += int(unsafe.Sizeof(f.tombstoneSeriesIDSet)) + f.tombstoneSeriesIDSet.Bytes()
	b += int(unsafe.Sizeof(f.mms)) + f.mms.bytes()
	b += int(unsafe.Sizeof(f.path)) + len(f.path)
	return b
}
107
108// Open reads the log from a file and validates all the checksums.
109func (f *LogFile) Open() error {
110	if err := f.open(); err != nil {
111		f.Close()
112		return err
113	}
114	return nil
115}
116
// open opens the underlying file for appending and replays existing entries
// from a read-only mmap to rebuild the in-memory index. Replay stops at the
// first short or checksum-mismatched entry so that a trailing partial write
// is truncated: the write offset is seeked back to the end of the last valid
// entry and subsequent appends overwrite the garbage.
func (f *LogFile) open() error {
	f.id, _ = ParseFilename(f.path)

	// Open file for appending.
	file, err := os.OpenFile(f.Path(), os.O_WRONLY|os.O_CREATE, 0666)
	if err != nil {
		return err
	}
	f.file = file

	// Fall back to the default buffer size when none was configured.
	if f.bufferSize == 0 {
		f.bufferSize = defaultLogFileBufferSize
	}
	f.w = bufio.NewWriterSize(f.file, f.bufferSize)

	// Finish opening if file is empty.
	fi, err := file.Stat()
	if err != nil {
		return err
	} else if fi.Size() == 0 {
		return nil
	}
	f.size = fi.Size()
	f.modTime = fi.ModTime()

	// Open a read-only memory map of the existing data.
	data, err := mmap.Map(f.Path(), 0)
	if err != nil {
		return err
	}
	f.data = data

	// Read log entries from mmap.
	var n int64
	for buf := f.data; len(buf) > 0; {
		// Read next entry. Truncate partial writes.
		var e LogEntry
		if err := e.UnmarshalBinary(buf); err == io.ErrShortBuffer || err == ErrLogEntryChecksumMismatch {
			break
		} else if err != nil {
			return err
		}

		// Execute entry against in-memory index.
		f.execEntry(&e)

		// Move buffer forward.
		n += int64(e.Size)
		buf = buf[e.Size:]
	}

	// Move to the end of the file.
	// f.size counts only valid entries, so any partial trailing data is dropped.
	f.size = n
	_, err = file.Seek(n, io.SeekStart)
	return err
}
173
174// Close shuts down the file handle and mmap.
175func (f *LogFile) Close() error {
176	// Wait until the file has no more references.
177	f.wg.Wait()
178
179	if f.w != nil {
180		f.w.Flush()
181		f.w = nil
182	}
183
184	if f.file != nil {
185		f.file.Close()
186		f.file = nil
187	}
188
189	if f.data != nil {
190		mmap.Unmap(f.data)
191	}
192
193	f.mms = make(logMeasurements)
194	return nil
195}
196
197// FlushAndSync flushes buffered data to disk and then fsyncs the underlying file.
198// If the LogFile has disabled flushing and syncing then FlushAndSync is a no-op.
199func (f *LogFile) FlushAndSync() error {
200	if f.nosync {
201		return nil
202	}
203
204	if f.w != nil {
205		if err := f.w.Flush(); err != nil {
206			return err
207		}
208	}
209
210	if f.file == nil {
211		return nil
212	}
213	return f.file.Sync()
214}
215
// ID returns the file sequence identifier.
func (f *LogFile) ID() int { return f.id }

// Path returns the file path.
func (f *LogFile) Path() string { return f.path }

// SetPath sets the log file's path.
func (f *LogFile) SetPath(path string) { f.path = path }

// Level returns the log level of the file. A log file is always level 0.
func (f *LogFile) Level() int { return 0 }

// Filter returns the bloom filter for the file. Log files carry no filter,
// so this always returns nil.
func (f *LogFile) Filter() *bloom.Filter { return nil }

// Retain adds a reference count to the file.
func (f *LogFile) Retain() { f.wg.Add(1) }

// Release removes a reference count from the file.
func (f *LogFile) Release() { f.wg.Done() }
236
// Stat returns size and last modification time of the file.
// Safe for concurrent use; reads are performed under the read lock.
func (f *LogFile) Stat() (int64, time.Time) {
	f.mu.RLock()
	size, modTime := f.size, f.modTime
	f.mu.RUnlock()
	return size, modTime
}

// SeriesIDSet returns the series existence set.
// The returned set is the live internal set, not a copy.
func (f *LogFile) SeriesIDSet() (*tsdb.SeriesIDSet, error) {
	return f.seriesIDSet, nil
}

// TombstoneSeriesIDSet returns the series tombstone set.
// The returned set is the live internal set, not a copy.
func (f *LogFile) TombstoneSeriesIDSet() (*tsdb.SeriesIDSet, error) {
	return f.tombstoneSeriesIDSet, nil
}

// Size returns the size of the file, in bytes.
func (f *LogFile) Size() int64 {
	f.mu.RLock()
	v := f.size
	f.mu.RUnlock()
	return v
}
262
263// Measurement returns a measurement element.
264func (f *LogFile) Measurement(name []byte) MeasurementElem {
265	f.mu.RLock()
266	defer f.mu.RUnlock()
267
268	mm, ok := f.mms[string(name)]
269	if !ok {
270		return nil
271	}
272
273	return mm
274}
275
// MeasurementHasSeries reports whether the measurement contains at least one
// series id that is also present in ss.
func (f *LogFile) MeasurementHasSeries(ss *tsdb.SeriesIDSet, name []byte) bool {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return false
	}

	// TODO(edd): if mm is using a seriesSet then this could be changed to do a fast intersection.
	for _, id := range mm.seriesIDs() {
		if ss.Contains(id) {
			return true
		}
	}
	return false
}
293
// MeasurementNames returns an ordered list of measurement names.
// The result is sorted lexicographically. Safe for concurrent use.
func (f *LogFile) MeasurementNames() []string {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.measurementNames()
}
300
301func (f *LogFile) measurementNames() []string {
302	a := make([]string, 0, len(f.mms))
303	for name := range f.mms {
304		a = append(a, name)
305	}
306	sort.Strings(a)
307	return a
308}
309
// DeleteMeasurement adds a tombstone for a measurement to the log file.
// The tombstone entry is appended to the WAL, applied to the in-memory index,
// and then flushed and fsynced to disk.
func (f *LogFile) DeleteMeasurement(name []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	e := LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name}
	if err := f.appendEntry(&e); err != nil {
		return err
	}
	f.execEntry(&e)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}
324
// TagKeySeriesIDIterator returns a series iterator for a tag key.
// Returns nil if the measurement or key does not exist.
func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) tsdb.SeriesIDIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil
	}

	// Combine iterators across all tag values for this key.
	itrs := make([]tsdb.SeriesIDIterator, 0, len(tk.tagValues))
	for _, tv := range tk.tagValues {
		if tv.cardinality() == 0 {
			continue
		}
		itrs = append(itrs, tsdb.NewSeriesIDSetIterator(tv.seriesIDSet()))
	}

	return tsdb.MergeSeriesIDIterators(itrs...)
}
351
352// TagKeyIterator returns a value iterator for a measurement.
353func (f *LogFile) TagKeyIterator(name []byte) TagKeyIterator {
354	f.mu.RLock()
355	defer f.mu.RUnlock()
356
357	mm, ok := f.mms[string(name)]
358	if !ok {
359		return nil
360	}
361
362	a := make([]logTagKey, 0, len(mm.tagSet))
363	for _, k := range mm.tagSet {
364		a = append(a, k)
365	}
366	return newLogTagKeyIterator(a)
367}
368
369// TagKey returns a tag key element.
370func (f *LogFile) TagKey(name, key []byte) TagKeyElem {
371	f.mu.RLock()
372	defer f.mu.RUnlock()
373
374	mm, ok := f.mms[string(name)]
375	if !ok {
376		return nil
377	}
378
379	tk, ok := mm.tagSet[string(key)]
380	if !ok {
381		return nil
382	}
383
384	return &tk
385}
386
// TagValue returns a tag value element.
// Returns nil if the measurement, key, or value does not exist.
func (f *LogFile) TagValue(name, key, value []byte) TagValueElem {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil
	}

	tv, ok := tk.tagValues[string(value)]
	if !ok {
		return nil
	}

	// tv is a copy of the map entry so returning its address is safe.
	return &tv
}

// TagValueIterator returns a value iterator for a tag key.
// Returns nil if the measurement or key does not exist.
func (f *LogFile) TagValueIterator(name, key []byte) TagValueIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil
	}
	return tk.TagValueIterator()
}
426
// DeleteTagKey adds a tombstone for a tag key to the log file.
// The tombstone is appended to the WAL, applied to the in-memory index,
// and flushed/fsynced to disk.
func (f *LogFile) DeleteTagKey(name, key []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	e := LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Key: key}
	if err := f.appendEntry(&e); err != nil {
		return err
	}
	f.execEntry(&e)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}

// TagValueSeriesIDIterator returns a series iterator for a tag value.
// Returns nil if the measurement, key, or value does not exist, or the
// value has no series.
func (f *LogFile) TagValueSeriesIDIterator(name, key, value []byte) tsdb.SeriesIDIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm, ok := f.mms[string(name)]
	if !ok {
		return nil
	}

	tk, ok := mm.tagSet[string(key)]
	if !ok {
		return nil
	}

	tv, ok := tk.tagValues[string(value)]
	if !ok {
		return nil
	} else if tv.cardinality() == 0 {
		return nil
	}

	return tsdb.NewSeriesIDSetIterator(tv.seriesIDSet())
}
466
// MeasurementN returns the total number of measurements.
func (f *LogFile) MeasurementN() (n uint64) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return uint64(len(f.mms))
}

// TagKeyN returns the total number of keys across all measurements.
func (f *LogFile) TagKeyN() (n uint64) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	for _, mm := range f.mms {
		n += uint64(len(mm.tagSet))
	}
	return n
}

// TagValueN returns the total number of values across all keys of all
// measurements.
func (f *LogFile) TagValueN() (n uint64) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	for _, mm := range f.mms {
		for _, k := range mm.tagSet {
			n += uint64(len(k.tagValues))
		}
	}
	return n
}
495
// DeleteTagValue adds a tombstone for a tag value to the log file.
// The tombstone is appended to the WAL, applied to the in-memory index,
// and flushed/fsynced to disk.
func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	e := LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Key: key, Value: value}
	if err := f.appendEntry(&e); err != nil {
		return err
	}
	f.execEntry(&e)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}

// AddSeriesList adds a list of series to the log file in bulk.
// Series already present in seriesSet are skipped. Membership is checked
// twice: once under the read lock to build the candidate list without
// blocking writers, and again under the write lock because another writer
// may have added the same series in between.
func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags) error {
	// Ensure ids exist in the series file; indexes into names/tagsSlice and
	// seriesIDs correspond one-to-one.
	seriesIDs, err := f.sfile.CreateSeriesListIfNotExists(names, tagsSlice)
	if err != nil {
		return err
	}

	var writeRequired bool
	entries := make([]LogEntry, 0, len(names))
	seriesSet.RLock()
	for i := range names {
		if seriesSet.ContainsNoLock(seriesIDs[i]) {
			// We don't need to allocate anything for this series.
			continue
		}
		writeRequired = true
		// cached=true lets execSeriesEntry use name/tags directly instead of
		// re-reading the key from the series file.
		entries = append(entries, LogEntry{SeriesID: seriesIDs[i], name: names[i], tags: tagsSlice[i], cached: true})
	}
	seriesSet.RUnlock()

	// Exit if all series already exist.
	if !writeRequired {
		return nil
	}

	f.mu.Lock()
	defer f.mu.Unlock()

	seriesSet.Lock()
	defer seriesSet.Unlock()

	for i := range entries {
		entry := &entries[i]
		if seriesSet.ContainsNoLock(entry.SeriesID) {
			// We don't need to allocate anything for this series.
			continue
		}
		if err := f.appendEntry(entry); err != nil {
			return err
		}
		f.execEntry(entry)
		seriesSet.AddNoLock(entry.SeriesID)
	}

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}
558
// DeleteSeriesID adds a tombstone for a series id.
// The tombstone is appended to the WAL, applied to the in-memory index,
// and flushed/fsynced to disk.
func (f *LogFile) DeleteSeriesID(id uint64) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id}
	if err := f.appendEntry(&e); err != nil {
		return err
	}
	f.execEntry(&e)

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}
573
574// SeriesN returns the total number of series in the file.
575func (f *LogFile) SeriesN() (n uint64) {
576	f.mu.RLock()
577	defer f.mu.RUnlock()
578
579	for _, mm := range f.mms {
580		n += uint64(mm.cardinality())
581	}
582	return n
583}
584
// appendEntry adds a log entry to the end of the file.
// On a partial write it attempts to rewind the file position past the
// partially-written bytes; if the seek fails the file is closed so the log
// must be reopened (which re-truncates on replay).
func (f *LogFile) appendEntry(e *LogEntry) error {
	// Marshal entry to the local buffer.
	f.buf = appendLogEntry(f.buf[:0], e)

	// Save the size of the record.
	e.Size = len(f.buf)

	// Write record to file.
	n, err := f.w.Write(f.buf)
	if err != nil {
		// Move position backwards over partial entry.
		// Log should be reopened if seeking cannot be completed.
		if n > 0 {
			// Reset discards buffered-but-unflushed bytes of the failed entry.
			f.w.Reset(f.file)
			if _, err := f.file.Seek(int64(-n), io.SeekCurrent); err != nil {
				f.Close()
			}
		}
		return err
	}

	// Update in-memory file size & modification time.
	f.size += int64(n)
	f.modTime = time.Now()

	return nil
}
613
// execEntry executes a log entry against the in-memory index.
// This is done after appending and on replay of the log.
// Entries with no tombstone flag (including series tombstones, which are
// handled inside execSeriesEntry) fall through to execSeriesEntry.
func (f *LogFile) execEntry(e *LogEntry) {
	switch e.Flag {
	case LogEntryMeasurementTombstoneFlag:
		f.execDeleteMeasurementEntry(e)
	case LogEntryTagKeyTombstoneFlag:
		f.execDeleteTagKeyEntry(e)
	case LogEntryTagValueTombstoneFlag:
		f.execDeleteTagValueEntry(e)
	default:
		f.execSeriesEntry(e)
	}
}

// execDeleteMeasurementEntry marks a measurement as deleted and clears all of
// its tag and series state.
func (f *LogFile) execDeleteMeasurementEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	mm.deleted = true
	mm.tagSet = make(map[string]logTagKey)
	mm.series = make(map[uint64]struct{})
	mm.seriesSet = nil
}

// execDeleteTagKeyEntry marks a tag key as deleted within its measurement.
func (f *LogFile) execDeleteTagKeyEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	ts := mm.createTagSetIfNotExists(e.Key)

	ts.deleted = true

	// logTagKey is stored by value, so the mutated copy must be written back.
	mm.tagSet[string(e.Key)] = ts
}

// execDeleteTagValueEntry marks a tag value as deleted within its tag key.
func (f *LogFile) execDeleteTagValueEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	ts := mm.createTagSetIfNotExists(e.Key)
	tv := ts.createTagValueIfNotExists(e.Value)

	tv.deleted = true

	// Both levels are value types; write the mutated copies back.
	ts.tagValues[string(e.Value)] = tv
	mm.tagSet[string(e.Key)] = ts
}
656
// execSeriesEntry applies a series add or series tombstone entry to the
// in-memory index: measurement membership, per-tag-value series sets, and
// the file-level existence/tombstone sets.
func (f *LogFile) execSeriesEntry(e *LogEntry) {
	var seriesKey []byte
	if e.cached {
		// Entry was created by AddSeriesList; build the key from the cached
		// name/tags instead of reading it back from the series file.
		sz := tsdb.SeriesKeySize(e.name, e.tags)
		if len(f.keyBuf) < sz {
			f.keyBuf = make([]byte, 0, sz)
		}
		seriesKey = tsdb.AppendSeriesKey(f.keyBuf[:0], e.name, e.tags)
	} else {
		seriesKey = f.sfile.SeriesKey(e.SeriesID)
	}

	// Series keys can be removed if the series has been deleted from
	// the entire database and the server is restarted. This would cause
	// the log to replay its insert but the key cannot be found.
	//
	// https://github.com/influxdata/influxdb/issues/9444
	if seriesKey == nil {
		return
	}

	// Check if deleted.
	deleted := e.Flag == LogEntrySeriesTombstoneFlag

	// Read key size.
	_, remainder := tsdb.ReadSeriesKeyLen(seriesKey)

	// Read measurement name.
	name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder)
	mm := f.createMeasurementIfNotExists(name)
	// An add resurrects a previously tombstoned measurement.
	mm.deleted = false
	if !deleted {
		mm.addSeriesID(e.SeriesID)
	} else {
		mm.removeSeriesID(e.SeriesID)
	}

	// Read tag count.
	tagN, remainder := tsdb.ReadSeriesKeyTagN(remainder)

	// Save tags.
	var k, v []byte
	for i := 0; i < tagN; i++ {
		k, v, remainder = tsdb.ReadSeriesKeyTag(remainder)
		ts := mm.createTagSetIfNotExists(k)
		tv := ts.createTagValueIfNotExists(v)

		// Add/remove a reference to the series on the tag value.
		if !deleted {
			tv.addSeriesID(e.SeriesID)
		} else {
			tv.removeSeriesID(e.SeriesID)
		}

		// Value types: write the mutated copies back into their maps.
		ts.tagValues[string(v)] = tv
		mm.tagSet[string(k)] = ts
	}

	// Add/remove from appropriate series id sets.
	if !deleted {
		f.seriesIDSet.Add(e.SeriesID)
		f.tombstoneSeriesIDSet.Remove(e.SeriesID)
	} else {
		f.seriesIDSet.Remove(e.SeriesID)
		f.tombstoneSeriesIDSet.Add(e.SeriesID)
	}
}
724
// SeriesIDIterator returns an iterator over all series in the log file.
// Measurements already backed by a roaring set are merged in bulk; the rest
// are added id-by-id.
func (f *LogFile) SeriesIDIterator() tsdb.SeriesIDIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	ss := tsdb.NewSeriesIDSet()
	allSeriesSets := make([]*tsdb.SeriesIDSet, 0, len(f.mms))

	for _, mm := range f.mms {
		if mm.seriesSet != nil {
			allSeriesSets = append(allSeriesSets, mm.seriesSet)
			continue
		}

		// measurement is not using seriesSet to store series IDs.
		mm.forEach(func(seriesID uint64) {
			ss.AddNoLock(seriesID)
		})
	}

	// Fast merge all seriesSets.
	if len(allSeriesSets) > 0 {
		ss.Merge(allSeriesSets...)
	}

	return tsdb.NewSeriesIDSetIterator(ss)
}
752
753// createMeasurementIfNotExists returns a measurement by name.
754func (f *LogFile) createMeasurementIfNotExists(name []byte) *logMeasurement {
755	mm := f.mms[string(name)]
756	if mm == nil {
757		mm = &logMeasurement{
758			name:   name,
759			tagSet: make(map[string]logTagKey),
760			series: make(map[uint64]struct{}),
761		}
762		f.mms[string(name)] = mm
763	}
764	return mm
765}
766
// MeasurementIterator returns an iterator over all the measurements in the file.
// Measurements are copied into the iterator and sorted by name.
func (f *LogFile) MeasurementIterator() MeasurementIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	var itr logMeasurementIterator
	for _, mm := range f.mms {
		itr.mms = append(itr.mms, *mm)
	}
	sort.Sort(logMeasurementSlice(itr.mms))
	return &itr
}

// MeasurementSeriesIDIterator returns an iterator over all series for a measurement.
// Returns nil if the measurement does not exist or has no series.
func (f *LogFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator {
	f.mu.RLock()
	defer f.mu.RUnlock()

	mm := f.mms[string(name)]
	if mm == nil || mm.cardinality() == 0 {
		return nil
	}
	return tsdb.NewSeriesIDSetIterator(mm.seriesIDSet())
}
791
// CompactTo compacts the log file and writes it to w.
// Output layout: file signature, tagset blocks (one per measurement, in name
// order), measurement block, series id set, tombstone series id set, series
// sketches, and finally the index file trailer. Returns the number of bytes
// written. cancel aborts the compaction with ErrCompactionInterrupted.
func (f *LogFile) CompactTo(w io.Writer, m, k uint64, cancel <-chan struct{}) (n int64, err error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	// Check for cancellation.
	select {
	case <-cancel:
		return n, ErrCompactionInterrupted
	default:
	}

	// Wrap in buffered writer to reduce syscalls during compaction.
	bw := bufio.NewWriterSize(w, indexFileBufferSize) // 128K

	// Setup compaction offset tracking data.
	var t IndexFileTrailer
	info := newLogFileCompactInfo()
	info.cancel = cancel

	// Write magic number.
	if err := writeTo(bw, []byte(FileSignature), &n); err != nil {
		return n, err
	}

	// Retrieve measurement names in order.
	names := f.measurementNames()

	// Flush buffer before writing the tagset blocks.
	if err := bw.Flush(); err != nil {
		return n, err
	}

	// Write tagset blocks in measurement order.
	if err := f.writeTagsetsTo(bw, names, info, &n); err != nil {
		return n, err
	}

	// Write measurement block.
	t.MeasurementBlock.Offset = n
	if err := f.writeMeasurementBlockTo(bw, names, info, &n); err != nil {
		return n, err
	}
	t.MeasurementBlock.Size = n - t.MeasurementBlock.Offset

	// Write series set.
	t.SeriesIDSet.Offset = n
	nn, err := f.seriesIDSet.WriteTo(bw)
	if n += nn; err != nil {
		return n, err
	}
	t.SeriesIDSet.Size = n - t.SeriesIDSet.Offset

	// Write tombstone series set.
	t.TombstoneSeriesIDSet.Offset = n
	nn, err = f.tombstoneSeriesIDSet.WriteTo(bw)
	if n += nn; err != nil {
		return n, err
	}
	t.TombstoneSeriesIDSet.Size = n - t.TombstoneSeriesIDSet.Offset

	// Build series sketches.
	sSketch, sTSketch, err := f.seriesSketches()
	if err != nil {
		return n, err
	}

	// Write series sketches.
	t.SeriesSketch.Offset = n
	data, err := sSketch.MarshalBinary()
	if err != nil {
		return n, err
	} else if _, err := bw.Write(data); err != nil {
		return n, err
	}
	t.SeriesSketch.Size = int64(len(data))
	n += t.SeriesSketch.Size

	t.TombstoneSeriesSketch.Offset = n
	if data, err = sTSketch.MarshalBinary(); err != nil {
		return n, err
	} else if _, err := bw.Write(data); err != nil {
		return n, err
	}
	t.TombstoneSeriesSketch.Size = int64(len(data))
	n += t.TombstoneSeriesSketch.Size

	// Write trailer.
	nn, err = t.WriteTo(bw)
	n += nn
	if err != nil {
		return n, err
	}

	// Flush buffer.
	if err := bw.Flush(); err != nil {
		return n, err
	}

	return n, nil
}
893
894func (f *LogFile) writeTagsetsTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error {
895	for _, name := range names {
896		if err := f.writeTagsetTo(w, name, info, n); err != nil {
897			return err
898		}
899	}
900	return nil
901}
902
// writeTagsetTo writes a single tagset to w and saves the tagset offset.
// Keys and values are written in sorted order; deleted keys are encoded
// without their values. Cancellation is checked every 1000 values.
func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactInfo, n *int64) error {
	mm := f.mms[name]

	// Check for cancellation.
	select {
	case <-info.cancel:
		return ErrCompactionInterrupted
	default:
	}

	enc := NewTagBlockEncoder(w)
	var valueN int
	for _, k := range mm.keys() {
		tag := mm.tagSet[k]

		// Encode tag. Skip values if tag is deleted.
		if err := enc.EncodeKey(tag.name, tag.deleted); err != nil {
			return err
		} else if tag.deleted {
			continue
		}

		// Sort tag values.
		values := make([]string, 0, len(tag.tagValues))
		for v := range tag.tagValues {
			values = append(values, v)
		}
		sort.Strings(values)

		// Add each value.
		for _, v := range values {
			value := tag.tagValues[v]
			if err := enc.EncodeValue(value.name, value.deleted, value.seriesIDs()); err != nil {
				return err
			}

			// Check for cancellation periodically.
			if valueN++; valueN%1000 == 0 {
				select {
				case <-info.cancel:
					return ErrCompactionInterrupted
				default:
				}
			}
		}
	}

	// Remember where this tagset begins in the output.
	offset := *n

	// Flush tag block.
	err := enc.Close()
	*n += enc.N()
	if err != nil {
		return err
	}

	// Compute the encoded size of the tagset.
	size := *n - offset

	// Save offset/size so the measurement block can reference this tagset.
	info.mms[name] = &logFileMeasurementCompactInfo{offset: offset, size: size}

	return nil
}
968
// writeMeasurementBlockTo writes the measurement block to w, referencing the
// tagset offsets/sizes recorded in info by writeTagsetTo. The running byte
// count is accumulated in n.
func (f *LogFile) writeMeasurementBlockTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error {
	mw := NewMeasurementBlockWriter()

	// Check for cancellation.
	select {
	case <-info.cancel:
		return ErrCompactionInterrupted
	default:
	}

	// Add measurement data.
	for _, name := range names {
		mm := f.mms[name]
		mmInfo := info.mms[name]
		// Every name must have been processed by writeTagsetTo first.
		assert(mmInfo != nil, "measurement info not found")
		mw.Add(mm.name, mm.deleted, mmInfo.offset, mmInfo.size, mm.seriesIDs())
	}

	// Flush data to writer.
	nn, err := mw.WriteTo(w)
	*n += nn
	return err
}
992
// logFileCompactInfo is a context object to track compaction position info.
type logFileCompactInfo struct {
	cancel <-chan struct{} // closed to abort a running compaction
	mms    map[string]*logFileMeasurementCompactInfo
}

// newLogFileCompactInfo returns a new instance of logFileCompactInfo.
func newLogFileCompactInfo() *logFileCompactInfo {
	return &logFileCompactInfo{
		mms: make(map[string]*logFileMeasurementCompactInfo),
	}
}

// logFileMeasurementCompactInfo records where a measurement's tagset block
// was written in the compacted output.
type logFileMeasurementCompactInfo struct {
	offset int64
	size   int64
}

// MeasurementsSketches returns sketches for existing and tombstoned measurement names.
func (f *LogFile) MeasurementsSketches() (sketch, tSketch estimator.Sketch, err error) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.measurementsSketches()
}
1017
1018func (f *LogFile) measurementsSketches() (sketch, tSketch estimator.Sketch, err error) {
1019	sketch, tSketch = hll.NewDefaultPlus(), hll.NewDefaultPlus()
1020	for _, mm := range f.mms {
1021		if mm.deleted {
1022			tSketch.Add(mm.name)
1023		} else {
1024			sketch.Add(mm.name)
1025		}
1026	}
1027	return sketch, tSketch, nil
1028}
1029
// SeriesSketches returns sketches for existing and tombstoned series.
// Safe for concurrent use; acquires the read lock.
func (f *LogFile) SeriesSketches() (sketch, tSketch estimator.Sketch, err error) {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.seriesSketches()
}
1036
1037func (f *LogFile) seriesSketches() (sketch, tSketch estimator.Sketch, err error) {
1038	sketch = hll.NewDefaultPlus()
1039	f.seriesIDSet.ForEach(func(id uint64) {
1040		name, keys := f.sfile.Series(id)
1041		sketch.Add(models.MakeKey(name, keys))
1042	})
1043
1044	tSketch = hll.NewDefaultPlus()
1045	f.tombstoneSeriesIDSet.ForEach(func(id uint64) {
1046		name, keys := f.sfile.Series(id)
1047		sketch.Add(models.MakeKey(name, keys))
1048	})
1049	return sketch, tSketch, nil
1050}
1051
// LogEntry represents a single log entry in the write-ahead log.
type LogEntry struct {
	Flag     byte   // flag
	SeriesID uint64 // series id
	Name     []byte // measurement name
	Key      []byte // tag key
	Value    []byte // tag value
	Checksum uint32 // checksum of flag/name/tags.
	Size     int    // total size of record, in bytes.

	cached bool        // Hint to LogFile that series data is already parsed
	name   []byte      // series name, this is a cached copy of the parsed measurement name
	tags   models.Tags // series tags, this is a cached copy of the parsed tags
}
1066
// UnmarshalBinary unmarshals data into e.
// Wire format: flag byte, uvarint series id, then uvarint-length-prefixed
// name, key, and value, followed by a big-endian CRC32 of everything before
// it. Returns io.ErrShortBuffer when data is truncated and
// ErrLogEntryChecksumMismatch when the CRC does not match.
// Note: Name/Key/Value are sub-slices of data, not copies.
func (e *LogEntry) UnmarshalBinary(data []byte) error {
	var sz uint64
	var n int
	var seriesID uint64
	var err error

	orig := data
	start := len(data)

	// Parse flag data.
	if len(data) < 1 {
		return io.ErrShortBuffer
	}
	e.Flag, data = data[0], data[1:]

	// Parse series id.
	if seriesID, n, err = uvarint(data); err != nil {
		return err
	}
	e.SeriesID, data = seriesID, data[n:]

	// Parse name length.
	if sz, n, err = uvarint(data); err != nil {
		return err
	}

	// Read name data.
	if len(data) < n+int(sz) {
		return io.ErrShortBuffer
	}
	e.Name, data = data[n:n+int(sz)], data[n+int(sz):]

	// Parse key length.
	if sz, n, err = uvarint(data); err != nil {
		return err
	}

	// Read key data.
	if len(data) < n+int(sz) {
		return io.ErrShortBuffer
	}
	e.Key, data = data[n:n+int(sz)], data[n+int(sz):]

	// Parse value length.
	if sz, n, err = uvarint(data); err != nil {
		return err
	}

	// Read value data.
	if len(data) < n+int(sz) {
		return io.ErrShortBuffer
	}
	e.Value, data = data[n:n+int(sz)], data[n+int(sz):]

	// Compute checksum over everything consumed so far.
	chk := crc32.ChecksumIEEE(orig[:start-len(data)])

	// Parse checksum.
	if len(data) < 4 {
		return io.ErrShortBuffer
	}
	e.Checksum, data = binary.BigEndian.Uint32(data[:4]), data[4:]

	// Verify checksum.
	if chk != e.Checksum {
		return ErrLogEntryChecksumMismatch
	}

	// Save length of elem.
	e.Size = start - len(data)

	return nil
}
1141
1142// appendLogEntry appends to dst and returns the new buffer.
1143// This updates the checksum on the entry.
1144func appendLogEntry(dst []byte, e *LogEntry) []byte {
1145	var buf [binary.MaxVarintLen64]byte
1146	start := len(dst)
1147
1148	// Append flag.
1149	dst = append(dst, e.Flag)
1150
1151	// Append series id.
1152	n := binary.PutUvarint(buf[:], uint64(e.SeriesID))
1153	dst = append(dst, buf[:n]...)
1154
1155	// Append name.
1156	n = binary.PutUvarint(buf[:], uint64(len(e.Name)))
1157	dst = append(dst, buf[:n]...)
1158	dst = append(dst, e.Name...)
1159
1160	// Append key.
1161	n = binary.PutUvarint(buf[:], uint64(len(e.Key)))
1162	dst = append(dst, buf[:n]...)
1163	dst = append(dst, e.Key...)
1164
1165	// Append value.
1166	n = binary.PutUvarint(buf[:], uint64(len(e.Value)))
1167	dst = append(dst, buf[:n]...)
1168	dst = append(dst, e.Value...)
1169
1170	// Calculate checksum.
1171	e.Checksum = crc32.ChecksumIEEE(dst[start:])
1172
1173	// Append checksum.
1174	binary.BigEndian.PutUint32(buf[:4], e.Checksum)
1175	dst = append(dst, buf[:4]...)
1176
1177	return dst
1178}
1179
// logMeasurements represents a map of measurement names to measurements.
type logMeasurements map[string]*logMeasurement

// bytes estimates the memory footprint of this logMeasurements, in bytes.
func (mms *logMeasurements) bytes() int {
	var b int
	for k, v := range *mms {
		b += len(k)
		b += v.bytes()
	}
	b += int(unsafe.Sizeof(*mms))
	return b
}

// logMeasurement is the in-memory index state for a single measurement:
// its tag keys/values and the set of series ids that belong to it.
type logMeasurement struct {
	name      []byte
	tagSet    map[string]logTagKey
	deleted   bool
	series    map[uint64]struct{} // series ids while cardinality is small
	seriesSet *tsdb.SeriesIDSet   // replaces series once it grows (see addSeriesID)
}

// bytes estimates the memory footprint of this logMeasurement, in bytes.
func (m *logMeasurement) bytes() int {
	var b int
	b += len(m.name)
	for k, v := range m.tagSet {
		b += len(k)
		b += v.bytes()
	}
	// Approximate series storage at 8 bytes per id.
	b += (int(m.cardinality()) * 8)
	b += int(unsafe.Sizeof(*m))
	return b
}
1214
1215func (m *logMeasurement) addSeriesID(x uint64) {
1216	if m.seriesSet != nil {
1217		m.seriesSet.AddNoLock(x)
1218		return
1219	}
1220
1221	m.series[x] = struct{}{}
1222
1223	// If the map is getting too big it can be converted into a roaring seriesSet.
1224	if len(m.series) > 25 {
1225		m.seriesSet = tsdb.NewSeriesIDSet()
1226		for id := range m.series {
1227			m.seriesSet.AddNoLock(id)
1228		}
1229		m.series = nil
1230	}
1231}
1232
1233func (m *logMeasurement) removeSeriesID(x uint64) {
1234	if m.seriesSet != nil {
1235		m.seriesSet.RemoveNoLock(x)
1236		return
1237	}
1238	delete(m.series, x)
1239}
1240
1241func (m *logMeasurement) cardinality() int64 {
1242	if m.seriesSet != nil {
1243		return int64(m.seriesSet.Cardinality())
1244	}
1245	return int64(len(m.series))
1246}
1247
1248// forEach applies fn to every series ID in the logMeasurement.
1249func (m *logMeasurement) forEach(fn func(uint64)) {
1250	if m.seriesSet != nil {
1251		m.seriesSet.ForEachNoLock(fn)
1252		return
1253	}
1254
1255	for seriesID := range m.series {
1256		fn(seriesID)
1257	}
1258}
1259
1260// seriesIDs returns a sorted set of seriesIDs.
1261func (m *logMeasurement) seriesIDs() []uint64 {
1262	a := make([]uint64, 0, m.cardinality())
1263	if m.seriesSet != nil {
1264		m.seriesSet.ForEachNoLock(func(id uint64) { a = append(a, id) })
1265		return a // IDs are already sorted.
1266	}
1267
1268	for seriesID := range m.series {
1269		a = append(a, seriesID)
1270	}
1271	sort.Sort(uint64Slice(a))
1272	return a
1273}
1274
1275// seriesIDSet returns a copy of the logMeasurement's seriesSet, or creates a new
1276// one
1277func (m *logMeasurement) seriesIDSet() *tsdb.SeriesIDSet {
1278	if m.seriesSet != nil {
1279		return m.seriesSet.CloneNoLock()
1280	}
1281
1282	ss := tsdb.NewSeriesIDSet()
1283	for seriesID := range m.series {
1284		ss.AddNoLock(seriesID)
1285	}
1286	return ss
1287}
1288
1289func (m *logMeasurement) Name() []byte  { return m.name }
1290func (m *logMeasurement) Deleted() bool { return m.deleted }
1291
1292func (m *logMeasurement) createTagSetIfNotExists(key []byte) logTagKey {
1293	ts, ok := m.tagSet[string(key)]
1294	if !ok {
1295		ts = logTagKey{name: key, tagValues: make(map[string]logTagValue)}
1296	}
1297	return ts
1298}
1299
1300// keys returns a sorted list of tag keys.
1301func (m *logMeasurement) keys() []string {
1302	a := make([]string, 0, len(m.tagSet))
1303	for k := range m.tagSet {
1304		a = append(a, k)
1305	}
1306	sort.Strings(a)
1307	return a
1308}
1309
1310// logMeasurementSlice is a sortable list of log measurements.
1311type logMeasurementSlice []logMeasurement
1312
1313func (a logMeasurementSlice) Len() int           { return len(a) }
1314func (a logMeasurementSlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
1315func (a logMeasurementSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
1316
1317// logMeasurementIterator represents an iterator over a slice of measurements.
1318type logMeasurementIterator struct {
1319	mms []logMeasurement
1320}
1321
1322// Next returns the next element in the iterator.
1323func (itr *logMeasurementIterator) Next() (e MeasurementElem) {
1324	if len(itr.mms) == 0 {
1325		return nil
1326	}
1327	e, itr.mms = &itr.mms[0], itr.mms[1:]
1328	return e
1329}
1330
1331type logTagKey struct {
1332	name      []byte
1333	deleted   bool
1334	tagValues map[string]logTagValue
1335}
1336
1337// bytes estimates the memory footprint of this logTagKey, in bytes.
1338func (tk *logTagKey) bytes() int {
1339	var b int
1340	b += len(tk.name)
1341	for k, v := range tk.tagValues {
1342		b += len(k)
1343		b += v.bytes()
1344	}
1345	b += int(unsafe.Sizeof(*tk))
1346	return b
1347}
1348
1349func (tk *logTagKey) Key() []byte   { return tk.name }
1350func (tk *logTagKey) Deleted() bool { return tk.deleted }
1351
1352func (tk *logTagKey) TagValueIterator() TagValueIterator {
1353	a := make([]logTagValue, 0, len(tk.tagValues))
1354	for _, v := range tk.tagValues {
1355		a = append(a, v)
1356	}
1357	return newLogTagValueIterator(a)
1358}
1359
1360func (tk *logTagKey) createTagValueIfNotExists(value []byte) logTagValue {
1361	tv, ok := tk.tagValues[string(value)]
1362	if !ok {
1363		tv = logTagValue{name: value, series: make(map[uint64]struct{})}
1364	}
1365	return tv
1366}
1367
1368// logTagKey is a sortable list of log tag keys.
1369type logTagKeySlice []logTagKey
1370
1371func (a logTagKeySlice) Len() int           { return len(a) }
1372func (a logTagKeySlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
1373func (a logTagKeySlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
1374
1375type logTagValue struct {
1376	name      []byte
1377	deleted   bool
1378	series    map[uint64]struct{}
1379	seriesSet *tsdb.SeriesIDSet
1380}
1381
1382// bytes estimates the memory footprint of this logTagValue, in bytes.
1383func (tv *logTagValue) bytes() int {
1384	var b int
1385	b += len(tv.name)
1386	b += int(unsafe.Sizeof(*tv))
1387	b += (int(tv.cardinality()) * 8)
1388	return b
1389}
1390
1391func (tv *logTagValue) addSeriesID(x uint64) {
1392	if tv.seriesSet != nil {
1393		tv.seriesSet.AddNoLock(x)
1394		return
1395	}
1396
1397	tv.series[x] = struct{}{}
1398
1399	// If the map is getting too big it can be converted into a roaring seriesSet.
1400	if len(tv.series) > 25 {
1401		tv.seriesSet = tsdb.NewSeriesIDSet()
1402		for id := range tv.series {
1403			tv.seriesSet.AddNoLock(id)
1404		}
1405		tv.series = nil
1406	}
1407}
1408
1409func (tv *logTagValue) removeSeriesID(x uint64) {
1410	if tv.seriesSet != nil {
1411		tv.seriesSet.RemoveNoLock(x)
1412		return
1413	}
1414	delete(tv.series, x)
1415}
1416
1417func (tv *logTagValue) cardinality() int64 {
1418	if tv.seriesSet != nil {
1419		return int64(tv.seriesSet.Cardinality())
1420	}
1421	return int64(len(tv.series))
1422}
1423
1424// seriesIDs returns a sorted set of seriesIDs.
1425func (tv *logTagValue) seriesIDs() []uint64 {
1426	a := make([]uint64, 0, tv.cardinality())
1427	if tv.seriesSet != nil {
1428		tv.seriesSet.ForEachNoLock(func(id uint64) { a = append(a, id) })
1429		return a // IDs are already sorted.
1430	}
1431
1432	for seriesID := range tv.series {
1433		a = append(a, seriesID)
1434	}
1435	sort.Sort(uint64Slice(a))
1436	return a
1437}
1438
1439// seriesIDSet returns a copy of the logMeasurement's seriesSet, or creates a new
1440// one
1441func (tv *logTagValue) seriesIDSet() *tsdb.SeriesIDSet {
1442	if tv.seriesSet != nil {
1443		return tv.seriesSet.CloneNoLock()
1444	}
1445
1446	ss := tsdb.NewSeriesIDSet()
1447	for seriesID := range tv.series {
1448		ss.AddNoLock(seriesID)
1449	}
1450	return ss
1451}
1452
1453func (tv *logTagValue) Value() []byte { return tv.name }
1454func (tv *logTagValue) Deleted() bool { return tv.deleted }
1455
1456// logTagValue is a sortable list of log tag values.
1457type logTagValueSlice []logTagValue
1458
1459func (a logTagValueSlice) Len() int           { return len(a) }
1460func (a logTagValueSlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
1461func (a logTagValueSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
1462
1463// logTagKeyIterator represents an iterator over a slice of tag keys.
1464type logTagKeyIterator struct {
1465	a []logTagKey
1466}
1467
1468// newLogTagKeyIterator returns a new instance of logTagKeyIterator.
1469func newLogTagKeyIterator(a []logTagKey) *logTagKeyIterator {
1470	sort.Sort(logTagKeySlice(a))
1471	return &logTagKeyIterator{a: a}
1472}
1473
1474// Next returns the next element in the iterator.
1475func (itr *logTagKeyIterator) Next() (e TagKeyElem) {
1476	if len(itr.a) == 0 {
1477		return nil
1478	}
1479	e, itr.a = &itr.a[0], itr.a[1:]
1480	return e
1481}
1482
1483// logTagValueIterator represents an iterator over a slice of tag values.
1484type logTagValueIterator struct {
1485	a []logTagValue
1486}
1487
1488// newLogTagValueIterator returns a new instance of logTagValueIterator.
1489func newLogTagValueIterator(a []logTagValue) *logTagValueIterator {
1490	sort.Sort(logTagValueSlice(a))
1491	return &logTagValueIterator{a: a}
1492}
1493
1494// Next returns the next element in the iterator.
1495func (itr *logTagValueIterator) Next() (e TagValueElem) {
1496	if len(itr.a) == 0 {
1497		return nil
1498	}
1499	e, itr.a = &itr.a[0], itr.a[1:]
1500	return e
1501}
1502
// FormatLogFileName generates a log filename for the given index.
// The name is "L0-" followed by the zero-padded 8-digit file sequence id and
// the log file extension (LogFileExt).
func FormatLogFileName(id int) string {
	return fmt.Sprintf("L0-%08d%s", id, LogFileExt)
}
1507