1package tsi1
2
3import (
4	"bufio"
5	"bytes"
6	"encoding/binary"
7	"errors"
8	"fmt"
9	"hash/crc32"
10	"io"
11	"os"
12	"sort"
13	"sync"
14	"time"
15	"unsafe"
16
17	"github.com/influxdata/influxdb/models"
18	"github.com/influxdata/influxdb/pkg/bloom"
19	"github.com/influxdata/influxdb/pkg/estimator"
20	"github.com/influxdata/influxdb/pkg/estimator/hll"
21	"github.com/influxdata/influxdb/pkg/mmap"
22	"github.com/influxdata/influxdb/tsdb"
23)
24
// Log errors.
//
// ErrLogEntryChecksumMismatch is returned when a decoded log entry's stored
// CRC32 does not match the checksum computed over its contents.
var ErrLogEntryChecksumMismatch = errors.New("log entry checksum mismatch")
29
// Log entry flag constants. Each tombstone type occupies its own bit in
// LogEntry.Flag so the flags remain mutually distinguishable.
const (
	LogEntrySeriesTombstoneFlag      = 0x01 << iota // 0x01
	LogEntryMeasurementTombstoneFlag                // 0x02
	LogEntryTagKeyTombstoneFlag                     // 0x04
	LogEntryTagValueTombstoneFlag                   // 0x08
)
37
// defaultLogFileBufferSize describes the size of the buffer that the LogFile's
// buffered writer uses when no explicit buffer size has been set. It matches
// the default buffer size of a bufio.Writer (4096 bytes).
const defaultLogFileBufferSize = 1 << 12 // 4096
43
// indexFileBufferSize is the buffer size used when compacting the LogFile down
// into a .tsi file.
const indexFileBufferSize = 131072 // 128K (1 << 17)
47
// LogFile represents an on-disk write-ahead log file.
//
// Index mutations (series inserts and measurement/tag/series tombstones) are
// appended to the file and simultaneously applied to an in-memory index, so
// lookups never have to re-read the file. Shared state is guarded by mu.
type LogFile struct {
	mu         sync.RWMutex
	wg         sync.WaitGroup // ref count
	id         int            // file sequence identifier
	data       []byte         // mmap
	file       *os.File       // writer
	w          *bufio.Writer  // buffered writer
	bufferSize int            // The size of the buffer used by the buffered writer
	nosync     bool           // Disables buffer flushing and file syncing. Useful for offline tooling.
	buf        []byte         // marshaling buffer
	keyBuf     []byte         // scratch buffer used when rebuilding series keys

	sfile   *tsdb.SeriesFile // series lookup
	size    int64            // tracks current file size
	modTime time.Time        // tracks last time write occurred

	// In-memory series existence/tombstone sets.
	seriesIDSet, tombstoneSeriesIDSet *tsdb.SeriesIDSet

	// In-memory index.
	mms logMeasurements

	// Filepath to the log file.
	path string
}
74
75// NewLogFile returns a new instance of LogFile.
76func NewLogFile(sfile *tsdb.SeriesFile, path string) *LogFile {
77	return &LogFile{
78		sfile: sfile,
79		path:  path,
80		mms:   make(logMeasurements),
81
82		seriesIDSet:          tsdb.NewSeriesIDSet(),
83		tombstoneSeriesIDSet: tsdb.NewSeriesIDSet(),
84	}
85}
86
// bytes estimates the memory footprint of this LogFile, in bytes.
// NOTE(review): the mutex and wait-group sizes are hard-coded estimates for
// 64-bit platforms (see inline comments); f.data (mmap'd) and the shared
// SeriesFile are deliberately excluded from the estimate.
func (f *LogFile) bytes() int {
	var b int
	b += 24 // mu RWMutex is 24 bytes
	b += 16 // wg WaitGroup is 16 bytes
	b += int(unsafe.Sizeof(f.id))
	// Do not include f.data because it is mmap'd
	// TODO(jacobmarble): Uncomment when we are using go >= 1.10.0
	//b += int(unsafe.Sizeof(f.w)) + f.w.Size()
	b += int(unsafe.Sizeof(f.buf)) + len(f.buf)
	b += int(unsafe.Sizeof(f.keyBuf)) + len(f.keyBuf)
	// Do not count SeriesFile because it belongs to the code that constructed this Index.
	b += int(unsafe.Sizeof(f.size))
	b += int(unsafe.Sizeof(f.modTime))
	b += int(unsafe.Sizeof(f.seriesIDSet)) + f.seriesIDSet.Bytes()
	b += int(unsafe.Sizeof(f.tombstoneSeriesIDSet)) + f.tombstoneSeriesIDSet.Bytes()
	b += int(unsafe.Sizeof(f.mms)) + f.mms.bytes()
	b += int(unsafe.Sizeof(f.path)) + len(f.path)
	return b
}
107
108// Open reads the log from a file and validates all the checksums.
109func (f *LogFile) Open() error {
110	if err := f.open(); err != nil {
111		f.Close()
112		return err
113	}
114	return nil
115}
116
// open does the real work of Open: it opens the file handle for appending,
// wires up the buffered writer, mmaps any existing data read-only, and
// replays every log entry into the in-memory index. A trailing partial or
// corrupt entry is truncated (the write offset is positioned just before it)
// rather than reported as an error.
func (f *LogFile) open() error {
	f.id, _ = ParseFilename(f.path)

	// Open file for appending.
	file, err := os.OpenFile(f.Path(), os.O_WRONLY|os.O_CREATE, 0666)
	if err != nil {
		return err
	}
	f.file = file

	if f.bufferSize == 0 {
		f.bufferSize = defaultLogFileBufferSize
	}
	f.w = bufio.NewWriterSize(f.file, f.bufferSize)

	// Finish opening if file is empty.
	fi, err := file.Stat()
	if err != nil {
		return err
	} else if fi.Size() == 0 {
		return nil
	}
	f.size = fi.Size()
	f.modTime = fi.ModTime()

	// Open a read-only memory map of the existing data.
	data, err := mmap.Map(f.Path(), 0)
	if err != nil {
		return err
	}
	f.data = data

	// Read log entries from mmap.
	var n int64
	for buf := f.data; len(buf) > 0; {
		// Read next entry. Truncate partial writes.
		var e LogEntry
		if err := e.UnmarshalBinary(buf); err == io.ErrShortBuffer || err == ErrLogEntryChecksumMismatch {
			break
		} else if err != nil {
			return err
		}

		// Execute entry against in-memory index.
		f.execEntry(&e)

		// Move buffer forward.
		n += int64(e.Size)
		buf = buf[e.Size:]
	}

	// Move to the end of the valid data so future appends overwrite any
	// truncated partial entry.
	f.size = n
	_, err = file.Seek(n, io.SeekStart)
	return err
}
173
174// Close shuts down the file handle and mmap.
175func (f *LogFile) Close() error {
176	// Wait until the file has no more references.
177	f.wg.Wait()
178
179	if f.w != nil {
180		f.w.Flush()
181		f.w = nil
182	}
183
184	if f.file != nil {
185		f.file.Close()
186		f.file = nil
187	}
188
189	if f.data != nil {
190		mmap.Unmap(f.data)
191	}
192
193	f.mms = make(logMeasurements)
194	return nil
195}
196
197// FlushAndSync flushes buffered data to disk and then fsyncs the underlying file.
198// If the LogFile has disabled flushing and syncing then FlushAndSync is a no-op.
199func (f *LogFile) FlushAndSync() error {
200	if f.nosync {
201		return nil
202	}
203
204	if f.w != nil {
205		if err := f.w.Flush(); err != nil {
206			return err
207		}
208	}
209
210	if f.file == nil {
211		return nil
212	}
213	return f.file.Sync()
214}
215
216// ID returns the file sequence identifier.
217func (f *LogFile) ID() int { return f.id }
218
219// Path returns the file path.
220func (f *LogFile) Path() string { return f.path }
221
222// SetPath sets the log file's path.
223func (f *LogFile) SetPath(path string) { f.path = path }
224
225// Level returns the log level of the file.
226func (f *LogFile) Level() int { return 0 }
227
228// Filter returns the bloom filter for the file.
229func (f *LogFile) Filter() *bloom.Filter { return nil }
230
231// Retain adds a reference count to the file.
232func (f *LogFile) Retain() { f.wg.Add(1) }
233
234// Release removes a reference count from the file.
235func (f *LogFile) Release() { f.wg.Done() }
236
237// Stat returns size and last modification time of the file.
238func (f *LogFile) Stat() (int64, time.Time) {
239	f.mu.RLock()
240	size, modTime := f.size, f.modTime
241	f.mu.RUnlock()
242	return size, modTime
243}
244
245// SeriesIDSet returns the series existence set.
246func (f *LogFile) SeriesIDSet() (*tsdb.SeriesIDSet, error) {
247	return f.seriesIDSet, nil
248}
249
250// TombstoneSeriesIDSet returns the series tombstone set.
251func (f *LogFile) TombstoneSeriesIDSet() (*tsdb.SeriesIDSet, error) {
252	return f.tombstoneSeriesIDSet, nil
253}
254
255// Size returns the size of the file, in bytes.
256func (f *LogFile) Size() int64 {
257	f.mu.RLock()
258	v := f.size
259	f.mu.RUnlock()
260	return v
261}
262
263// Measurement returns a measurement element.
264func (f *LogFile) Measurement(name []byte) MeasurementElem {
265	f.mu.RLock()
266	defer f.mu.RUnlock()
267
268	mm, ok := f.mms[string(name)]
269	if !ok {
270		return nil
271	}
272
273	return mm
274}
275
276func (f *LogFile) MeasurementHasSeries(ss *tsdb.SeriesIDSet, name []byte) bool {
277	f.mu.RLock()
278	defer f.mu.RUnlock()
279
280	mm, ok := f.mms[string(name)]
281	if !ok {
282		return false
283	}
284
285	// TODO(edd): if mm is using a seriesSet then this could be changed to do a fast intersection.
286	for _, id := range mm.seriesIDs() {
287		if ss.Contains(id) {
288			return true
289		}
290	}
291	return false
292}
293
294// MeasurementNames returns an ordered list of measurement names.
295func (f *LogFile) MeasurementNames() []string {
296	f.mu.RLock()
297	defer f.mu.RUnlock()
298	return f.measurementNames()
299}
300
301func (f *LogFile) measurementNames() []string {
302	a := make([]string, 0, len(f.mms))
303	for name := range f.mms {
304		a = append(a, name)
305	}
306	sort.Strings(a)
307	return a
308}
309
310// DeleteMeasurement adds a tombstone for a measurement to the log file.
311func (f *LogFile) DeleteMeasurement(name []byte) error {
312	f.mu.Lock()
313	defer f.mu.Unlock()
314
315	e := LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name}
316	if err := f.appendEntry(&e); err != nil {
317		return err
318	}
319	f.execEntry(&e)
320
321	// Flush buffer and sync to disk.
322	return f.FlushAndSync()
323}
324
325// TagKeySeriesIDIterator returns a series iterator for a tag key.
326func (f *LogFile) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator, error) {
327	f.mu.RLock()
328	defer f.mu.RUnlock()
329
330	mm, ok := f.mms[string(name)]
331	if !ok {
332		return nil, nil
333	}
334
335	tk, ok := mm.tagSet[string(key)]
336	if !ok {
337		return nil, nil
338	}
339
340	// Combine iterators across all tag keys.
341	itrs := make([]tsdb.SeriesIDIterator, 0, len(tk.tagValues))
342	for _, tv := range tk.tagValues {
343		if tv.cardinality() == 0 {
344			continue
345		}
346		if itr := tsdb.NewSeriesIDSetIterator(tv.seriesIDSet()); itr != nil {
347			itrs = append(itrs, itr)
348		}
349	}
350
351	return tsdb.MergeSeriesIDIterators(itrs...), nil
352}
353
354// TagKeyIterator returns a value iterator for a measurement.
355func (f *LogFile) TagKeyIterator(name []byte) TagKeyIterator {
356	f.mu.RLock()
357	defer f.mu.RUnlock()
358
359	mm, ok := f.mms[string(name)]
360	if !ok {
361		return nil
362	}
363
364	a := make([]logTagKey, 0, len(mm.tagSet))
365	for _, k := range mm.tagSet {
366		a = append(a, k)
367	}
368	return newLogTagKeyIterator(a)
369}
370
371// TagKey returns a tag key element.
372func (f *LogFile) TagKey(name, key []byte) TagKeyElem {
373	f.mu.RLock()
374	defer f.mu.RUnlock()
375
376	mm, ok := f.mms[string(name)]
377	if !ok {
378		return nil
379	}
380
381	tk, ok := mm.tagSet[string(key)]
382	if !ok {
383		return nil
384	}
385
386	return &tk
387}
388
389// TagValue returns a tag value element.
390func (f *LogFile) TagValue(name, key, value []byte) TagValueElem {
391	f.mu.RLock()
392	defer f.mu.RUnlock()
393
394	mm, ok := f.mms[string(name)]
395	if !ok {
396		return nil
397	}
398
399	tk, ok := mm.tagSet[string(key)]
400	if !ok {
401		return nil
402	}
403
404	tv, ok := tk.tagValues[string(value)]
405	if !ok {
406		return nil
407	}
408
409	return &tv
410}
411
412// TagValueIterator returns a value iterator for a tag key.
413func (f *LogFile) TagValueIterator(name, key []byte) TagValueIterator {
414	f.mu.RLock()
415	defer f.mu.RUnlock()
416
417	mm, ok := f.mms[string(name)]
418	if !ok {
419		return nil
420	}
421
422	tk, ok := mm.tagSet[string(key)]
423	if !ok {
424		return nil
425	}
426	return tk.TagValueIterator()
427}
428
429// DeleteTagKey adds a tombstone for a tag key to the log file.
430func (f *LogFile) DeleteTagKey(name, key []byte) error {
431	f.mu.Lock()
432	defer f.mu.Unlock()
433
434	e := LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Key: key}
435	if err := f.appendEntry(&e); err != nil {
436		return err
437	}
438	f.execEntry(&e)
439
440	// Flush buffer and sync to disk.
441	return f.FlushAndSync()
442}
443
444// TagValueSeriesIDSet returns a series iterator for a tag value.
445func (f *LogFile) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error) {
446	f.mu.RLock()
447	defer f.mu.RUnlock()
448
449	mm, ok := f.mms[string(name)]
450	if !ok {
451		return nil, nil
452	}
453
454	tk, ok := mm.tagSet[string(key)]
455	if !ok {
456		return nil, nil
457	}
458
459	tv, ok := tk.tagValues[string(value)]
460	if !ok {
461		return nil, nil
462	} else if tv.cardinality() == 0 {
463		return nil, nil
464	}
465
466	return tv.seriesIDSet(), nil
467}
468
469// MeasurementN returns the total number of measurements.
470func (f *LogFile) MeasurementN() (n uint64) {
471	f.mu.RLock()
472	defer f.mu.RUnlock()
473	return uint64(len(f.mms))
474}
475
476// TagKeyN returns the total number of keys.
477func (f *LogFile) TagKeyN() (n uint64) {
478	f.mu.RLock()
479	defer f.mu.RUnlock()
480	for _, mm := range f.mms {
481		n += uint64(len(mm.tagSet))
482	}
483	return n
484}
485
486// TagValueN returns the total number of values.
487func (f *LogFile) TagValueN() (n uint64) {
488	f.mu.RLock()
489	defer f.mu.RUnlock()
490	for _, mm := range f.mms {
491		for _, k := range mm.tagSet {
492			n += uint64(len(k.tagValues))
493		}
494	}
495	return n
496}
497
498// DeleteTagValue adds a tombstone for a tag value to the log file.
499func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
500	f.mu.Lock()
501	defer f.mu.Unlock()
502
503	e := LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Key: key, Value: value}
504	if err := f.appendEntry(&e); err != nil {
505		return err
506	}
507	f.execEntry(&e)
508
509	// Flush buffer and sync to disk.
510	return f.FlushAndSync()
511}
512
// AddSeriesList adds a list of series to the log file in bulk.
//
// The series are first created in the series file; entries whose ids are
// already present in seriesSet are zeroed out in the returned slice so the
// caller can tell which series were actually new. The existence check runs
// twice: once under seriesSet's read lock to build the candidate entries,
// and again under its write lock to guard against a concurrent writer having
// added the same series in between.
//
// The returned slice is position-aligned with names/tagsSlice; a zero id
// means the series already existed.
func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags, tracker tsdb.StatsTracker) ([]uint64, error) {
	seriesIDs, err := f.sfile.CreateSeriesListIfNotExists(names, tagsSlice, tracker)
	if err != nil {
		return nil, err
	}

	var writeRequired bool
	entries := make([]LogEntry, 0, len(names))
	seriesSet.RLock()
	for i := range names {
		if seriesSet.ContainsNoLock(seriesIDs[i]) {
			// We don't need to allocate anything for this series.
			seriesIDs[i] = 0
			continue
		}
		writeRequired = true
		entries = append(entries, LogEntry{SeriesID: seriesIDs[i], name: names[i], tags: tagsSlice[i], cached: true, batchidx: i})
	}
	seriesSet.RUnlock()

	// Exit if all series already exist.
	if !writeRequired {
		return seriesIDs, nil
	}

	f.mu.Lock()
	defer f.mu.Unlock()

	seriesSet.Lock()
	defer seriesSet.Unlock()

	for i := range entries { // NB - this doesn't evaluate all series ids returned from series file.
		entry := &entries[i]
		if seriesSet.ContainsNoLock(entry.SeriesID) {
			// Series was added by a concurrent writer between the two checks;
			// we don't need to allocate anything for this series.
			seriesIDs[entry.batchidx] = 0
			continue
		}
		if err := f.appendEntry(entry); err != nil {
			return nil, err
		}
		f.execEntry(entry)
		seriesSet.AddNoLock(entry.SeriesID)
	}

	// Flush buffer and sync to disk.
	if err := f.FlushAndSync(); err != nil {
		return nil, err
	}
	return seriesIDs, nil
}
565
566// DeleteSeriesID adds a tombstone for a series id.
567func (f *LogFile) DeleteSeriesID(id uint64) error {
568	f.mu.Lock()
569	defer f.mu.Unlock()
570
571	e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id}
572	if err := f.appendEntry(&e); err != nil {
573		return err
574	}
575	f.execEntry(&e)
576
577	// Flush buffer and sync to disk.
578	return f.FlushAndSync()
579}
580
581// DeleteSeriesIDList adds a tombstone for seriesIDList
582func (f *LogFile) DeleteSeriesIDList(ids []uint64) error {
583	f.mu.Lock()
584	defer f.mu.Unlock()
585
586	for _, id := range ids {
587		e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id}
588		if err := f.appendEntry(&e); err != nil {
589			return err
590		}
591		f.execEntry(&e)
592	}
593
594	// Flush buffer and sync to disk.
595	return f.FlushAndSync()
596}
597
598// SeriesN returns the total number of series in the file.
599func (f *LogFile) SeriesN() (n uint64) {
600	f.mu.RLock()
601	defer f.mu.RUnlock()
602
603	for _, mm := range f.mms {
604		n += uint64(mm.cardinality())
605	}
606	return n
607}
608
// appendEntry marshals e into the local buffer and writes it through the
// buffered writer, setting e.Size to the record length. On a failed write it
// attempts to rewind the file position over the partially written bytes; if
// the seek fails the file is closed and the log must be reopened. Callers
// must hold f.mu (all call sites lock before calling).
func (f *LogFile) appendEntry(e *LogEntry) error {
	// Marshal entry to the local buffer.
	f.buf = appendLogEntry(f.buf[:0], e)

	// Save the size of the record.
	e.Size = len(f.buf)

	// Write record to file.
	n, err := f.w.Write(f.buf)
	if err != nil {
		// Move position backwards over partial entry.
		// Log should be reopened if seeking cannot be completed.
		if n > 0 {
			f.w.Reset(f.file)
			if _, err := f.file.Seek(int64(-n), io.SeekCurrent); err != nil {
				f.Close()
			}
		}
		return err
	}

	// Update in-memory file size & modification time.
	f.size += int64(n)
	f.modTime = time.Now()

	return nil
}
637
// execEntry executes a log entry against the in-memory index.
// This is done after appending and on replay of the log.
// Tombstone flags dispatch to the matching delete handler; any other flag
// value (including a series tombstone or a plain insert) is handled by
// execSeriesEntry.
func (f *LogFile) execEntry(e *LogEntry) {
	switch e.Flag {
	case LogEntryMeasurementTombstoneFlag:
		f.execDeleteMeasurementEntry(e)
	case LogEntryTagKeyTombstoneFlag:
		f.execDeleteTagKeyEntry(e)
	case LogEntryTagValueTombstoneFlag:
		f.execDeleteTagValueEntry(e)
	default:
		f.execSeriesEntry(e)
	}
}
652
// execDeleteMeasurementEntry marks a measurement as deleted and discards its
// in-memory tag set and series references.
func (f *LogFile) execDeleteMeasurementEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	mm.deleted = true
	mm.tagSet = make(map[string]logTagKey)
	mm.series = make(map[uint64]struct{})
	mm.seriesSet = nil
}
660
// execDeleteTagKeyEntry marks a tag key as deleted on its measurement.
// tagSet stores values, not pointers, so the updated key is written back.
func (f *LogFile) execDeleteTagKeyEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	ts := mm.createTagSetIfNotExists(e.Key)

	ts.deleted = true

	mm.tagSet[string(e.Key)] = ts
}
669
// execDeleteTagValueEntry marks a tag value as deleted on its tag key.
// Both maps store values, not pointers, so each level is written back.
func (f *LogFile) execDeleteTagValueEntry(e *LogEntry) {
	mm := f.createMeasurementIfNotExists(e.Name)
	ts := mm.createTagSetIfNotExists(e.Key)
	tv := ts.createTagValueIfNotExists(e.Value)

	tv.deleted = true

	ts.tagValues[string(e.Value)] = tv
	mm.tagSet[string(e.Key)] = ts
}
680
// execSeriesEntry applies a series insert or series-tombstone entry to the
// in-memory index. It resolves the series key (from the cached name/tags or
// from the series file), then adds or removes the series id on the
// measurement, on every tag value in the key, and on the file-level
// existence/tombstone sets.
func (f *LogFile) execSeriesEntry(e *LogEntry) {
	var seriesKey []byte
	if e.cached {
		// Series data was already parsed by the caller; rebuild the key
		// locally instead of reading it back from the series file.
		// NOTE(review): len(f.keyBuf) is 0 right after make([]byte, 0, sz),
		// so this condition is true on every cached entry and a fresh buffer
		// is allocated each time; a cap() check would enable reuse, but the
		// structures below may retain slices into the key — verify before
		// changing.
		sz := tsdb.SeriesKeySize(e.name, e.tags)
		if len(f.keyBuf) < sz {
			f.keyBuf = make([]byte, 0, sz)
		}
		seriesKey = tsdb.AppendSeriesKey(f.keyBuf[:0], e.name, e.tags)
	} else {
		seriesKey = f.sfile.SeriesKey(e.SeriesID)
	}

	// Series keys can be removed if the series has been deleted from
	// the entire database and the server is restarted. This would cause
	// the log to replay its insert but the key cannot be found.
	//
	// https://github.com/influxdata/influxdb/issues/9444
	if seriesKey == nil {
		return
	}

	// Check if deleted.
	deleted := e.Flag == LogEntrySeriesTombstoneFlag

	// Read key size.
	_, remainder := tsdb.ReadSeriesKeyLen(seriesKey)

	// Read measurement name.
	name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder)
	mm := f.createMeasurementIfNotExists(name)
	mm.deleted = false
	if !deleted {
		mm.addSeriesID(e.SeriesID)
	} else {
		mm.removeSeriesID(e.SeriesID)
	}

	// Read tag count.
	tagN, remainder := tsdb.ReadSeriesKeyTagN(remainder)

	// Save tags.
	var k, v []byte
	for i := 0; i < tagN; i++ {
		k, v, remainder = tsdb.ReadSeriesKeyTag(remainder)
		ts := mm.createTagSetIfNotExists(k)
		tv := ts.createTagValueIfNotExists(v)

		// Add/remove a reference to the series on the tag value.
		if !deleted {
			tv.addSeriesID(e.SeriesID)
		} else {
			tv.removeSeriesID(e.SeriesID)
		}

		// Maps store values, so write each level back.
		ts.tagValues[string(v)] = tv
		mm.tagSet[string(k)] = ts
	}

	// Add/remove from appropriate series id sets.
	if !deleted {
		f.seriesIDSet.Add(e.SeriesID)
		f.tombstoneSeriesIDSet.Remove(e.SeriesID)
	} else {
		f.seriesIDSet.Remove(e.SeriesID)
		f.tombstoneSeriesIDSet.Add(e.SeriesID)
	}
}
748
749// SeriesIDIterator returns an iterator over all series in the log file.
750func (f *LogFile) SeriesIDIterator() tsdb.SeriesIDIterator {
751	f.mu.RLock()
752	defer f.mu.RUnlock()
753
754	ss := tsdb.NewSeriesIDSet()
755	allSeriesSets := make([]*tsdb.SeriesIDSet, 0, len(f.mms))
756
757	for _, mm := range f.mms {
758		if mm.seriesSet != nil {
759			allSeriesSets = append(allSeriesSets, mm.seriesSet)
760			continue
761		}
762
763		// measurement is not using seriesSet to store series IDs.
764		mm.forEach(func(seriesID uint64) {
765			ss.AddNoLock(seriesID)
766		})
767	}
768
769	// Fast merge all seriesSets.
770	if len(allSeriesSets) > 0 {
771		ss.Merge(allSeriesSets...)
772	}
773
774	return tsdb.NewSeriesIDSetIterator(ss)
775}
776
777// createMeasurementIfNotExists returns a measurement by name.
778func (f *LogFile) createMeasurementIfNotExists(name []byte) *logMeasurement {
779	mm := f.mms[string(name)]
780	if mm == nil {
781		mm = &logMeasurement{
782			name:   name,
783			tagSet: make(map[string]logTagKey),
784			series: make(map[uint64]struct{}),
785		}
786		f.mms[string(name)] = mm
787	}
788	return mm
789}
790
791// MeasurementIterator returns an iterator over all the measurements in the file.
792func (f *LogFile) MeasurementIterator() MeasurementIterator {
793	f.mu.RLock()
794	defer f.mu.RUnlock()
795
796	var itr logMeasurementIterator
797	for _, mm := range f.mms {
798		itr.mms = append(itr.mms, *mm)
799	}
800	sort.Sort(logMeasurementSlice(itr.mms))
801	return &itr
802}
803
804// MeasurementSeriesIDIterator returns an iterator over all series for a measurement.
805func (f *LogFile) MeasurementSeriesIDIterator(name []byte) tsdb.SeriesIDIterator {
806	f.mu.RLock()
807	defer f.mu.RUnlock()
808
809	mm := f.mms[string(name)]
810	if mm == nil || mm.cardinality() == 0 {
811		return nil
812	}
813	return tsdb.NewSeriesIDSetIterator(mm.seriesIDSet())
814}
815
// CompactTo compacts the log file and writes it to w as an index file:
// file signature, tag blocks (one per measurement), measurement block,
// series existence/tombstone sets, series sketches, and trailer. n is the
// number of bytes written. The m and k parameters are not used in this
// implementation (NOTE(review): presumably legacy bloom-filter sizing —
// confirm against the file interface). Compaction can be aborted via
// cancel, in which case ErrCompactionInterrupted is returned.
func (f *LogFile) CompactTo(w io.Writer, m, k uint64, cancel <-chan struct{}) (n int64, err error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	// Check for cancellation.
	select {
	case <-cancel:
		return n, ErrCompactionInterrupted
	default:
	}

	// Wrap output in a buffered writer to reduce syscalls.
	bw := bufio.NewWriterSize(w, indexFileBufferSize) // 128K

	// Setup compaction offset tracking data.
	var t IndexFileTrailer
	info := newLogFileCompactInfo()
	info.cancel = cancel

	// Write magic number.
	if err := writeTo(bw, []byte(FileSignature), &n); err != nil {
		return n, err
	}

	// Retrieve measurement names in order.
	names := f.measurementNames()

	// Flush buffered signature bytes before writing the tag blocks.
	if err := bw.Flush(); err != nil {
		return n, err
	}

	// Write tagset blocks in measurement order.
	if err := f.writeTagsetsTo(bw, names, info, &n); err != nil {
		return n, err
	}

	// Write measurement block.
	t.MeasurementBlock.Offset = n
	if err := f.writeMeasurementBlockTo(bw, names, info, &n); err != nil {
		return n, err
	}
	t.MeasurementBlock.Size = n - t.MeasurementBlock.Offset

	// Write series set.
	t.SeriesIDSet.Offset = n
	nn, err := f.seriesIDSet.WriteTo(bw)
	if n += nn; err != nil {
		return n, err
	}
	t.SeriesIDSet.Size = n - t.SeriesIDSet.Offset

	// Write tombstone series set.
	t.TombstoneSeriesIDSet.Offset = n
	nn, err = f.tombstoneSeriesIDSet.WriteTo(bw)
	if n += nn; err != nil {
		return n, err
	}
	t.TombstoneSeriesIDSet.Size = n - t.TombstoneSeriesIDSet.Offset

	// Build series sketches.
	sSketch, sTSketch, err := f.seriesSketches()
	if err != nil {
		return n, err
	}

	// Write series sketches.
	t.SeriesSketch.Offset = n
	data, err := sSketch.MarshalBinary()
	if err != nil {
		return n, err
	} else if _, err := bw.Write(data); err != nil {
		return n, err
	}
	t.SeriesSketch.Size = int64(len(data))
	n += t.SeriesSketch.Size

	t.TombstoneSeriesSketch.Offset = n
	if data, err = sTSketch.MarshalBinary(); err != nil {
		return n, err
	} else if _, err := bw.Write(data); err != nil {
		return n, err
	}
	t.TombstoneSeriesSketch.Size = int64(len(data))
	n += t.TombstoneSeriesSketch.Size

	// Write trailer.
	nn, err = t.WriteTo(bw)
	n += nn
	if err != nil {
		return n, err
	}

	// Flush buffer.
	if err := bw.Flush(); err != nil {
		return n, err
	}

	return n, nil
}
917
918func (f *LogFile) writeTagsetsTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error {
919	for _, name := range names {
920		if err := f.writeTagsetTo(w, name, info, n); err != nil {
921			return err
922		}
923	}
924	return nil
925}
926
// writeTagsetTo writes a single measurement's tag block to w, advancing *n
// by the encoder's byte count, and records the block's offset/size in info
// for later use by the measurement block.
func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactInfo, n *int64) error {
	mm := f.mms[name]

	// Check for cancellation.
	select {
	case <-info.cancel:
		return ErrCompactionInterrupted
	default:
	}

	enc := NewTagBlockEncoder(w)
	var valueN int
	for _, k := range mm.keys() {
		tag := mm.tagSet[k]

		// Encode tag. Skip values if tag is deleted.
		if err := enc.EncodeKey(tag.name, tag.deleted); err != nil {
			return err
		} else if tag.deleted {
			continue
		}

		// Sort tag values.
		values := make([]string, 0, len(tag.tagValues))
		for v := range tag.tagValues {
			values = append(values, v)
		}
		sort.Strings(values)

		// Add each value.
		for _, v := range values {
			value := tag.tagValues[v]
			if err := enc.EncodeValue(value.name, value.deleted, value.seriesIDSet()); err != nil {
				return err
			}

			// Check for cancellation periodically.
			if valueN++; valueN%1000 == 0 {
				select {
				case <-info.cancel:
					return ErrCompactionInterrupted
				default:
				}
			}
		}
	}

	// Remember where this tag block starts; *n is only advanced below,
	// once the encoder reports its total byte count.
	offset := *n

	// Flush tag block.
	err := enc.Close()
	*n += enc.N()
	if err != nil {
		return err
	}

	// Compute the block size from the bytes the encoder wrote.
	size := *n - offset

	info.mms[name] = &logFileMeasurementCompactInfo{offset: offset, size: size}

	return nil
}
992
993func (f *LogFile) writeMeasurementBlockTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error {
994	mw := NewMeasurementBlockWriter()
995
996	// Check for cancellation.
997	select {
998	case <-info.cancel:
999		return ErrCompactionInterrupted
1000	default:
1001	}
1002
1003	// Add measurement data.
1004	for _, name := range names {
1005		mm := f.mms[name]
1006		mmInfo := info.mms[name]
1007		assert(mmInfo != nil, "measurement info not found")
1008		mw.Add(mm.name, mm.deleted, mmInfo.offset, mmInfo.size, mm.seriesIDs())
1009	}
1010
1011	// Flush data to writer.
1012	nn, err := mw.WriteTo(w)
1013	*n += nn
1014	return err
1015}
1016
// logFileCompactInfo is a context object to track compaction position info.
// cancel aborts the compaction; mms maps measurement name to the offset/size
// of its tag block recorded while writing.
type logFileCompactInfo struct {
	cancel <-chan struct{}
	mms    map[string]*logFileMeasurementCompactInfo
}
1022
1023// newLogFileCompactInfo returns a new instance of logFileCompactInfo.
1024func newLogFileCompactInfo() *logFileCompactInfo {
1025	return &logFileCompactInfo{
1026		mms: make(map[string]*logFileMeasurementCompactInfo),
1027	}
1028}
1029
// logFileMeasurementCompactInfo records the byte offset and size of one
// measurement's tag block within the compacted output.
type logFileMeasurementCompactInfo struct {
	offset int64
	size   int64
}
1034
1035// MeasurementsSketches returns sketches for existing and tombstoned measurement names.
1036func (f *LogFile) MeasurementsSketches() (sketch, tSketch estimator.Sketch, err error) {
1037	f.mu.RLock()
1038	defer f.mu.RUnlock()
1039	return f.measurementsSketches()
1040}
1041
1042func (f *LogFile) measurementsSketches() (sketch, tSketch estimator.Sketch, err error) {
1043	sketch, tSketch = hll.NewDefaultPlus(), hll.NewDefaultPlus()
1044	for _, mm := range f.mms {
1045		if mm.deleted {
1046			tSketch.Add(mm.name)
1047		} else {
1048			sketch.Add(mm.name)
1049		}
1050	}
1051	return sketch, tSketch, nil
1052}
1053
1054// SeriesSketches returns sketches for existing and tombstoned series.
1055func (f *LogFile) SeriesSketches() (sketch, tSketch estimator.Sketch, err error) {
1056	f.mu.RLock()
1057	defer f.mu.RUnlock()
1058	return f.seriesSketches()
1059}
1060
1061func (f *LogFile) seriesSketches() (sketch, tSketch estimator.Sketch, err error) {
1062	sketch = hll.NewDefaultPlus()
1063	f.seriesIDSet.ForEach(func(id uint64) {
1064		name, keys := f.sfile.Series(id)
1065		sketch.Add(models.MakeKey(name, keys))
1066	})
1067
1068	tSketch = hll.NewDefaultPlus()
1069	f.tombstoneSeriesIDSet.ForEach(func(id uint64) {
1070		name, keys := f.sfile.Series(id)
1071		tSketch.Add(models.MakeKey(name, keys))
1072	})
1073	return sketch, tSketch, nil
1074}
1075
1076func (f *LogFile) ExecEntries(entries []LogEntry) error {
1077	f.mu.Lock()
1078	defer f.mu.Unlock()
1079	for i := range entries {
1080		entry := &entries[i]
1081		if err := f.appendEntry(entry); err != nil {
1082			return err
1083		}
1084		f.execEntry(entry)
1085	}
1086	return nil
1087}
1088
1089func (f *LogFile) Writes(entries []LogEntry) error {
1090	if err := f.ExecEntries(entries); err != nil {
1091		return err
1092	}
1093
1094	// Flush buffer and sync to disk.
1095	return f.FlushAndSync()
1096}
1097
// LogEntry represents a single log entry in the write-ahead log.
type LogEntry struct {
	Flag     byte   // flag
	SeriesID uint64 // series id
	Name     []byte // measurement name
	Key      []byte // tag key
	Value    []byte // tag value
	Checksum uint32 // checksum of flag/name/tags.
	Size     int    // total size of record, in bytes.

	cached   bool        // Hint to LogFile that series data is already parsed
	name     []byte      // series name, a cached copy of the parsed measurement name
	tags     models.Tags // series tags, a cached copy of the parsed tags
	batchidx int         // position of entry in batch.
}
1113
// UnmarshalBinary unmarshals data into e.
//
// Wire layout: flag (1 byte), series id (uvarint), then name, key and value
// each as a uvarint length followed by that many bytes, then a big-endian
// CRC32 of everything preceding it. Returns io.ErrShortBuffer if data is
// truncated and ErrLogEntryChecksumMismatch if the stored checksum does not
// match. Name/Key/Value alias data; they are not copied.
func (e *LogEntry) UnmarshalBinary(data []byte) error {
	var sz uint64
	var n int
	var seriesID uint64
	var err error

	orig := data
	start := len(data)

	// Parse flag data.
	if len(data) < 1 {
		return io.ErrShortBuffer
	}
	e.Flag, data = data[0], data[1:]

	// Parse series id.
	if seriesID, n, err = uvarint(data); err != nil {
		return err
	}
	e.SeriesID, data = seriesID, data[n:]

	// Parse name length.
	if sz, n, err = uvarint(data); err != nil {
		return err
	}

	// Read name data.
	if len(data) < n+int(sz) {
		return io.ErrShortBuffer
	}
	e.Name, data = data[n:n+int(sz)], data[n+int(sz):]

	// Parse key length.
	if sz, n, err = uvarint(data); err != nil {
		return err
	}

	// Read key data.
	if len(data) < n+int(sz) {
		return io.ErrShortBuffer
	}
	e.Key, data = data[n:n+int(sz)], data[n+int(sz):]

	// Parse value length.
	if sz, n, err = uvarint(data); err != nil {
		return err
	}

	// Read value data.
	if len(data) < n+int(sz) {
		return io.ErrShortBuffer
	}
	e.Value, data = data[n:n+int(sz)], data[n+int(sz):]

	// Compute checksum over the bytes consumed so far (everything before
	// the stored checksum itself).
	chk := crc32.ChecksumIEEE(orig[:start-len(data)])

	// Parse checksum.
	if len(data) < 4 {
		return io.ErrShortBuffer
	}
	e.Checksum, data = binary.BigEndian.Uint32(data[:4]), data[4:]

	// Verify checksum.
	if chk != e.Checksum {
		return ErrLogEntryChecksumMismatch
	}

	// Save length of elem.
	e.Size = start - len(data)

	return nil
}
1188
// appendLogEntry appends the binary encoding of e to dst and returns the new
// buffer. The encoding must mirror LogEntry.UnmarshalBinary byte-for-byte:
// flag, series id (uvarint), uvarint-length-prefixed name/key/value, then a
// big-endian CRC32.
// This updates the checksum on the entry as a side effect.
func appendLogEntry(dst []byte, e *LogEntry) []byte {
	var buf [binary.MaxVarintLen64]byte
	start := len(dst)

	// Append flag.
	dst = append(dst, e.Flag)

	// Append series id.
	n := binary.PutUvarint(buf[:], uint64(e.SeriesID))
	dst = append(dst, buf[:n]...)

	// Append name.
	n = binary.PutUvarint(buf[:], uint64(len(e.Name)))
	dst = append(dst, buf[:n]...)
	dst = append(dst, e.Name...)

	// Append key.
	n = binary.PutUvarint(buf[:], uint64(len(e.Key)))
	dst = append(dst, buf[:n]...)
	dst = append(dst, e.Key...)

	// Append value.
	n = binary.PutUvarint(buf[:], uint64(len(e.Value)))
	dst = append(dst, buf[:n]...)
	dst = append(dst, e.Value...)

	// Calculate checksum over everything appended for this entry.
	e.Checksum = crc32.ChecksumIEEE(dst[start:])

	// Append checksum.
	binary.BigEndian.PutUint32(buf[:4], e.Checksum)
	dst = append(dst, buf[:4]...)

	return dst
}
1226
1227// logMeasurements represents a map of measurement names to measurements.
1228type logMeasurements map[string]*logMeasurement
1229
1230// bytes estimates the memory footprint of this logMeasurements, in bytes.
1231func (mms *logMeasurements) bytes() int {
1232	var b int
1233	for k, v := range *mms {
1234		b += len(k)
1235		b += v.bytes()
1236	}
1237	b += int(unsafe.Sizeof(*mms))
1238	return b
1239}
1240
// logMeasurement tracks the in-memory state for a single measurement held
// in the write-ahead log: its tag keys and the set of series IDs under it.
type logMeasurement struct {
	name      []byte // measurement name
	tagSet    map[string]logTagKey // tag key name -> tag key entry
	deleted   bool // true if the measurement has been tombstoned
	series    map[uint64]struct{} // small series-ID set; nil once seriesSet is in use
	seriesSet *tsdb.SeriesIDSet // roaring set; non-nil after the map grows past its threshold
}
1248
1249// bytes estimates the memory footprint of this logMeasurement, in bytes.
1250func (m *logMeasurement) bytes() int {
1251	var b int
1252	b += len(m.name)
1253	for k, v := range m.tagSet {
1254		b += len(k)
1255		b += v.bytes()
1256	}
1257	b += (int(m.cardinality()) * 8)
1258	b += int(unsafe.Sizeof(*m))
1259	return b
1260}
1261
1262func (m *logMeasurement) addSeriesID(x uint64) {
1263	if m.seriesSet != nil {
1264		m.seriesSet.AddNoLock(x)
1265		return
1266	}
1267
1268	m.series[x] = struct{}{}
1269
1270	// If the map is getting too big it can be converted into a roaring seriesSet.
1271	if len(m.series) > 25 {
1272		m.seriesSet = tsdb.NewSeriesIDSet()
1273		for id := range m.series {
1274			m.seriesSet.AddNoLock(id)
1275		}
1276		m.series = nil
1277	}
1278}
1279
1280func (m *logMeasurement) removeSeriesID(x uint64) {
1281	if m.seriesSet != nil {
1282		m.seriesSet.RemoveNoLock(x)
1283		return
1284	}
1285	delete(m.series, x)
1286}
1287
1288func (m *logMeasurement) cardinality() int64 {
1289	if m.seriesSet != nil {
1290		return int64(m.seriesSet.Cardinality())
1291	}
1292	return int64(len(m.series))
1293}
1294
1295// forEach applies fn to every series ID in the logMeasurement.
1296func (m *logMeasurement) forEach(fn func(uint64)) {
1297	if m.seriesSet != nil {
1298		m.seriesSet.ForEachNoLock(fn)
1299		return
1300	}
1301
1302	for seriesID := range m.series {
1303		fn(seriesID)
1304	}
1305}
1306
1307// seriesIDs returns a sorted set of seriesIDs.
1308func (m *logMeasurement) seriesIDs() []uint64 {
1309	a := make([]uint64, 0, m.cardinality())
1310	if m.seriesSet != nil {
1311		m.seriesSet.ForEachNoLock(func(id uint64) { a = append(a, id) })
1312		return a // IDs are already sorted.
1313	}
1314
1315	for seriesID := range m.series {
1316		a = append(a, seriesID)
1317	}
1318	sort.Sort(uint64Slice(a))
1319	return a
1320}
1321
1322// seriesIDSet returns a copy of the logMeasurement's seriesSet, or creates a new
1323// one
1324func (m *logMeasurement) seriesIDSet() *tsdb.SeriesIDSet {
1325	if m.seriesSet != nil {
1326		return m.seriesSet.CloneNoLock()
1327	}
1328
1329	ss := tsdb.NewSeriesIDSet()
1330	for seriesID := range m.series {
1331		ss.AddNoLock(seriesID)
1332	}
1333	return ss
1334}
1335
// Name returns the measurement name.
func (m *logMeasurement) Name() []byte  { return m.name }

// Deleted returns true if the measurement has been tombstoned.
func (m *logMeasurement) Deleted() bool { return m.deleted }
1338
1339func (m *logMeasurement) createTagSetIfNotExists(key []byte) logTagKey {
1340	ts, ok := m.tagSet[string(key)]
1341	if !ok {
1342		ts = logTagKey{name: key, tagValues: make(map[string]logTagValue)}
1343	}
1344	return ts
1345}
1346
1347// keys returns a sorted list of tag keys.
1348func (m *logMeasurement) keys() []string {
1349	a := make([]string, 0, len(m.tagSet))
1350	for k := range m.tagSet {
1351		a = append(a, k)
1352	}
1353	sort.Strings(a)
1354	return a
1355}
1356
1357// logMeasurementSlice is a sortable list of log measurements.
1358type logMeasurementSlice []logMeasurement
1359
1360func (a logMeasurementSlice) Len() int           { return len(a) }
1361func (a logMeasurementSlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
1362func (a logMeasurementSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
1363
1364// logMeasurementIterator represents an iterator over a slice of measurements.
1365type logMeasurementIterator struct {
1366	mms []logMeasurement
1367}
1368
1369// Next returns the next element in the iterator.
1370func (itr *logMeasurementIterator) Next() (e MeasurementElem) {
1371	if len(itr.mms) == 0 {
1372		return nil
1373	}
1374	e, itr.mms = &itr.mms[0], itr.mms[1:]
1375	return e
1376}
1377
// logTagKey tracks the in-memory state for a single tag key of a
// measurement held in the write-ahead log.
type logTagKey struct {
	name      []byte // tag key name
	deleted   bool // true if the tag key has been tombstoned
	tagValues map[string]logTagValue // tag value -> tag value entry
}
1383
1384// bytes estimates the memory footprint of this logTagKey, in bytes.
1385func (tk *logTagKey) bytes() int {
1386	var b int
1387	b += len(tk.name)
1388	for k, v := range tk.tagValues {
1389		b += len(k)
1390		b += v.bytes()
1391	}
1392	b += int(unsafe.Sizeof(*tk))
1393	return b
1394}
1395
// Key returns the tag key name.
func (tk *logTagKey) Key() []byte   { return tk.name }

// Deleted returns true if the tag key has been tombstoned.
func (tk *logTagKey) Deleted() bool { return tk.deleted }
1398
1399func (tk *logTagKey) TagValueIterator() TagValueIterator {
1400	a := make([]logTagValue, 0, len(tk.tagValues))
1401	for _, v := range tk.tagValues {
1402		a = append(a, v)
1403	}
1404	return newLogTagValueIterator(a)
1405}
1406
1407func (tk *logTagKey) createTagValueIfNotExists(value []byte) logTagValue {
1408	tv, ok := tk.tagValues[string(value)]
1409	if !ok {
1410		tv = logTagValue{name: value, series: make(map[uint64]struct{})}
1411	}
1412	return tv
1413}
1414
1415// logTagKey is a sortable list of log tag keys.
1416type logTagKeySlice []logTagKey
1417
1418func (a logTagKeySlice) Len() int           { return len(a) }
1419func (a logTagKeySlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
1420func (a logTagKeySlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
1421
// logTagValue tracks the in-memory state for a single tag value held in the
// write-ahead log, including the set of series IDs associated with it.
type logTagValue struct {
	name      []byte // tag value
	deleted   bool // true if the tag value has been tombstoned
	series    map[uint64]struct{} // small series-ID set; nil once seriesSet is in use
	seriesSet *tsdb.SeriesIDSet // roaring set; non-nil after the map grows past its threshold
}
1428
1429// bytes estimates the memory footprint of this logTagValue, in bytes.
1430func (tv *logTagValue) bytes() int {
1431	var b int
1432	b += len(tv.name)
1433	b += int(unsafe.Sizeof(*tv))
1434	b += (int(tv.cardinality()) * 8)
1435	return b
1436}
1437
1438func (tv *logTagValue) addSeriesID(x uint64) {
1439	if tv.seriesSet != nil {
1440		tv.seriesSet.AddNoLock(x)
1441		return
1442	}
1443
1444	tv.series[x] = struct{}{}
1445
1446	// If the map is getting too big it can be converted into a roaring seriesSet.
1447	if len(tv.series) > 25 {
1448		tv.seriesSet = tsdb.NewSeriesIDSet()
1449		for id := range tv.series {
1450			tv.seriesSet.AddNoLock(id)
1451		}
1452		tv.series = nil
1453	}
1454}
1455
1456func (tv *logTagValue) removeSeriesID(x uint64) {
1457	if tv.seriesSet != nil {
1458		tv.seriesSet.RemoveNoLock(x)
1459		return
1460	}
1461	delete(tv.series, x)
1462}
1463
1464func (tv *logTagValue) cardinality() int64 {
1465	if tv.seriesSet != nil {
1466		return int64(tv.seriesSet.Cardinality())
1467	}
1468	return int64(len(tv.series))
1469}
1470
1471// seriesIDSet returns a copy of the logMeasurement's seriesSet, or creates a new
1472// one
1473func (tv *logTagValue) seriesIDSet() *tsdb.SeriesIDSet {
1474	if tv.seriesSet != nil {
1475		return tv.seriesSet.CloneNoLock()
1476	}
1477
1478	ss := tsdb.NewSeriesIDSet()
1479	for seriesID := range tv.series {
1480		ss.AddNoLock(seriesID)
1481	}
1482	return ss
1483}
1484
// Value returns the tag value.
func (tv *logTagValue) Value() []byte { return tv.name }

// Deleted returns true if the tag value has been tombstoned.
func (tv *logTagValue) Deleted() bool { return tv.deleted }
1487
1488// logTagValue is a sortable list of log tag values.
1489type logTagValueSlice []logTagValue
1490
1491func (a logTagValueSlice) Len() int           { return len(a) }
1492func (a logTagValueSlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
1493func (a logTagValueSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[j].name) == -1 }
1494
1495// logTagKeyIterator represents an iterator over a slice of tag keys.
1496type logTagKeyIterator struct {
1497	a []logTagKey
1498}
1499
1500// newLogTagKeyIterator returns a new instance of logTagKeyIterator.
1501func newLogTagKeyIterator(a []logTagKey) *logTagKeyIterator {
1502	sort.Sort(logTagKeySlice(a))
1503	return &logTagKeyIterator{a: a}
1504}
1505
1506// Next returns the next element in the iterator.
1507func (itr *logTagKeyIterator) Next() (e TagKeyElem) {
1508	if len(itr.a) == 0 {
1509		return nil
1510	}
1511	e, itr.a = &itr.a[0], itr.a[1:]
1512	return e
1513}
1514
1515// logTagValueIterator represents an iterator over a slice of tag values.
1516type logTagValueIterator struct {
1517	a []logTagValue
1518}
1519
1520// newLogTagValueIterator returns a new instance of logTagValueIterator.
1521func newLogTagValueIterator(a []logTagValue) *logTagValueIterator {
1522	sort.Sort(logTagValueSlice(a))
1523	return &logTagValueIterator{a: a}
1524}
1525
1526// Next returns the next element in the iterator.
1527func (itr *logTagValueIterator) Next() (e TagValueElem) {
1528	if len(itr.a) == 0 {
1529		return nil
1530	}
1531	e, itr.a = &itr.a[0], itr.a[1:]
1532	return e
1533}
1534
1535// FormatLogFileName generates a log filename for the given index.
1536func FormatLogFileName(id int) string {
1537	return fmt.Sprintf("L0-%08d%s", id, LogFileExt)
1538}
1539