1// Copyright 2017 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package chunks
15
16import (
17	"bufio"
18	"bytes"
19	"encoding/binary"
20	"fmt"
21	"hash"
22	"hash/crc32"
23	"io"
24	"io/ioutil"
25	"os"
26	"path/filepath"
27	"strconv"
28
29	"github.com/pkg/errors"
30
31	"github.com/prometheus/prometheus/tsdb/chunkenc"
32	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
33	"github.com/prometheus/prometheus/tsdb/fileutil"
34)
35
// Segment header fields constants.
// A segment header is laid out as: magic number, format version, zero padding.
const (
	// MagicChunks is 4 bytes at the head of a series file.
	MagicChunks = 0x85BD40DD
	// MagicChunksSize is the size in bytes of MagicChunks.
	MagicChunksSize          = 4
	chunksFormatV1           = 1 // Format version written by Writer and the only one accepted by the reader.
	ChunksFormatVersionSize  = 1 // Size in bytes of the format version field.
	segmentHeaderPaddingSize = 3 // Bytes after the version field that are left as zero.
	// SegmentHeaderSize defines the total size of the header part.
	SegmentHeaderSize = MagicChunksSize + ChunksFormatVersionSize + segmentHeaderPaddingSize
)
48
// Chunk fields constants.
// On disk a chunk is stored as: data length, encoding, data, crc32.
const (
	// MaxChunkLengthFieldSize defines the maximum size of the data length part.
	// The length is stored as an unsigned varint, so the field may be shorter.
	MaxChunkLengthFieldSize = binary.MaxVarintLen32
	// ChunkEncodingSize defines the size of the chunk encoding part.
	ChunkEncodingSize = 1
)
56
// Meta holds information about a chunk of data.
type Meta struct {
	// Ref and Chunk hold either a reference that can be used to retrieve
	// chunk data or the data itself.
	// When it is a reference it is the segment offset at which the chunk bytes start.
	// References assigned by Writer carry the segment index in the upper
	// 4 bytes and the offset within that segment in the lower 4 bytes.
	// Generally, only one of them is set.
	Ref   uint64
	Chunk chunkenc.Chunk

	// Time range the data covers.
	// When MaxTime == math.MaxInt64 the chunk is still open and being appended to.
	MinTime, MaxTime int64
}
70
// Iterator iterates over the chunks of a time series.
type Iterator interface {
	// At returns the current meta.
	// It depends on implementation if the chunk is populated or not.
	At() Meta
	// Next advances the iterator by one.
	// It returns false when there is nothing more to iterate over.
	Next() bool
	// Err returns the error, if any, once Next has returned false.
	Err() error
}
81
82// writeHash writes the chunk encoding and raw data into the provided hash.
83func (cm *Meta) writeHash(h hash.Hash, buf []byte) error {
84	buf = append(buf[:0], byte(cm.Chunk.Encoding()))
85	if _, err := h.Write(buf[:1]); err != nil {
86		return err
87	}
88	if _, err := h.Write(cm.Chunk.Bytes()); err != nil {
89		return err
90	}
91	return nil
92}
93
94// OverlapsClosedInterval Returns true if the chunk overlaps [mint, maxt].
95func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool {
96	// The chunk itself is a closed interval [cm.MinTime, cm.MaxTime].
97	return cm.MinTime <= maxt && mint <= cm.MaxTime
98}
99
var (
	// errInvalidSize is the base error reported when a segment is too
	// short to contain a segment header.
	errInvalidSize = fmt.Errorf("invalid size")
)
103
// castagnoliTable is the CRC32 (Castagnoli polynomial) lookup table shared by
// every checksum computed in this package. It is built once at package
// initialization.
var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)

// newCRC32 returns a fresh CRC32 hash backed by the package-wide table.
// Keeping construction in one place means the polynomial can be swapped in
// a single location later on, should that ever be needed.
func newCRC32() hash.Hash32 {
	h := crc32.New(castagnoliTable)
	return h
}
115
// Writer implements the ChunkWriter interface for the standard
// serialization format.
// Chunks are appended to numbered segment files inside a single directory.
type Writer struct {
	dirFile *os.File                    // Handle of the directory the segment files are created in.
	files   []*os.File                  // Segment files cut so far; the last one is the current tail.
	wbuf    *bufio.Writer               // Buffered writer over the current tail file.
	n       int64                       // Bytes written to the current segment, header included; 0 before the first cut.
	crc32   hash.Hash                   // Hash reused for every per-chunk checksum.
	buf     [binary.MaxVarintLen32]byte // Scratch space for the length varint, the encoding byte and the checksum.

	segmentSize int64 // Size used to pre-allocate a segment and as the limit when batching chunks.
}
128
const (
	// DefaultChunkSegmentSize is the default chunks segment size (512 MiB).
	// It is used by NewWriter and whenever a non-positive size is requested.
	DefaultChunkSegmentSize = 512 * 1024 * 1024
)
133
// NewWriterWithSegSize returns a new writer against the given directory
// and allows setting a custom size for the segments.
// The directory is created if it does not exist. A segmentSize of zero or
// less selects DefaultChunkSegmentSize.
func NewWriterWithSegSize(dir string, segmentSize int64) (*Writer, error) {
	return newWriter(dir, segmentSize)
}
139
// NewWriter returns a new writer against the given directory
// using the default segment size.
// The directory is created if it does not exist.
func NewWriter(dir string) (*Writer, error) {
	return newWriter(dir, DefaultChunkSegmentSize)
}
145
146func newWriter(dir string, segmentSize int64) (*Writer, error) {
147	if segmentSize <= 0 {
148		segmentSize = DefaultChunkSegmentSize
149	}
150
151	if err := os.MkdirAll(dir, 0777); err != nil {
152		return nil, err
153	}
154	dirFile, err := fileutil.OpenDir(dir)
155	if err != nil {
156		return nil, err
157	}
158	return &Writer{
159		dirFile:     dirFile,
160		n:           0,
161		crc32:       newCRC32(),
162		segmentSize: segmentSize,
163	}, nil
164}
165
166func (w *Writer) tail() *os.File {
167	if len(w.files) == 0 {
168		return nil
169	}
170	return w.files[len(w.files)-1]
171}
172
173// finalizeTail writes all pending data to the current tail file,
174// truncates its size, and closes it.
175func (w *Writer) finalizeTail() error {
176	tf := w.tail()
177	if tf == nil {
178		return nil
179	}
180
181	if err := w.wbuf.Flush(); err != nil {
182		return err
183	}
184	if err := tf.Sync(); err != nil {
185		return err
186	}
187	// As the file was pre-allocated, we truncate any superfluous zero bytes.
188	off, err := tf.Seek(0, io.SeekCurrent)
189	if err != nil {
190		return err
191	}
192	if err := tf.Truncate(off); err != nil {
193		return err
194	}
195
196	return tf.Close()
197}
198
199func (w *Writer) cut() error {
200	// Sync current tail to disk and close.
201	if err := w.finalizeTail(); err != nil {
202		return err
203	}
204
205	n, f, _, err := cutSegmentFile(w.dirFile, MagicChunks, chunksFormatV1, w.segmentSize)
206	if err != nil {
207		return err
208	}
209	w.n = int64(n)
210
211	w.files = append(w.files, f)
212	if w.wbuf != nil {
213		w.wbuf.Reset(f)
214	} else {
215		w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
216	}
217
218	return nil
219}
220
221func cutSegmentFile(dirFile *os.File, magicNumber uint32, chunksFormat byte, allocSize int64) (headerSize int, newFile *os.File, seq int, returnErr error) {
222	p, seq, err := nextSequenceFile(dirFile.Name())
223	if err != nil {
224		return 0, nil, 0, errors.Wrap(err, "next sequence file")
225	}
226	ptmp := p + ".tmp"
227	f, err := os.OpenFile(ptmp, os.O_WRONLY|os.O_CREATE, 0666)
228	if err != nil {
229		return 0, nil, 0, errors.Wrap(err, "open temp file")
230	}
231	defer func() {
232		if returnErr != nil {
233			errs := tsdb_errors.NewMulti(returnErr)
234			if f != nil {
235				errs.Add(f.Close())
236			}
237			// Calling RemoveAll on a non-existent file does not return error.
238			errs.Add(os.RemoveAll(ptmp))
239			returnErr = errs.Err()
240		}
241	}()
242	if allocSize > 0 {
243		if err = fileutil.Preallocate(f, allocSize, true); err != nil {
244			return 0, nil, 0, errors.Wrap(err, "preallocate")
245		}
246	}
247	if err = dirFile.Sync(); err != nil {
248		return 0, nil, 0, errors.Wrap(err, "sync directory")
249	}
250
251	// Write header metadata for new file.
252	metab := make([]byte, SegmentHeaderSize)
253	binary.BigEndian.PutUint32(metab[:MagicChunksSize], magicNumber)
254	metab[4] = chunksFormat
255
256	n, err := f.Write(metab)
257	if err != nil {
258		return 0, nil, 0, errors.Wrap(err, "write header")
259	}
260	if err := f.Close(); err != nil {
261		return 0, nil, 0, errors.Wrap(err, "close temp file")
262	}
263	f = nil
264
265	if err := fileutil.Rename(ptmp, p); err != nil {
266		return 0, nil, 0, errors.Wrap(err, "replace file")
267	}
268
269	f, err = os.OpenFile(p, os.O_WRONLY, 0666)
270	if err != nil {
271		return 0, nil, 0, errors.Wrap(err, "open final file")
272	}
273	// Skip header for further writes.
274	if _, err := f.Seek(int64(n), 0); err != nil {
275		return 0, nil, 0, errors.Wrap(err, "seek in final file")
276	}
277	return n, f, seq, nil
278}
279
280func (w *Writer) write(b []byte) error {
281	n, err := w.wbuf.Write(b)
282	w.n += int64(n)
283	return err
284}
285
286// WriteChunks writes as many chunks as possible to the current segment,
287// cuts a new segment when the current segment is full and
288// writes the rest of the chunks in the new segment.
289func (w *Writer) WriteChunks(chks ...Meta) error {
290	var (
291		batchSize  = int64(0)
292		batchStart = 0
293		batches    = make([][]Meta, 1)
294		batchID    = 0
295		firstBatch = true
296	)
297
298	for i, chk := range chks {
299		// Each chunk contains: data length + encoding + the data itself + crc32
300		chkSize := int64(MaxChunkLengthFieldSize) // The data length is a variable length field so use the maximum possible value.
301		chkSize += ChunkEncodingSize              // The chunk encoding.
302		chkSize += int64(len(chk.Chunk.Bytes()))  // The data itself.
303		chkSize += crc32.Size                     // The 4 bytes of crc32.
304		batchSize += chkSize
305
306		// Cut a new batch when it is not the first chunk(to avoid empty segments) and
307		// the batch is too large to fit in the current segment.
308		cutNewBatch := (i != 0) && (batchSize+SegmentHeaderSize > w.segmentSize)
309
310		// When the segment already has some data than
311		// the first batch size calculation should account for that.
312		if firstBatch && w.n > SegmentHeaderSize {
313			cutNewBatch = batchSize+w.n > w.segmentSize
314			if cutNewBatch {
315				firstBatch = false
316			}
317		}
318
319		if cutNewBatch {
320			batchStart = i
321			batches = append(batches, []Meta{})
322			batchID++
323			batchSize = chkSize
324		}
325		batches[batchID] = chks[batchStart : i+1]
326	}
327
328	// Create a new segment when one doesn't already exist.
329	if w.n == 0 {
330		if err := w.cut(); err != nil {
331			return err
332		}
333	}
334
335	for i, chks := range batches {
336		if err := w.writeChunks(chks); err != nil {
337			return err
338		}
339		// Cut a new segment only when there are more chunks to write.
340		// Avoid creating a new empty segment at the end of the write.
341		if i < len(batches)-1 {
342			if err := w.cut(); err != nil {
343				return err
344			}
345		}
346	}
347	return nil
348}
349
350// writeChunks writes the chunks into the current segment irrespective
351// of the configured segment size limit. A segment should have been already
352// started before calling this.
353func (w *Writer) writeChunks(chks []Meta) error {
354	if len(chks) == 0 {
355		return nil
356	}
357
358	var seq = uint64(w.seq()) << 32
359	for i := range chks {
360		chk := &chks[i]
361
362		// The reference is set to the segment index and the offset where
363		// the data starts for this chunk.
364		//
365		// The upper 4 bytes are for the segment index and
366		// the lower 4 bytes are for the segment offset where to start reading this chunk.
367		chk.Ref = seq | uint64(w.n)
368
369		n := binary.PutUvarint(w.buf[:], uint64(len(chk.Chunk.Bytes())))
370
371		if err := w.write(w.buf[:n]); err != nil {
372			return err
373		}
374		w.buf[0] = byte(chk.Chunk.Encoding())
375		if err := w.write(w.buf[:1]); err != nil {
376			return err
377		}
378		if err := w.write(chk.Chunk.Bytes()); err != nil {
379			return err
380		}
381
382		w.crc32.Reset()
383		if err := chk.writeHash(w.crc32, w.buf[:]); err != nil {
384			return err
385		}
386		if err := w.write(w.crc32.Sum(w.buf[:0])); err != nil {
387			return err
388		}
389	}
390	return nil
391}
392
// seq returns the zero-based position of the tail segment within w.files.
// It is -1 when no segment has been cut yet.
func (w *Writer) seq() int {
	return len(w.files) - 1
}
396
397func (w *Writer) Close() error {
398	if err := w.finalizeTail(); err != nil {
399		return err
400	}
401
402	// close dir file (if not windows platform will fail on rename)
403	return w.dirFile.Close()
404}
405
// ByteSlice abstracts a byte slice.
type ByteSlice interface {
	// Len returns the number of bytes available.
	Len() int
	// Range returns the bytes in the half-open range [start, end).
	Range(start, end int) []byte
}
411
// realByteSlice is a ByteSlice backed directly by an in-memory byte slice.
type realByteSlice []byte

// Len returns the number of bytes in the slice.
func (rbs realByteSlice) Len() int {
	return len([]byte(rbs))
}

// Range returns the bytes in [start, end). The result shares its backing
// memory with the receiver; no copy is made.
func (rbs realByteSlice) Range(start, end int) []byte {
	window := rbs[start:end]
	return window
}
421
// Reader implements a ChunkReader for a serialized byte stream
// of series data.
type Reader struct {
	// The underlying bytes holding the encoded series data.
	// Each slice holds the data for a different segment.
	// The position in this slice is the segment index used in chunk references.
	bs   []ByteSlice
	cs   []io.Closer // Closers for resources behind the byte slices.
	size int64       // The total size of bytes in the reader.
	// Pool used to turn the raw chunk bytes into chunkenc.Chunk values.
	pool chunkenc.Pool
}
432
433func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
434	cr := Reader{pool: pool, bs: bs, cs: cs}
435	for i, b := range cr.bs {
436		if b.Len() < SegmentHeaderSize {
437			return nil, errors.Wrapf(errInvalidSize, "invalid segment header in segment %d", i)
438		}
439		// Verify magic number.
440		if m := binary.BigEndian.Uint32(b.Range(0, MagicChunksSize)); m != MagicChunks {
441			return nil, errors.Errorf("invalid magic number %x", m)
442		}
443
444		// Verify chunk format version.
445		if v := int(b.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
446			return nil, errors.Errorf("invalid chunk format version %d", v)
447		}
448		cr.size += int64(b.Len())
449	}
450	return &cr, nil
451}
452
453// NewDirReader returns a new Reader against sequentially numbered files in the
454// given directory.
455func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
456	files, err := sequenceFiles(dir)
457	if err != nil {
458		return nil, err
459	}
460	if pool == nil {
461		pool = chunkenc.NewPool()
462	}
463
464	var (
465		bs []ByteSlice
466		cs []io.Closer
467	)
468	for _, fn := range files {
469		f, err := fileutil.OpenMmapFile(fn)
470		if err != nil {
471			return nil, tsdb_errors.NewMulti(
472				errors.Wrap(err, "mmap files"),
473				tsdb_errors.CloseAll(cs),
474			).Err()
475		}
476		cs = append(cs, f)
477		bs = append(bs, realByteSlice(f.Bytes()))
478	}
479
480	reader, err := newReader(bs, cs, pool)
481	if err != nil {
482		return nil, tsdb_errors.NewMulti(
483			err,
484			tsdb_errors.CloseAll(cs),
485		).Err()
486	}
487	return reader, nil
488}
489
// Close releases the resources behind the segments by handing the reader's
// closers to tsdb_errors.CloseAll and returns its result.
func (s *Reader) Close() error {
	return tsdb_errors.CloseAll(s.cs)
}
493
// Size returns the size of the chunks.
// It is the sum of the lengths of all segments, headers included, as
// computed when the reader was created.
func (s *Reader) Size() int64 {
	return s.size
}
498
499// Chunk returns a chunk from a given reference.
500func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
501	var (
502		// Get the upper 4 bytes.
503		// These contain the segment index.
504		sgmIndex = int(ref >> 32)
505		// Get the lower 4 bytes.
506		// These contain the segment offset where the data for this chunk starts.
507		chkStart = int((ref << 32) >> 32)
508		chkCRC32 = newCRC32()
509	)
510
511	if sgmIndex >= len(s.bs) {
512		return nil, errors.Errorf("segment index %d out of range", sgmIndex)
513	}
514
515	sgmBytes := s.bs[sgmIndex]
516
517	if chkStart+MaxChunkLengthFieldSize > sgmBytes.Len() {
518		return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, sgmBytes.Len())
519	}
520	// With the minimum chunk length this should never cause us reading
521	// over the end of the slice.
522	c := sgmBytes.Range(chkStart, chkStart+MaxChunkLengthFieldSize)
523	chkDataLen, n := binary.Uvarint(c)
524	if n <= 0 {
525		return nil, errors.Errorf("reading chunk length failed with %d", n)
526	}
527
528	chkEncStart := chkStart + n
529	chkEnd := chkEncStart + ChunkEncodingSize + int(chkDataLen) + crc32.Size
530	chkDataStart := chkEncStart + ChunkEncodingSize
531	chkDataEnd := chkEnd - crc32.Size
532
533	if chkEnd > sgmBytes.Len() {
534		return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len())
535	}
536
537	sum := sgmBytes.Range(chkDataEnd, chkEnd)
538	if _, err := chkCRC32.Write(sgmBytes.Range(chkEncStart, chkDataEnd)); err != nil {
539		return nil, err
540	}
541
542	if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
543		return nil, errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act)
544	}
545
546	chkData := sgmBytes.Range(chkDataStart, chkDataEnd)
547	chkEnc := sgmBytes.Range(chkEncStart, chkEncStart+ChunkEncodingSize)[0]
548	return s.pool.Get(chunkenc.Encoding(chkEnc), chkData)
549}
550
551func nextSequenceFile(dir string) (string, int, error) {
552	files, err := ioutil.ReadDir(dir)
553	if err != nil {
554		return "", 0, err
555	}
556
557	i := uint64(0)
558	for _, f := range files {
559		j, err := strconv.ParseUint(f.Name(), 10, 64)
560		if err != nil {
561			continue
562		}
563		// It is not necessary that we find the files in number order,
564		// for example with '1000000' and '200000', '1000000' would come first.
565		// Though this is a very very race case, we check anyway for the max id.
566		if j > i {
567			i = j
568		}
569	}
570	return segmentFile(dir, int(i+1)), int(i + 1), nil
571}
572
// segmentFile returns the path of the segment with the given index inside
// baseDir. The file name is the index zero-padded to at least six digits.
func segmentFile(baseDir string, index int) string {
	name := fmt.Sprintf("%0.6d", index)
	return filepath.Join(baseDir, name)
}
576
// sequenceFiles returns the paths of all entries in dir whose names are
// unsigned decimal numbers, in the order the directory listing yields them.
// The result is nil when there are none.
func sequenceFiles(dir string) ([]string, error) {
	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		return nil, err
	}

	var paths []string
	for _, entry := range entries {
		name := entry.Name()
		// Keep only names that parse as a sequence number.
		if _, err := strconv.ParseUint(name, 10, 64); err == nil {
			paths = append(paths, filepath.Join(dir, name))
		}
	}
	return paths, nil
}
591