1// Copyright 2017 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package chunks
15
16import (
17	"bufio"
18	"encoding/binary"
19	"fmt"
20	"hash"
21	"hash/crc32"
22	"io"
23	"io/ioutil"
24	"os"
25	"path/filepath"
26	"strconv"
27
28	"github.com/pkg/errors"
29	"github.com/prometheus/tsdb/chunkenc"
30	tsdb_errors "github.com/prometheus/tsdb/errors"
31	"github.com/prometheus/tsdb/fileutil"
32)
33
// MagicChunks is the 4-byte magic number written at the head of every chunk
// segment file.
const MagicChunks = 0x85BD40DD

// MagicChunksSize is the size in bytes of MagicChunks.
const MagicChunksSize = 4

// ChunksFormatVersionSize is the size in bytes of the format version field
// that directly follows MagicChunks in a segment header.
const ChunksFormatVersionSize = 1

const (
	// chunksFormatV1 is the segment format version written by the Writer and
	// the only one accepted by newReader.
	chunksFormatV1 = 1

	// chunkHeaderSize is the minimum number of header bytes newReader
	// validates: the magic number followed by the format version.
	chunkHeaderSize = MagicChunksSize + ChunksFormatVersionSize
)
45
46// Meta holds information about a chunk of data.
47type Meta struct {
48	// Ref and Chunk hold either a reference that can be used to retrieve
49	// chunk data or the data itself.
50	// Generally, only one of them is set.
51	Ref   uint64
52	Chunk chunkenc.Chunk
53
54	MinTime, MaxTime int64 // time range the data covers
55}
56
57// writeHash writes the chunk encoding and raw data into the provided hash.
58func (cm *Meta) writeHash(h hash.Hash) error {
59	if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil {
60		return err
61	}
62	if _, err := h.Write(cm.Chunk.Bytes()); err != nil {
63		return err
64	}
65	return nil
66}
67
68// OverlapsClosedInterval Returns true if the chunk overlaps [mint, maxt].
69func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool {
70	// The chunk itself is a closed interval [cm.MinTime, cm.MaxTime].
71	return cm.MinTime <= maxt && mint <= cm.MaxTime
72}
73
// errInvalidSize is wrapped by newReader when a segment is too short to hold
// a chunk file header.
var errInvalidSize = fmt.Errorf("invalid size")
77
// castagnoliTable is the CRC32 table for the Castagnoli polynomial used by
// newCRC32. It is built by a package-level initializer rather than in init(),
// so it is already set when any init function, or any later package-level
// initializer that depends on it, runs.
var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)

// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the
// polynomial may be easily changed in one location at a later time, if necessary.
func newCRC32() hash.Hash32 {
	return crc32.New(castagnoliTable)
}
89
// Writer implements the ChunkWriter interface for the standard
// serialization format.
type Writer struct {
	// dirFile is the open directory in which segment files are created.
	dirFile *os.File
	// files holds every segment opened so far; the last one is the tail
	// that new chunks are appended to.
	files []*os.File
	// wbuf buffers writes to the tail segment.
	wbuf *bufio.Writer
	// n is the number of bytes written to the tail segment, header included.
	n int64
	// crc32 is reused to checksum every written chunk.
	crc32 hash.Hash

	// segmentSize is the size each segment is pre-allocated to and the
	// threshold used to decide when a new segment is cut.
	segmentSize int64
}
101
// defaultChunkSegmentSize is the segment size (512 MiB) used by NewWriter.
const defaultChunkSegmentSize = 512 << 20
105
106// NewWriter returns a new writer against the given directory.
107func NewWriter(dir string) (*Writer, error) {
108	if err := os.MkdirAll(dir, 0777); err != nil {
109		return nil, err
110	}
111	dirFile, err := fileutil.OpenDir(dir)
112	if err != nil {
113		return nil, err
114	}
115	cw := &Writer{
116		dirFile:     dirFile,
117		n:           0,
118		crc32:       newCRC32(),
119		segmentSize: defaultChunkSegmentSize,
120	}
121	return cw, nil
122}
123
124func (w *Writer) tail() *os.File {
125	if len(w.files) == 0 {
126		return nil
127	}
128	return w.files[len(w.files)-1]
129}
130
131// finalizeTail writes all pending data to the current tail file,
132// truncates its size, and closes it.
133func (w *Writer) finalizeTail() error {
134	tf := w.tail()
135	if tf == nil {
136		return nil
137	}
138
139	if err := w.wbuf.Flush(); err != nil {
140		return err
141	}
142	if err := tf.Sync(); err != nil {
143		return err
144	}
145	// As the file was pre-allocated, we truncate any superfluous zero bytes.
146	off, err := tf.Seek(0, io.SeekCurrent)
147	if err != nil {
148		return err
149	}
150	if err := tf.Truncate(off); err != nil {
151		return err
152	}
153
154	return tf.Close()
155}
156
157func (w *Writer) cut() error {
158	// Sync current tail to disk and close.
159	if err := w.finalizeTail(); err != nil {
160		return err
161	}
162
163	p, _, err := nextSequenceFile(w.dirFile.Name())
164	if err != nil {
165		return err
166	}
167	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
168	if err != nil {
169		return err
170	}
171	if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
172		return err
173	}
174	if err = w.dirFile.Sync(); err != nil {
175		return err
176	}
177
178	// Write header metadata for new file.
179	metab := make([]byte, 8)
180	binary.BigEndian.PutUint32(metab[:MagicChunksSize], MagicChunks)
181	metab[4] = chunksFormatV1
182
183	if _, err := f.Write(metab); err != nil {
184		return err
185	}
186
187	w.files = append(w.files, f)
188	if w.wbuf != nil {
189		w.wbuf.Reset(f)
190	} else {
191		w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
192	}
193	w.n = 8
194
195	return nil
196}
197
198func (w *Writer) write(b []byte) error {
199	n, err := w.wbuf.Write(b)
200	w.n += int64(n)
201	return err
202}
203
204// MergeOverlappingChunks removes the samples whose timestamp is overlapping.
205// The last appearing sample is retained in case there is overlapping.
206// This assumes that `chks []Meta` is sorted w.r.t. MinTime.
207func MergeOverlappingChunks(chks []Meta) ([]Meta, error) {
208	if len(chks) < 2 {
209		return chks, nil
210	}
211	newChks := make([]Meta, 0, len(chks)) // Will contain the merged chunks.
212	newChks = append(newChks, chks[0])
213	last := 0
214	for _, c := range chks[1:] {
215		// We need to check only the last chunk in newChks.
216		// Reason: (1) newChks[last-1].MaxTime < newChks[last].MinTime (non overlapping)
217		//         (2) As chks are sorted w.r.t. MinTime, newChks[last].MinTime < c.MinTime.
218		// So never overlaps with newChks[last-1] or anything before that.
219		if c.MinTime > newChks[last].MaxTime {
220			newChks = append(newChks, c)
221			last += 1
222			continue
223		}
224		nc := &newChks[last]
225		if c.MaxTime > nc.MaxTime {
226			nc.MaxTime = c.MaxTime
227		}
228		chk, err := MergeChunks(nc.Chunk, c.Chunk)
229		if err != nil {
230			return nil, err
231		}
232		nc.Chunk = chk
233	}
234
235	return newChks, nil
236}
237
238// MergeChunks vertically merges a and b, i.e., if there is any sample
239// with same timestamp in both a and b, the sample in a is discarded.
240func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
241	newChunk := chunkenc.NewXORChunk()
242	app, err := newChunk.Appender()
243	if err != nil {
244		return nil, err
245	}
246	ait := a.Iterator()
247	bit := b.Iterator()
248	aok, bok := ait.Next(), bit.Next()
249	for aok && bok {
250		at, av := ait.At()
251		bt, bv := bit.At()
252		if at < bt {
253			app.Append(at, av)
254			aok = ait.Next()
255		} else if bt < at {
256			app.Append(bt, bv)
257			bok = bit.Next()
258		} else {
259			app.Append(bt, bv)
260			aok = ait.Next()
261			bok = bit.Next()
262		}
263	}
264	for aok {
265		at, av := ait.At()
266		app.Append(at, av)
267		aok = ait.Next()
268	}
269	for bok {
270		bt, bv := bit.At()
271		app.Append(bt, bv)
272		bok = bit.Next()
273	}
274	if ait.Err() != nil {
275		return nil, ait.Err()
276	}
277	if bit.Err() != nil {
278		return nil, bit.Err()
279	}
280	return newChunk, nil
281}
282
283func (w *Writer) WriteChunks(chks ...Meta) error {
284	// Calculate maximum space we need and cut a new segment in case
285	// we don't fit into the current one.
286	maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
287	for _, c := range chks {
288		maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding.
289		maxLen += int64(len(c.Chunk.Bytes()))
290		maxLen += 4 // The 4 bytes of crc32
291	}
292	newsz := w.n + maxLen
293
294	if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize {
295		if err := w.cut(); err != nil {
296			return err
297		}
298	}
299
300	var (
301		b   = [binary.MaxVarintLen32]byte{}
302		seq = uint64(w.seq()) << 32
303	)
304	for i := range chks {
305		chk := &chks[i]
306
307		chk.Ref = seq | uint64(w.n)
308
309		n := binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes())))
310
311		if err := w.write(b[:n]); err != nil {
312			return err
313		}
314		b[0] = byte(chk.Chunk.Encoding())
315		if err := w.write(b[:1]); err != nil {
316			return err
317		}
318		if err := w.write(chk.Chunk.Bytes()); err != nil {
319			return err
320		}
321
322		w.crc32.Reset()
323		if err := chk.writeHash(w.crc32); err != nil {
324			return err
325		}
326		if err := w.write(w.crc32.Sum(b[:0])); err != nil {
327			return err
328		}
329	}
330
331	return nil
332}
333
334func (w *Writer) seq() int {
335	return len(w.files) - 1
336}
337
338func (w *Writer) Close() error {
339	if err := w.finalizeTail(); err != nil {
340		return err
341	}
342
343	// close dir file (if not windows platform will fail on rename)
344	return w.dirFile.Close()
345}
346
// ByteSlice abstracts a byte slice.
type ByteSlice interface {
	// Len returns the number of bytes available.
	Len() int
	// Range returns the bytes from start up to, but not including, end.
	Range(start, end int) []byte
}

// realByteSlice is a ByteSlice backed directly by an in-memory []byte.
// Range and Sub return views that share memory with the receiver.
type realByteSlice []byte

// Len returns the length of the underlying slice.
func (rb realByteSlice) Len() int { return len(rb) }

// Range returns rb[start:end] as a plain []byte.
func (rb realByteSlice) Range(start, end int) []byte { return []byte(rb[start:end]) }

// Sub returns rb[start:end] as another ByteSlice.
func (rb realByteSlice) Sub(start, end int) ByteSlice { return rb[start:end] }
366
367// Reader implements a SeriesReader for a serialized byte stream
368// of series data.
369type Reader struct {
370	bs   []ByteSlice // The underlying bytes holding the encoded series data.
371	cs   []io.Closer // Closers for resources behind the byte slices.
372	size int64       // The total size of bytes in the reader.
373	pool chunkenc.Pool
374}
375
376func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
377	cr := Reader{pool: pool, bs: bs, cs: cs}
378	var totalSize int64
379
380	for i, b := range cr.bs {
381		if b.Len() < chunkHeaderSize {
382			return nil, errors.Wrapf(errInvalidSize, "invalid chunk header in segment %d", i)
383		}
384		// Verify magic number.
385		if m := binary.BigEndian.Uint32(b.Range(0, MagicChunksSize)); m != MagicChunks {
386			return nil, errors.Errorf("invalid magic number %x", m)
387		}
388
389		// Verify chunk format version.
390		if v := int(b.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
391			return nil, errors.Errorf("invalid chunk format version %d", v)
392		}
393		totalSize += int64(b.Len())
394	}
395	cr.size = totalSize
396	return &cr, nil
397}
398
399// NewDirReader returns a new Reader against sequentially numbered files in the
400// given directory.
401func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
402	files, err := sequenceFiles(dir)
403	if err != nil {
404		return nil, err
405	}
406	if pool == nil {
407		pool = chunkenc.NewPool()
408	}
409
410	var (
411		bs   []ByteSlice
412		cs   []io.Closer
413		merr tsdb_errors.MultiError
414	)
415	for _, fn := range files {
416		f, err := fileutil.OpenMmapFile(fn)
417		if err != nil {
418			merr.Add(errors.Wrap(err, "mmap files"))
419			merr.Add(closeAll(cs))
420			return nil, merr
421		}
422		cs = append(cs, f)
423		bs = append(bs, realByteSlice(f.Bytes()))
424	}
425
426	reader, err := newReader(bs, cs, pool)
427	if err != nil {
428		merr.Add(err)
429		merr.Add(closeAll(cs))
430		return nil, merr
431	}
432	return reader, nil
433}
434
435func (s *Reader) Close() error {
436	return closeAll(s.cs)
437}
438
439// Size returns the size of the chunks.
440func (s *Reader) Size() int64 {
441	return s.size
442}
443
444// Chunk returns a chunk from a given reference.
445func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
446	var (
447		sgmSeq    = int(ref >> 32)
448		sgmOffset = int((ref << 32) >> 32)
449	)
450	if sgmSeq >= len(s.bs) {
451		return nil, errors.Errorf("reference sequence %d out of range", sgmSeq)
452	}
453	chkS := s.bs[sgmSeq]
454
455	if sgmOffset >= chkS.Len() {
456		return nil, errors.Errorf("offset %d beyond data size %d", sgmOffset, chkS.Len())
457	}
458	// With the minimum chunk length this should never cause us reading
459	// over the end of the slice.
460	chk := chkS.Range(sgmOffset, sgmOffset+binary.MaxVarintLen32)
461
462	chkLen, n := binary.Uvarint(chk)
463	if n <= 0 {
464		return nil, errors.Errorf("reading chunk length failed with %d", n)
465	}
466	chk = chkS.Range(sgmOffset+n, sgmOffset+n+1+int(chkLen))
467
468	return s.pool.Get(chunkenc.Encoding(chk[0]), chk[1:1+chkLen])
469}
470
471func nextSequenceFile(dir string) (string, int, error) {
472	names, err := fileutil.ReadDir(dir)
473	if err != nil {
474		return "", 0, err
475	}
476
477	i := uint64(0)
478	for _, n := range names {
479		j, err := strconv.ParseUint(n, 10, 64)
480		if err != nil {
481			continue
482		}
483		i = j
484	}
485	return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil
486}
487
// sequenceFiles returns the paths of all entries in dir whose names parse as
// an unsigned decimal integer. They come in the order returned by
// ioutil.ReadDir. It returns nil when there are none.
func sequenceFiles(dir string) ([]string, error) {
	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		return nil, err
	}

	var paths []string
	for _, entry := range entries {
		name := entry.Name()
		if _, perr := strconv.ParseUint(name, 10, 64); perr == nil {
			paths = append(paths, filepath.Join(dir, name))
		}
	}
	return paths, nil
}
503
// closeAll closes every closer in cs, even after a failure, and returns the
// error from the last closer that failed, or nil if all succeeded.
func closeAll(cs []io.Closer) error {
	var lastErr error
	for i := range cs {
		if cerr := cs[i].Close(); cerr != nil {
			lastErr = cerr
		}
	}
	return lastErr
}
512