// Copyright 2017 The Prometheus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"encoding/json"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/tsdb/chunkenc"
	"github.com/prometheus/tsdb/chunks"
	tsdb_errors "github.com/prometheus/tsdb/errors"
	"github.com/prometheus/tsdb/index"
	"github.com/prometheus/tsdb/labels"
)

// IndexWriter serializes the index for a block of series data.
// The methods must be called in the order in which they are specified.
type IndexWriter interface {
	// AddSymbols registers all string symbols that are encountered in series
	// and other indices.
	AddSymbols(sym map[string]struct{}) error

	// AddSeries populates the index writer with a series and its offsets
	// of chunks that the index can reference.
	// Implementations may require series to be inserted in increasing order by
	// their labels.
	// The reference numbers are used to resolve entries in postings lists that
	// are added later.
	AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error

	// WriteLabelIndex serializes an index from label names to values.
	// The passed in values are a flattened sequence of tuples, each of
	// length len(names).
	WriteLabelIndex(names []string, values []string) error

	// WritePostings writes a postings list for a single label pair.
	// The Postings here contain refs to the series that were added.
	WritePostings(name, value string, it index.Postings) error

	// Close writes any finalization and closes the resources associated with
	// the underlying writer.
	Close() error
}
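
// A minimal, hedged sketch of the call order an IndexWriter expects; `iw`,
// `symbols`, `lset`, `chkMetas`, `jobValues`, and `postings` are hypothetical
// values, not part of this package:
//
//	_ = iw.AddSymbols(symbols)                         // 1. register all symbols first
//	_ = iw.AddSeries(1, lset, chkMetas...)             // 2. series, in increasing label order
//	_ = iw.WriteLabelIndex([]string{"job"}, jobValues) // 3. label value indices
//	_ = iw.WritePostings("job", "api", postings)       // 4. postings referencing series refs
//	_ = iw.Close()                                     // 5. finalize and release resources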

// IndexReader provides reading access to serialized index data.
type IndexReader interface {
	// Symbols returns a set of string symbols that may occur in series' labels
	// and indices.
	Symbols() (map[string]struct{}, error)

	// LabelValues returns the possible label values.
	LabelValues(names ...string) (index.StringTuples, error)

	// Postings returns the postings list iterator for the label pair.
	// The Postings here contain the offsets to the series inside the index.
	// Found IDs are not strictly required to point to a valid Series, e.g.
	// during background garbage collections.
	Postings(name, value string) (index.Postings, error)

	// SortedPostings returns a postings list that is reordered to be sorted
	// by the label set of the underlying series.
	SortedPostings(index.Postings) index.Postings

	// Series populates the given labels and chunk metas for the series identified
	// by the reference.
	// Returns ErrNotFound if the ref does not resolve to a known series.
	Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error

	// LabelIndices returns a list of string tuples for which a label value index exists.
	// NOTE: This is deprecated. Use `LabelNames()` instead.
	LabelIndices() ([][]string, error)

	// LabelNames returns all the unique label names present in the index in sorted order.
	LabelNames() ([]string, error)

	// Close releases the underlying resources of the reader.
	Close() error
}
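
// A hedged sketch of the typical read path through an IndexReader; `ir` and
// the label pair are hypothetical, and error handling is elided:
//
//	p, _ := ir.Postings("job", "api") // postings for one label pair
//	p = ir.SortedPostings(p)          // reorder by series label sets
//	var lset labels.Labels
//	var chks []chunks.Meta
//	for p.Next() {
//		_ = ir.Series(p.At(), &lset, &chks) // resolve ref to labels and chunk metas
//	}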

// StringTuples provides access to a sorted list of string tuples.
type StringTuples interface {
	// Len returns the total number of tuples in the list.
	Len() int
	// At returns the tuple at position i.
	At(i int) ([]string, error)
}

// ChunkWriter serializes a time block of chunked series data.
type ChunkWriter interface {
	// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
	// must be populated.
	// After returning successfully, the Ref fields in the ChunkMetas
	// are set and can be used to retrieve the chunks from the written data.
	WriteChunks(chunks ...chunks.Meta) error

	// Close writes any required finalization and closes the resources
	// associated with the underlying writer.
	Close() error
}

// ChunkReader provides reading access to serialized time series data.
type ChunkReader interface {
	// Chunk returns the series data chunk with the given reference.
	Chunk(ref uint64) (chunkenc.Chunk, error)

	// Close releases all underlying resources of the reader.
	Close() error
}

// BlockReader provides reading access to a data block.
type BlockReader interface {
	// Index returns an IndexReader over the block's data.
	Index() (IndexReader, error)

	// Chunks returns a ChunkReader over the block's data.
	Chunks() (ChunkReader, error)

	// Tombstones returns a TombstoneReader over the block's deleted data.
	Tombstones() (TombstoneReader, error)

	// MinTime returns the min time of the block.
	MinTime() int64

	// MaxTime returns the max time of the block.
	MaxTime() int64
}

// Appendable defines an entity to which data can be appended.
type Appendable interface {
	// Appender returns a new Appender against an underlying store.
	Appender() Appender
}

// SizeReader is implemented by objects that can report their size in bytes.
type SizeReader interface {
	// Size returns the size in bytes.
	Size() int64
}

// BlockMeta provides meta information about a block.
type BlockMeta struct {
	// Unique identifier for the block and its contents. Changes on compaction.
	ULID ulid.ULID `json:"ulid"`

	// MinTime and MaxTime specify the time range all samples
	// in the block are in.
	MinTime int64 `json:"minTime"`
	MaxTime int64 `json:"maxTime"`

	// Stats about the contents of the block.
	Stats BlockStats `json:"stats,omitempty"`

	// Information on compactions the block was created from.
	Compaction BlockMetaCompaction `json:"compaction"`

	// Version of the index format.
	Version int `json:"version"`
}

// BlockStats contains stats about contents of a block.
type BlockStats struct {
	NumSamples    uint64 `json:"numSamples,omitempty"`
	NumSeries     uint64 `json:"numSeries,omitempty"`
	NumChunks     uint64 `json:"numChunks,omitempty"`
	NumTombstones uint64 `json:"numTombstones,omitempty"`
	NumBytes      int64  `json:"numBytes,omitempty"`
}

// BlockDesc describes a block by ULID and time range.
type BlockDesc struct {
	ULID    ulid.ULID `json:"ulid"`
	MinTime int64     `json:"minTime"`
	MaxTime int64     `json:"maxTime"`
}

// BlockMetaCompaction holds information about compactions a block went through.
type BlockMetaCompaction struct {
	// Maximum number of compaction cycles any source block has
	// gone through.
	Level int `json:"level"`
	// ULIDs of all source head blocks that went into the block.
	Sources []ulid.ULID `json:"sources,omitempty"`
	// Deletable indicates that compaction resulted in a block without any
	// samples, so the block should be deleted on the next reload.
	Deletable bool `json:"deletable,omitempty"`
	// Short descriptions of the direct blocks that were used to create
	// this block.
	Parents []BlockDesc `json:"parents,omitempty"`
	Failed  bool        `json:"failed,omitempty"`
}
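
// For illustration only, a meta.json serialized from these structs might look
// roughly like the following; the ULID and all numbers are made up:
//
//	{
//		"ulid": "01BKGV7JBM69T2G1BGBGM6KB12",
//		"minTime": 1500000000000,
//		"maxTime": 1507200000000,
//		"stats": {
//			"numSamples": 100,
//			"numSeries": 2,
//			"numChunks": 2
//		},
//		"compaction": {
//			"level": 1,
//			"sources": ["01BKGV7JBM69T2G1BGBGM6KB12"]
//		},
//		"version": 1
//	}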

const indexFilename = "index"
const metaFilename = "meta.json"

func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }

func readMetaFile(dir string) (*BlockMeta, error) {
	b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
	if err != nil {
		return nil, err
	}
	var m BlockMeta

	if err := json.Unmarshal(b, &m); err != nil {
		return nil, err
	}
	if m.Version != 1 {
		return nil, errors.Errorf("unexpected meta file version %d", m.Version)
	}

	return &m, nil
}

func writeMetaFile(dir string, meta *BlockMeta) error {
	meta.Version = 1

	// Make any changes to the file appear atomic.
	path := filepath.Join(dir, metaFilename)
	tmp := path + ".tmp"

	f, err := os.Create(tmp)
	if err != nil {
		return err
	}

	enc := json.NewEncoder(f)
	enc.SetIndent("", "\t")

	var merr tsdb_errors.MultiError

	if merr.Add(enc.Encode(meta)); merr.Err() != nil {
		merr.Add(f.Close())
		return merr.Err()
	}
	// Force the kernel to persist the file on disk to avoid data loss if the host crashes.
	if merr.Add(f.Sync()); merr.Err() != nil {
		merr.Add(f.Close())
		return merr.Err()
	}
	if err := f.Close(); err != nil {
		return err
	}
	return renameFile(tmp, path)
}

// Block represents a directory of time series data covering a continuous time range.
type Block struct {
	mtx            sync.RWMutex
	closing        bool
	pendingReaders sync.WaitGroup

	dir  string
	meta BlockMeta

	// Symbol table size in bytes.
	// We maintain this variable to avoid recalculating it every time.
	symbolTableSize uint64

	chunkr     ChunkReader
	indexr     IndexReader
	tombstones TombstoneReader
}

// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs.
func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
	if logger == nil {
		logger = log.NewNopLogger()
	}
	var closers []io.Closer
	defer func() {
		if err != nil {
			var merr tsdb_errors.MultiError
			merr.Add(err)
			merr.Add(closeAll(closers))
			err = merr.Err()
		}
	}()
	meta, err := readMetaFile(dir)
	if err != nil {
		return nil, err
	}

	cr, err := chunks.NewDirReader(chunkDir(dir), pool)
	if err != nil {
		return nil, err
	}
	closers = append(closers, cr)

	ir, err := index.NewFileReader(filepath.Join(dir, indexFilename))
	if err != nil {
		return nil, err
	}
	closers = append(closers, ir)

	tr, tsr, err := readTombstones(dir)
	if err != nil {
		return nil, err
	}
	closers = append(closers, tr)

	// TODO: refactor to set this at block creation time, as that would be
	// the logical place for the block size to be calculated.
	bs := blockSize(cr, ir, tsr)
	meta.Stats.NumBytes = bs
	err = writeMetaFile(dir, meta)
	if err != nil {
		level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
	}

	pb = &Block{
		dir:             dir,
		meta:            *meta,
		chunkr:          cr,
		indexr:          ir,
		tombstones:      tr,
		symbolTableSize: ir.SymbolTableSize(),
	}
	return pb, nil
}
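
// A hedged usage sketch for OpenBlock; the directory path is made up, and
// chunkenc.NewPool is assumed to be the usual way to obtain a chunk pool:
//
//	b, err := OpenBlock(log.NewNopLogger(), "/data/01BKGV7JBM69T2G1BGBGM6KB12", chunkenc.NewPool())
//	if err != nil {
//		// handle error
//	}
//	defer b.Close() // blocks until all outstanding readers are closed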

func blockSize(rr ...SizeReader) int64 {
	var total int64
	for _, r := range rr {
		if r != nil {
			total += r.Size()
		}
	}
	return total
}

// Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (pb *Block) Close() error {
	pb.mtx.Lock()
	pb.closing = true
	pb.mtx.Unlock()

	pb.pendingReaders.Wait()

	var merr tsdb_errors.MultiError

	merr.Add(pb.chunkr.Close())
	merr.Add(pb.indexr.Close())
	merr.Add(pb.tombstones.Close())

	return merr.Err()
}

func (pb *Block) String() string {
	return pb.meta.ULID.String()
}

// Dir returns the directory of the block.
func (pb *Block) Dir() string { return pb.dir }

// Meta returns meta information about the block.
func (pb *Block) Meta() BlockMeta { return pb.meta }

// MinTime returns the min time of the meta.
func (pb *Block) MinTime() int64 { return pb.meta.MinTime }

// MaxTime returns the max time of the meta.
func (pb *Block) MaxTime() int64 { return pb.meta.MaxTime }

// Size returns the number of bytes that the block takes up.
func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }

// ErrClosing is returned when a block is in the process of being closed.
var ErrClosing = errors.New("block is closing")

func (pb *Block) startRead() error {
	pb.mtx.RLock()
	defer pb.mtx.RUnlock()

	if pb.closing {
		return ErrClosing
	}
	pb.pendingReaders.Add(1)
	return nil
}

// Index returns a new IndexReader against the block data.
func (pb *Block) Index() (IndexReader, error) {
	if err := pb.startRead(); err != nil {
		return nil, err
	}
	return blockIndexReader{ir: pb.indexr, b: pb}, nil
}

// Chunks returns a new ChunkReader against the block data.
func (pb *Block) Chunks() (ChunkReader, error) {
	if err := pb.startRead(); err != nil {
		return nil, err
	}
	return blockChunkReader{ChunkReader: pb.chunkr, b: pb}, nil
}

// Tombstones returns a new TombstoneReader against the block data.
func (pb *Block) Tombstones() (TombstoneReader, error) {
	if err := pb.startRead(); err != nil {
		return nil, err
	}
	return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil
}
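
// A hedged sketch of the reader lifecycle: each accessor registers a pending
// reader, and closing the returned reader releases it so that Block.Close can
// proceed. `b` stands for a hypothetical *Block:
//
//	ir, err := b.Index() // fails with ErrClosing if the block is shutting down
//	if err != nil {
//		// handle error
//	}
//	defer ir.Close() // decrements the pending-readers count; the underlying reader stays open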

// GetSymbolTableSize returns the symbol table size of this block's index.
func (pb *Block) GetSymbolTableSize() uint64 {
	return pb.symbolTableSize
}

func (pb *Block) setCompactionFailed() error {
	pb.meta.Compaction.Failed = true
	return writeMetaFile(pb.dir, &pb.meta)
}

type blockIndexReader struct {
	ir IndexReader
	b  *Block
}

func (r blockIndexReader) Symbols() (map[string]struct{}, error) {
	s, err := r.ir.Symbols()
	return s, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

func (r blockIndexReader) LabelValues(names ...string) (index.StringTuples, error) {
	st, err := r.ir.LabelValues(names...)
	return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

func (r blockIndexReader) Postings(name, value string) (index.Postings, error) {
	p, err := r.ir.Postings(name, value)
	return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
	return r.ir.SortedPostings(p)
}

func (r blockIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
	return errors.Wrapf(
		r.ir.Series(ref, lset, chks),
		"block: %s",
		r.b.Meta().ULID,
	)
}

func (r blockIndexReader) LabelIndices() ([][]string, error) {
	ss, err := r.ir.LabelIndices()
	return ss, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

func (r blockIndexReader) LabelNames() ([]string, error) {
	return r.b.LabelNames()
}

func (r blockIndexReader) Close() error {
	r.b.pendingReaders.Done()
	return nil
}

type blockTombstoneReader struct {
	TombstoneReader
	b *Block
}

func (r blockTombstoneReader) Close() error {
	r.b.pendingReaders.Done()
	return nil
}

type blockChunkReader struct {
	ChunkReader
	b *Block
}

func (r blockChunkReader) Close() error {
	r.b.pendingReaders.Done()
	return nil
}

// Delete deletes data for series matching the given matchers between mint and
// maxt in the block by recording tombstones; chunk data is not rewritten.
func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
	pb.mtx.Lock()
	defer pb.mtx.Unlock()

	if pb.closing {
		return ErrClosing
	}

	p, err := PostingsForMatchers(pb.indexr, ms...)
	if err != nil {
		return errors.Wrap(err, "select series")
	}

	ir := pb.indexr

	// Choose only valid postings which have chunks in the time-range.
	stones := newMemTombstones()

	var lset labels.Labels
	var chks []chunks.Meta

Outer:
	for p.Next() {
		err := ir.Series(p.At(), &lset, &chks)
		if err != nil {
			return err
		}

		for _, chk := range chks {
			if chk.OverlapsClosedInterval(mint, maxt) {
				// Delete only until the current values and not beyond.
				tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
				stones.addInterval(p.At(), Interval{tmin, tmax})
				continue Outer
			}
		}
	}

	if p.Err() != nil {
		return p.Err()
	}

	err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
		for _, iv := range ivs {
			stones.addInterval(id, iv)
		}
		return nil
	})
	if err != nil {
		return err
	}
	pb.tombstones = stones
	pb.meta.Stats.NumTombstones = pb.tombstones.Total()

	if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
		return err
	}
	return writeMetaFile(pb.dir, &pb.meta)
}
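
// A hedged usage sketch for Delete; labels.NewEqualMatcher is assumed from the
// imported labels package, and the time range and label pair are made up:
//
//	// Tombstone all samples of series with job="api" in [1000, 2000].
//	if err := b.Delete(1000, 2000, labels.NewEqualMatcher("job", "api")); err != nil {
//		// handle error
//	}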

// CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones).
// If there was a rewrite, it returns the ULID of the newly written block, else nil.
func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
	numStones := 0

	if err := pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
		numStones += len(ivs)
		return nil
	}); err != nil {
		// This should never happen, as the iteration function only returns nil.
		panic(err)
	}
	if numStones == 0 {
		return nil, nil
	}

	meta := pb.Meta()
	uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta)
	if err != nil {
		return nil, err
	}
	return &uid, nil
}
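
// A hedged usage sketch for CleanTombstones; `c` stands for any Compactor
// implementation and the destination directory is hypothetical:
//
//	newULID, err := b.CleanTombstones("/data", c)
//	if err != nil {
//		// handle error
//	}
//	if newULID == nil {
//		// no tombstones were present, so no rewrite took place
//	}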

// Snapshot creates a snapshot of the block into dir.
func (pb *Block) Snapshot(dir string) error {
	blockDir := filepath.Join(dir, pb.meta.ULID.String())
	if err := os.MkdirAll(blockDir, 0777); err != nil {
		return errors.Wrap(err, "create snapshot block dir")
	}

	chunksDir := chunkDir(blockDir)
	if err := os.MkdirAll(chunksDir, 0777); err != nil {
		return errors.Wrap(err, "create snapshot chunk dir")
	}

	// Hardlink meta, index and tombstones.
	for _, fname := range []string{
		metaFilename,
		indexFilename,
		tombstoneFilename,
	} {
		if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil {
			return errors.Wrapf(err, "create snapshot %s", fname)
		}
	}

	// Hardlink the chunks.
	curChunkDir := chunkDir(pb.dir)
	files, err := ioutil.ReadDir(curChunkDir)
	if err != nil {
		return errors.Wrap(err, "ReadDir the current chunk dir")
	}

	for _, f := range files {
		err := os.Link(filepath.Join(curChunkDir, f.Name()), filepath.Join(chunksDir, f.Name()))
		if err != nil {
			return errors.Wrap(err, "hardlink a chunk")
		}
	}

	return nil
}
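
// A hedged usage sketch for Snapshot; the target directory is hypothetical.
// Since the snapshot is built from hardlinks, it is cheap to create but must
// reside on the same filesystem as the block:
//
//	if err := b.Snapshot("/backups/snap-2019-01-01"); err != nil {
//		// handle error
//	}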

// OverlapsClosedInterval returns true if the block overlaps [mint, maxt].
func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool {
	// The block itself is a half-open interval
	// [pb.meta.MinTime, pb.meta.MaxTime).
	return pb.meta.MinTime <= maxt && mint < pb.meta.MaxTime
}
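
// For example, a block covering [1000, 2000) overlaps the query interval
// [1999, 3000] but not [2000, 3000], because the block's MaxTime itself is
// excluded; these numbers are purely illustrative.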

// LabelNames returns all the unique label names present in the Block in sorted order.
func (pb *Block) LabelNames() ([]string, error) {
	return pb.indexr.LabelNames()
}

func clampInterval(a, b, mint, maxt int64) (int64, int64) {
	if a < mint {
		a = mint
	}
	if b > maxt {
		b = maxt
	}
	return a, b
}