// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"encoding/json"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync"

	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/tsdb/chunkenc"
	"github.com/prometheus/tsdb/chunks"
	"github.com/prometheus/tsdb/index"
	"github.com/prometheus/tsdb/labels"
)
// IndexWriter serializes the index for a block of series data.
// The methods must be called in the order in which they are specified.
type IndexWriter interface {
	// AddSymbols registers all string symbols that are encountered in series
	// and other indices.
	AddSymbols(sym map[string]struct{}) error

	// AddSeries populates the index writer with a series and the offsets
	// of chunks that the index can reference.
	// Implementations may require series to be inserted in increasing order by
	// their labels.
	// The reference numbers are used to resolve entries in postings lists that
	// are added later.
	AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error

	// WriteLabelIndex serializes an index from label names to values.
	// The passed in values are chained tuples of strings, each tuple of the
	// length of names.
	WriteLabelIndex(names []string, values []string) error

	// WritePostings writes a postings list for a single label pair.
	// The Postings here contain refs to the series that were added.
	WritePostings(name, value string, it index.Postings) error

	// Close writes any finalization and closes the resources associated with
	// the underlying writer.
	Close() error
}
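
// The sketch below is illustrative only: the refs, label values, chunkMetas,
// and postings are hypothetical and error handling is omitted. It shows the
// call order an IndexWriter expects, assuming w is an initialized implementation:
//
//	_ = w.AddSymbols(map[string]struct{}{"__name__": {}, "up": {}})
//	_ = w.AddSeries(1, labels.Labels{{Name: "__name__", Value: "up"}}, chunkMetas...)
//	_ = w.WriteLabelIndex([]string{"__name__"}, []string{"up"})
//	_ = w.WritePostings("__name__", "up", postings)
//	_ = w.Close()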

// IndexReader provides reading access to serialized index data.
type IndexReader interface {
	// Symbols returns a set of string symbols that may occur in series' labels
	// and indices.
	Symbols() (map[string]struct{}, error)

	// LabelValues returns the possible label values for the given label names.
	LabelValues(names ...string) (index.StringTuples, error)

	// Postings returns the postings list iterator for the label pair.
	// The Postings here contain the offsets to the series inside the index.
	// Found IDs are not strictly required to point to a valid Series, e.g. during
	// background garbage collections.
	Postings(name, value string) (index.Postings, error)

	// SortedPostings returns a postings list that is reordered to be sorted
	// by the label set of the underlying series.
	SortedPostings(index.Postings) index.Postings

	// Series populates the given labels and chunk metas for the series identified
	// by the reference.
	// Returns ErrNotFound if the ref does not resolve to a known series.
	Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error

	// LabelIndices returns a list of string tuples for which a label value index exists.
	// NOTE: This is deprecated. Use `LabelNames()` instead.
	LabelIndices() ([][]string, error)

	// LabelNames returns all the unique label names present in the index in sorted order.
	LabelNames() ([]string, error)

	// Close releases the underlying resources of the reader.
	Close() error
}
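
// A lookup sketch (illustrative only; the label name and value are hypothetical
// and error handling is omitted), assuming ir is an IndexReader:
//
//	p, _ := ir.Postings("job", "api")
//	var lset labels.Labels
//	var chks []chunks.Meta
//	for p.Next() {
//		_ = ir.Series(p.At(), &lset, &chks)
//		// lset and chks now describe the series behind p.At().
//	}
//	_ = ir.Close()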

// StringTuples provides access to a sorted list of string tuples.
type StringTuples interface {
	// Len returns the total number of tuples in the list.
	Len() int
	// At returns the tuple at position i.
	At(i int) ([]string, error)
}

// ChunkWriter serializes a time block of chunked series data.
type ChunkWriter interface {
	// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
	// must be populated.
	// After returning successfully, the Ref fields in the ChunkMetas
	// are set and can be used to retrieve the chunks from the written data.
	WriteChunks(chunks ...chunks.Meta) error

	// Close writes any required finalization and closes the resources
	// associated with the underlying writer.
	Close() error
}

// ChunkReader provides reading access to serialized time series data.
type ChunkReader interface {
	// Chunk returns the series data chunk with the given reference.
	Chunk(ref uint64) (chunkenc.Chunk, error)

	// Close releases all underlying resources of the reader.
	Close() error
}
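
// A round-trip sketch (illustrative only; cw, cr, and metas are hypothetical
// ChunkWriter, ChunkReader, and []chunks.Meta values, error handling omitted):
// write chunks, then read one back via the reference WriteChunks filled in:
//
//	_ = cw.WriteChunks(metas...) // on success, metas[i].Ref is set
//	chk, _ := cr.Chunk(metas[0].Ref)
//	_ = chk.NumSamples()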

// BlockReader provides reading access to a data block.
type BlockReader interface {
	// Index returns an IndexReader over the block's data.
	Index() (IndexReader, error)

	// Chunks returns a ChunkReader over the block's data.
	Chunks() (ChunkReader, error)

	// Tombstones returns a TombstoneReader over the block's deleted data.
	Tombstones() (TombstoneReader, error)
}

// Appendable defines an entity to which data can be appended.
type Appendable interface {
	// Appender returns a new Appender against an underlying store.
	Appender() Appender
}

// BlockMeta provides meta information about a block.
type BlockMeta struct {
	// Unique identifier for the block and its contents. Changes on compaction.
	ULID ulid.ULID `json:"ulid"`

	// MinTime and MaxTime specify the time range all samples
	// in the block are in.
	MinTime int64 `json:"minTime"`
	MaxTime int64 `json:"maxTime"`

	// Stats about the contents of the block.
	Stats BlockStats `json:"stats,omitempty"`

	// Information on compactions the block was created from.
	Compaction BlockMetaCompaction `json:"compaction"`

	// Version of the index format.
	Version int `json:"version"`
}
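
// For illustration, a meta.json serialized from this struct might look roughly
// as follows (all values are hypothetical; writeMetaFile indents with tabs and
// omits empty fields marked omitempty):
//
//	{
//		"ulid": "01BKGV7JBM69T2G1BGBGM6KB12",
//		"minTime": 1536530400000,
//		"maxTime": 1536537600000,
//		"stats": {
//			"numSamples": 100,
//			"numSeries": 10,
//			"numChunks": 10
//		},
//		"compaction": {
//			"level": 1,
//			"sources": ["01BKGV7JBM69T2G1BGBGM6KB12"]
//		},
//		"version": 1
//	}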

// BlockStats contains stats about the contents of a block.
type BlockStats struct {
	NumSamples    uint64 `json:"numSamples,omitempty"`
	NumSeries     uint64 `json:"numSeries,omitempty"`
	NumChunks     uint64 `json:"numChunks,omitempty"`
	NumTombstones uint64 `json:"numTombstones,omitempty"`
}

// BlockDesc describes a block by ULID and time range.
type BlockDesc struct {
	ULID    ulid.ULID `json:"ulid"`
	MinTime int64     `json:"minTime"`
	MaxTime int64     `json:"maxTime"`
}

// BlockMetaCompaction holds information about compactions a block went through.
type BlockMetaCompaction struct {
	// Maximum number of compaction cycles any source block has
	// gone through.
	Level int `json:"level"`
	// ULIDs of all source head blocks that went into the block.
	Sources []ulid.ULID `json:"sources,omitempty"`
	// Short descriptions of the direct blocks that were used to create
	// this block.
	Parents []BlockDesc `json:"parents,omitempty"`
	Failed  bool        `json:"failed,omitempty"`
}

const indexFilename = "index"
const metaFilename = "meta.json"

func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }

func readMetaFile(dir string) (*BlockMeta, error) {
	b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
	if err != nil {
		return nil, err
	}
	var m BlockMeta

	if err := json.Unmarshal(b, &m); err != nil {
		return nil, err
	}
	if m.Version != 1 {
		return nil, errors.Errorf("unexpected meta file version %d", m.Version)
	}

	return &m, nil
}

func writeMetaFile(dir string, meta *BlockMeta) error {
	meta.Version = 1

	// Make any changes to the file appear atomic.
	path := filepath.Join(dir, metaFilename)
	tmp := path + ".tmp"

	f, err := os.Create(tmp)
	if err != nil {
		return err
	}

	enc := json.NewEncoder(f)
	enc.SetIndent("", "\t")

	var merr MultiError

	if merr.Add(enc.Encode(meta)); merr.Err() != nil {
		merr.Add(f.Close())
		return merr.Err()
	}
	if err := f.Close(); err != nil {
		return err
	}
	return renameFile(tmp, path)
}

// Block represents a directory of time series data covering a continuous time range.
type Block struct {
	mtx            sync.RWMutex
	closing        bool
	pendingReaders sync.WaitGroup

	dir  string
	meta BlockMeta

	// Symbol table size in bytes.
	// We maintain this variable to avoid recalculation every time.
	symbolTableSize uint64

	chunkr     ChunkReader
	indexr     IndexReader
	tombstones TombstoneReader
}

// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs.
func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
	meta, err := readMetaFile(dir)
	if err != nil {
		return nil, err
	}

	cr, err := chunks.NewDirReader(chunkDir(dir), pool)
	if err != nil {
		return nil, err
	}
	ir, err := index.NewFileReader(filepath.Join(dir, indexFilename))
	if err != nil {
		return nil, err
	}

	tr, err := readTombstones(dir)
	if err != nil {
		return nil, err
	}

	pb := &Block{
		dir:             dir,
		meta:            *meta,
		chunkr:          cr,
		indexr:          ir,
		tombstones:      tr,
		symbolTableSize: ir.SymbolTableSize(),
	}
	return pb, nil
}
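
// A usage sketch (illustrative only; the directory path is hypothetical and
// error handling is omitted): open a block, acquire an index reader, and close
// the reader again so that Block.Close is not blocked by a pending reader:
//
//	b, _ := OpenBlock("data/01BKGV7JBM69T2G1BGBGM6KB12", chunkenc.NewPool())
//	ir, _ := b.Index()
//	names, _ := ir.LabelNames()
//	_ = names
//	_ = ir.Close() // releases the pending-reader handle
//	_ = b.Close()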

// Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (pb *Block) Close() error {
	pb.mtx.Lock()
	pb.closing = true
	pb.mtx.Unlock()

	pb.pendingReaders.Wait()

	var merr MultiError

	merr.Add(pb.chunkr.Close())
	merr.Add(pb.indexr.Close())
	merr.Add(pb.tombstones.Close())

	return merr.Err()
}

// String returns the block's ULID as its string representation.
func (pb *Block) String() string {
	return pb.meta.ULID.String()
}

// Dir returns the directory of the block.
func (pb *Block) Dir() string { return pb.dir }

// Meta returns meta information about the block.
func (pb *Block) Meta() BlockMeta { return pb.meta }

// ErrClosing is returned when a block is in the process of being closed.
var ErrClosing = errors.New("block is closing")

// startRead registers a new pending reader, failing if the block is already
// in the process of being closed.
func (pb *Block) startRead() error {
	pb.mtx.RLock()
	defer pb.mtx.RUnlock()

	if pb.closing {
		return ErrClosing
	}
	pb.pendingReaders.Add(1)
	return nil
}

// Index returns a new IndexReader against the block data.
func (pb *Block) Index() (IndexReader, error) {
	if err := pb.startRead(); err != nil {
		return nil, err
	}
	return blockIndexReader{ir: pb.indexr, b: pb}, nil
}

// Chunks returns a new ChunkReader against the block data.
func (pb *Block) Chunks() (ChunkReader, error) {
	if err := pb.startRead(); err != nil {
		return nil, err
	}
	return blockChunkReader{ChunkReader: pb.chunkr, b: pb}, nil
}

// Tombstones returns a new TombstoneReader against the block data.
func (pb *Block) Tombstones() (TombstoneReader, error) {
	if err := pb.startRead(); err != nil {
		return nil, err
	}
	return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil
}

// GetSymbolTableSize returns the size in bytes of the symbol table in the index of this block.
func (pb *Block) GetSymbolTableSize() uint64 {
	return pb.symbolTableSize
}

// setCompactionFailed marks the block's compaction as failed and persists that to the meta file.
func (pb *Block) setCompactionFailed() error {
	pb.meta.Compaction.Failed = true
	return writeMetaFile(pb.dir, &pb.meta)
}

// blockIndexReader wraps the block's IndexReader and releases the block's
// pending-reader handle on Close.
type blockIndexReader struct {
	ir IndexReader
	b  *Block
}

func (r blockIndexReader) Symbols() (map[string]struct{}, error) {
	s, err := r.ir.Symbols()
	return s, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

func (r blockIndexReader) LabelValues(names ...string) (index.StringTuples, error) {
	st, err := r.ir.LabelValues(names...)
	return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

func (r blockIndexReader) Postings(name, value string) (index.Postings, error) {
	p, err := r.ir.Postings(name, value)
	return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
	return r.ir.SortedPostings(p)
}

func (r blockIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
	return errors.Wrapf(
		r.ir.Series(ref, lset, chks),
		"block: %s",
		r.b.Meta().ULID,
	)
}

func (r blockIndexReader) LabelIndices() ([][]string, error) {
	ss, err := r.ir.LabelIndices()
	return ss, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

func (r blockIndexReader) LabelNames() ([]string, error) {
	return r.b.LabelNames()
}

func (r blockIndexReader) Close() error {
	r.b.pendingReaders.Done()
	return nil
}

type blockTombstoneReader struct {
	TombstoneReader
	b *Block
}

func (r blockTombstoneReader) Close() error {
	r.b.pendingReaders.Done()
	return nil
}

type blockChunkReader struct {
	ChunkReader
	b *Block
}

func (r blockChunkReader) Close() error {
	r.b.pendingReaders.Done()
	return nil
}
// Delete matching series between mint and maxt in the block.
func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
	pb.mtx.Lock()
	defer pb.mtx.Unlock()

	if pb.closing {
		return ErrClosing
	}

	p, err := PostingsForMatchers(pb.indexr, ms...)
	if err != nil {
		return errors.Wrap(err, "select series")
	}

	ir := pb.indexr

	// Choose only valid postings which have chunks in the time-range.
	stones := newMemTombstones()

	var lset labels.Labels
	var chks []chunks.Meta

Outer:
	for p.Next() {
		err := ir.Series(p.At(), &lset, &chks)
		if err != nil {
			return err
		}

		for _, chk := range chks {
			if chk.OverlapsClosedInterval(mint, maxt) {
				// Delete only until the current values and not beyond.
				tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
				stones.addInterval(p.At(), Interval{tmin, tmax})
				continue Outer
			}
		}
	}

	if p.Err() != nil {
		return p.Err()
	}

	err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
		for _, iv := range ivs {
			stones.addInterval(id, iv)
		}
		return nil
	})
	if err != nil {
		return err
	}
	pb.tombstones = stones
	pb.meta.Stats.NumTombstones = pb.tombstones.Total()

	if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
		return err
	}
	return writeMetaFile(pb.dir, &pb.meta)
}
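
// A deletion sketch (illustrative only; the matcher constructor from this
// repository's labels package, the label pair, and the millisecond time range
// are assumptions, error handling omitted): tombstone all samples of series
// with job="api" in the given range:
//
//	_ = pb.Delete(1536530400000, 1536537600000, labels.NewEqualMatcher("job", "api"))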

// CleanTombstones rewrites the block without the data deleted by its tombstones,
// but only if any tombstones exist. If a rewrite happened, it returns the ULID
// of the newly written block, else nil.
func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
	numStones := 0

	if err := pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
		numStones += len(ivs)
		return nil
	}); err != nil {
		// This should never happen, as the iteration function only returns nil.
		panic(err)
	}
	if numStones == 0 {
		return nil, nil
	}

	meta := pb.Meta()
	uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta)
	if err != nil {
		return nil, err
	}
	return &uid, nil
}
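
// A cleanup sketch (illustrative only; the destination directory and compactor
// value are hypothetical, error handling omitted): rewrite the block without
// its tombstoned data and note the new block's ULID, if a rewrite happened:
//
//	newULID, _ := pb.CleanTombstones("data", compactor)
//	if newULID != nil {
//		// A new block was written; the caller can now drop the old one.
//	}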

// Snapshot creates a snapshot of the block into dir.
func (pb *Block) Snapshot(dir string) error {
	blockDir := filepath.Join(dir, pb.meta.ULID.String())
	if err := os.MkdirAll(blockDir, 0777); err != nil {
		return errors.Wrap(err, "create snapshot block dir")
	}

	chunksDir := chunkDir(blockDir)
	if err := os.MkdirAll(chunksDir, 0777); err != nil {
		return errors.Wrap(err, "create snapshot chunk dir")
	}

	// Hardlink meta, index and tombstones.
	for _, fname := range []string{
		metaFilename,
		indexFilename,
		tombstoneFilename,
	} {
		if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil {
			return errors.Wrapf(err, "create snapshot %s", fname)
		}
	}

	// Hardlink the chunks.
	curChunkDir := chunkDir(pb.dir)
	files, err := ioutil.ReadDir(curChunkDir)
	if err != nil {
		return errors.Wrap(err, "ReadDir the current chunk dir")
	}

	for _, f := range files {
		err := os.Link(filepath.Join(curChunkDir, f.Name()), filepath.Join(chunksDir, f.Name()))
		if err != nil {
			return errors.Wrap(err, "hardlink a chunk")
		}
	}

	return nil
}
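
// A snapshot sketch (illustrative only; the target directory is hypothetical,
// error handling omitted). Because the block is hard-linked via os.Link, the
// snapshot directory must be on the same filesystem as the block:
//
//	_ = pb.Snapshot("data/snapshots/20180910T120000Z-3f2a")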

// OverlapsClosedInterval returns true if the block overlaps [mint, maxt].
func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool {
	// The block itself is a half-open interval
	// [pb.meta.MinTime, pb.meta.MaxTime).
	return pb.meta.MinTime <= maxt && mint < pb.meta.MaxTime
}
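
// For example (hypothetical values): a block with MinTime=1000 and MaxTime=2000
// covers [1000, 2000), so OverlapsClosedInterval(1999, 3000) is true, while
// OverlapsClosedInterval(2000, 3000) is false because the block's MaxTime is
// exclusive (2000 < 2000 does not hold).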

// LabelNames returns all the unique label names present in the Block in sorted order.
func (pb *Block) LabelNames() ([]string, error) {
	return pb.indexr.LabelNames()
}

// clampInterval limits [a, b] to the enclosing range [mint, maxt].
func clampInterval(a, b, mint, maxt int64) (int64, int64) {
	if a < mint {
		a = mint
	}
	if b > maxt {
		b = maxt
	}
	return a, b
}