// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chunks

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"hash"
	"hash/crc32"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"strconv"

	"github.com/pkg/errors"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
	"github.com/prometheus/prometheus/tsdb/fileutil"
)

// Segment header fields constants.
const (
	// MagicChunks is 4 bytes at the head of a series file.
	MagicChunks = 0x85BD40DD
	// MagicChunksSize is the size in bytes of MagicChunks.
	MagicChunksSize          = 4
	chunksFormatV1           = 1
	ChunksFormatVersionSize  = 1
	segmentHeaderPaddingSize = 3
	// SegmentHeaderSize defines the total size of the header part.
	SegmentHeaderSize = MagicChunksSize + ChunksFormatVersionSize + segmentHeaderPaddingSize
)

// Chunk fields constants.
const (
	// MaxChunkLengthFieldSize defines the maximum size of the data length part.
	MaxChunkLengthFieldSize = binary.MaxVarintLen32
	// ChunkEncodingSize defines the size of the chunk encoding part.
	ChunkEncodingSize = 1
)

// Meta holds information about a chunk of data.
type Meta struct {
	// Ref and Chunk hold either a reference that can be used to retrieve
	// chunk data or the data itself.
	// When it is a reference, it is the segment offset at which the chunk bytes start.
	// Generally, only one of them is set.
	Ref   uint64
	Chunk chunkenc.Chunk

	// Time range the data covers.
	// When MaxTime == math.MaxInt64 the chunk is still open and being appended to.
	MinTime, MaxTime int64
}

// Iterator iterates over the chunks of a time series.
type Iterator interface {
	// At returns the current meta.
	// It depends on the implementation whether the chunk is populated or not.
	At() Meta
	// Next advances the iterator by one.
	Next() bool
	// Err returns an optional error if Next is false.
	Err() error
}

// writeHash writes the chunk encoding and raw data into the provided hash.
func (cm *Meta) writeHash(h hash.Hash, buf []byte) error {
	buf = append(buf[:0], byte(cm.Chunk.Encoding()))
	if _, err := h.Write(buf[:1]); err != nil {
		return err
	}
	if _, err := h.Write(cm.Chunk.Bytes()); err != nil {
		return err
	}
	return nil
}

// OverlapsClosedInterval returns true if the chunk overlaps [mint, maxt].
func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool {
	// The chunk itself is a closed interval [cm.MinTime, cm.MaxTime].
	return cm.MinTime <= maxt && mint <= cm.MaxTime
}

var errInvalidSize = fmt.Errorf("invalid size")

var castagnoliTable *crc32.Table

func init() {
	castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
}
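
// exampleChunkChecksum is an illustrative sketch, not part of the original
// file: it shows how Meta.writeHash and the Castagnoli table above combine to
// produce the 4-byte checksum stored after each chunk on disk. The CRC covers
// the encoding byte followed by the raw chunk data.
func exampleChunkChecksum(cm *Meta) ([]byte, error) {
	h := newCRC32()
	var buf [binary.MaxVarintLen32]byte
	if err := cm.writeHash(h, buf[:]); err != nil {
		return nil, err
	}
	// Sum appends the 4-byte CRC to nil and returns it.
	return h.Sum(nil), nil
}
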
// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the
// polynomial may be easily changed in one location at a later time, if necessary.
func newCRC32() hash.Hash32 {
	return crc32.New(castagnoliTable)
}

// Writer implements the ChunkWriter interface for the standard
// serialization format.
type Writer struct {
	dirFile *os.File
	files   []*os.File
	wbuf    *bufio.Writer
	n       int64
	crc32   hash.Hash
	buf     [binary.MaxVarintLen32]byte

	segmentSize int64
}

const (
	// DefaultChunkSegmentSize is the default chunks segment size.
	DefaultChunkSegmentSize = 512 * 1024 * 1024
)

// NewWriterWithSegSize returns a new writer against the given directory
// and allows setting a custom size for the segments.
func NewWriterWithSegSize(dir string, segmentSize int64) (*Writer, error) {
	return newWriter(dir, segmentSize)
}

// NewWriter returns a new writer against the given directory
// using the default segment size.
func NewWriter(dir string) (*Writer, error) {
	return newWriter(dir, DefaultChunkSegmentSize)
}

func newWriter(dir string, segmentSize int64) (*Writer, error) {
	if segmentSize <= 0 {
		segmentSize = DefaultChunkSegmentSize
	}

	if err := os.MkdirAll(dir, 0777); err != nil {
		return nil, err
	}
	dirFile, err := fileutil.OpenDir(dir)
	if err != nil {
		return nil, err
	}
	return &Writer{
		dirFile:     dirFile,
		n:           0,
		crc32:       newCRC32(),
		segmentSize: segmentSize,
	}, nil
}

func (w *Writer) tail() *os.File {
	if len(w.files) == 0 {
		return nil
	}
	return w.files[len(w.files)-1]
}

// finalizeTail writes all pending data to the current tail file,
// truncates its size, and closes it.
func (w *Writer) finalizeTail() error {
	tf := w.tail()
	if tf == nil {
		return nil
	}

	if err := w.wbuf.Flush(); err != nil {
		return err
	}
	if err := tf.Sync(); err != nil {
		return err
	}
	// As the file was pre-allocated, we truncate any superfluous zero bytes.
	off, err := tf.Seek(0, io.SeekCurrent)
	if err != nil {
		return err
	}
	if err := tf.Truncate(off); err != nil {
		return err
	}

	return tf.Close()
}

func (w *Writer) cut() error {
	// Sync current tail to disk and close.
	if err := w.finalizeTail(); err != nil {
		return err
	}

	n, f, _, err := cutSegmentFile(w.dirFile, MagicChunks, chunksFormatV1, w.segmentSize)
	if err != nil {
		return err
	}
	w.n = int64(n)

	w.files = append(w.files, f)
	if w.wbuf != nil {
		w.wbuf.Reset(f)
	} else {
		w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
	}

	return nil
}
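
// For orientation (an illustrative note, not part of the original file):
// cutSegmentFile below writes an 8-byte header and leaves the file offset
// just past it, so every segment starts as
//
//	magic (4 bytes, big-endian) | format version (1 byte) | padding (3 bytes)
//
// followed by the chunk data written through Writer.write. The header sizes
// are the constants defined at the top of the file.
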
func cutSegmentFile(dirFile *os.File, magicNumber uint32, chunksFormat byte, allocSize int64) (headerSize int, newFile *os.File, seq int, returnErr error) {
	p, seq, err := nextSequenceFile(dirFile.Name())
	if err != nil {
		return 0, nil, 0, errors.Wrap(err, "next sequence file")
	}
	ptmp := p + ".tmp"
	f, err := os.OpenFile(ptmp, os.O_WRONLY|os.O_CREATE, 0666)
	if err != nil {
		return 0, nil, 0, errors.Wrap(err, "open temp file")
	}
	defer func() {
		if returnErr != nil {
			errs := tsdb_errors.NewMulti(returnErr)
			if f != nil {
				errs.Add(f.Close())
			}
			// Calling RemoveAll on a non-existent file does not return an error.
			errs.Add(os.RemoveAll(ptmp))
			returnErr = errs.Err()
		}
	}()
	if allocSize > 0 {
		if err = fileutil.Preallocate(f, allocSize, true); err != nil {
			return 0, nil, 0, errors.Wrap(err, "preallocate")
		}
	}
	if err = dirFile.Sync(); err != nil {
		return 0, nil, 0, errors.Wrap(err, "sync directory")
	}

	// Write header metadata for new file.
	metab := make([]byte, SegmentHeaderSize)
	binary.BigEndian.PutUint32(metab[:MagicChunksSize], magicNumber)
	metab[4] = chunksFormat

	n, err := f.Write(metab)
	if err != nil {
		return 0, nil, 0, errors.Wrap(err, "write header")
	}
	if err := f.Close(); err != nil {
		return 0, nil, 0, errors.Wrap(err, "close temp file")
	}
	f = nil

	if err := fileutil.Rename(ptmp, p); err != nil {
		return 0, nil, 0, errors.Wrap(err, "replace file")
	}

	f, err = os.OpenFile(p, os.O_WRONLY, 0666)
	if err != nil {
		return 0, nil, 0, errors.Wrap(err, "open final file")
	}
	// Skip the header so further writes land after it.
	if _, err := f.Seek(int64(n), 0); err != nil {
		return 0, nil, 0, errors.Wrap(err, "seek in final file")
	}
	return n, f, seq, nil
}

func (w *Writer) write(b []byte) error {
	n, err := w.wbuf.Write(b)
	w.n += int64(n)
	return err
}

// WriteChunks writes as many chunks as possible to the current segment,
// cuts a new segment when the current segment is full, and
// writes the rest of the chunks in the new segment.
func (w *Writer) WriteChunks(chks ...Meta) error {
	var (
		batchSize  = int64(0)
		batchStart = 0
		batches    = make([][]Meta, 1)
		batchID    = 0
		firstBatch = true
	)

	for i, chk := range chks {
		// Each chunk contains: data length + encoding + the data itself + crc32.
		chkSize := int64(MaxChunkLengthFieldSize) // The data length is a variable-length field, so use the maximum possible value.
		chkSize += ChunkEncodingSize              // The chunk encoding.
		chkSize += int64(len(chk.Chunk.Bytes()))  // The data itself.
		chkSize += crc32.Size                     // The 4 bytes of crc32.
		batchSize += chkSize

		// Cut a new batch when it is not the first chunk (to avoid empty segments) and
		// the batch is too large to fit in the current segment.
		cutNewBatch := (i != 0) && (batchSize+SegmentHeaderSize > w.segmentSize)

		// When the segment already has some data,
		// the first batch size calculation should account for that.
		if firstBatch && w.n > SegmentHeaderSize {
			cutNewBatch = batchSize+w.n > w.segmentSize
			if cutNewBatch {
				firstBatch = false
			}
		}

		if cutNewBatch {
			batchStart = i
			batches = append(batches, []Meta{})
			batchID++
			batchSize = chkSize
		}
		batches[batchID] = chks[batchStart : i+1]
	}

	// Create a new segment when one doesn't already exist.
	if w.n == 0 {
		if err := w.cut(); err != nil {
			return err
		}
	}

	for i, chks := range batches {
		if err := w.writeChunks(chks); err != nil {
			return err
		}
		// Cut a new segment only when there are more chunks to write.
		// Avoid creating a new empty segment at the end of the write.
		if i < len(batches)-1 {
			if err := w.cut(); err != nil {
				return err
			}
		}
	}
	return nil
}
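
// chunkBudget is an illustrative sketch, not part of the original file. It
// restates the per-chunk size estimate used by WriteChunks above when packing
// chunks into batches: a worst-case varint length field, one encoding byte,
// the chunk data, and the trailing CRC32.
func chunkBudget(dataLen int) int64 {
	return int64(MaxChunkLengthFieldSize) + // worst-case size of the length varint
		ChunkEncodingSize + // one encoding byte
		int64(dataLen) + // the chunk data itself
		crc32.Size // the 4-byte trailing checksum
}
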
// writeChunks writes the chunks into the current segment irrespective
// of the configured segment size limit. A segment should have already been
// started before calling this.
func (w *Writer) writeChunks(chks []Meta) error {
	if len(chks) == 0 {
		return nil
	}

	var seq = uint64(w.seq()) << 32
	for i := range chks {
		chk := &chks[i]

		// The reference is set to the segment index and the offset where
		// the data starts for this chunk.
		//
		// The upper 4 bytes are for the segment index and
		// the lower 4 bytes are for the segment offset where to start reading this chunk.
		chk.Ref = seq | uint64(w.n)

		n := binary.PutUvarint(w.buf[:], uint64(len(chk.Chunk.Bytes())))

		if err := w.write(w.buf[:n]); err != nil {
			return err
		}
		w.buf[0] = byte(chk.Chunk.Encoding())
		if err := w.write(w.buf[:1]); err != nil {
			return err
		}
		if err := w.write(chk.Chunk.Bytes()); err != nil {
			return err
		}

		w.crc32.Reset()
		if err := chk.writeHash(w.crc32, w.buf[:]); err != nil {
			return err
		}
		if err := w.write(w.crc32.Sum(w.buf[:0])); err != nil {
			return err
		}
	}
	return nil
}

func (w *Writer) seq() int {
	return len(w.files) - 1
}

func (w *Writer) Close() error {
	if err := w.finalizeTail(); err != nil {
		return err
	}

	// Close the dir file; if it stays open, renaming files in the directory fails on Windows.
	return w.dirFile.Close()
}

// ByteSlice abstracts a byte slice.
type ByteSlice interface {
	Len() int
	Range(start, end int) []byte
}

type realByteSlice []byte

func (b realByteSlice) Len() int {
	return len(b)
}

func (b realByteSlice) Range(start, end int) []byte {
	return b[start:end]
}

// Reader implements a ChunkReader for a serialized byte stream
// of series data.
type Reader struct {
	// The underlying bytes holding the encoded series data.
	// Each slice holds the data for a different segment.
	bs   []ByteSlice
	cs   []io.Closer // Closers for resources behind the byte slices.
	size int64       // The total size of bytes in the reader.
	pool chunkenc.Pool
}

func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
	cr := Reader{pool: pool, bs: bs, cs: cs}
	for i, b := range cr.bs {
		if b.Len() < SegmentHeaderSize {
			return nil, errors.Wrapf(errInvalidSize, "invalid segment header in segment %d", i)
		}
		// Verify magic number.
		if m := binary.BigEndian.Uint32(b.Range(0, MagicChunksSize)); m != MagicChunks {
			return nil, errors.Errorf("invalid magic number %x", m)
		}

		// Verify chunk format version.
		if v := int(b.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
			return nil, errors.Errorf("invalid chunk format version %d", v)
		}
		cr.size += int64(b.Len())
	}
	return &cr, nil
}
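
// packRef and unpackRef are an illustrative sketch, not part of the original
// file. They spell out the chunk reference layout that writeChunks encodes and
// Reader.Chunk decodes: the upper 4 bytes hold the segment index, the lower
// 4 bytes hold the offset of the chunk's length field within that segment.
func packRef(sgmIndex, chkStart int) uint64 {
	return uint64(sgmIndex)<<32 | uint64(chkStart)
}

func unpackRef(ref uint64) (sgmIndex, chkStart int) {
	// Equivalent to the shifts used in Reader.Chunk: ref>>32 and (ref<<32)>>32.
	return int(ref >> 32), int(ref & 0xffffffff)
}
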
// NewDirReader returns a new Reader against sequentially numbered files in the
// given directory.
func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
	files, err := sequenceFiles(dir)
	if err != nil {
		return nil, err
	}
	if pool == nil {
		pool = chunkenc.NewPool()
	}

	var (
		bs []ByteSlice
		cs []io.Closer
	)
	for _, fn := range files {
		f, err := fileutil.OpenMmapFile(fn)
		if err != nil {
			return nil, tsdb_errors.NewMulti(
				errors.Wrap(err, "mmap files"),
				tsdb_errors.CloseAll(cs),
			).Err()
		}
		cs = append(cs, f)
		bs = append(bs, realByteSlice(f.Bytes()))
	}

	reader, err := newReader(bs, cs, pool)
	if err != nil {
		return nil, tsdb_errors.NewMulti(
			err,
			tsdb_errors.CloseAll(cs),
		).Err()
	}
	return reader, nil
}

func (s *Reader) Close() error {
	return tsdb_errors.CloseAll(s.cs)
}

// Size returns the total size of the chunks in the reader.
func (s *Reader) Size() int64 {
	return s.size
}

// Chunk returns a chunk from a given reference.
func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
	var (
		// Get the upper 4 bytes.
		// These contain the segment index.
		sgmIndex = int(ref >> 32)
		// Get the lower 4 bytes.
		// These contain the segment offset where the data for this chunk starts.
		chkStart = int((ref << 32) >> 32)
		chkCRC32 = newCRC32()
	)

	if sgmIndex >= len(s.bs) {
		return nil, errors.Errorf("segment index %d out of range", sgmIndex)
	}

	sgmBytes := s.bs[sgmIndex]

	if chkStart+MaxChunkLengthFieldSize > sgmBytes.Len() {
		return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, sgmBytes.Len())
	}
	// With the minimum chunk length this should never cause us to read
	// over the end of the slice.
	c := sgmBytes.Range(chkStart, chkStart+MaxChunkLengthFieldSize)
	chkDataLen, n := binary.Uvarint(c)
	if n <= 0 {
		return nil, errors.Errorf("reading chunk length failed with %d", n)
	}

	chkEncStart := chkStart + n
	chkEnd := chkEncStart + ChunkEncodingSize + int(chkDataLen) + crc32.Size
	chkDataStart := chkEncStart + ChunkEncodingSize
	chkDataEnd := chkEnd - crc32.Size

	if chkEnd > sgmBytes.Len() {
		return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len())
	}

	sum := sgmBytes.Range(chkDataEnd, chkEnd)
	if _, err := chkCRC32.Write(sgmBytes.Range(chkEncStart, chkDataEnd)); err != nil {
		return nil, err
	}

	if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
		return nil, errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act)
	}

	chkData := sgmBytes.Range(chkDataStart, chkDataEnd)
	chkEnc := sgmBytes.Range(chkEncStart, chkEncStart+ChunkEncodingSize)[0]
	return s.pool.Get(chunkenc.Encoding(chkEnc), chkData)
}
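
// Illustrative usage sketch (not part of the original file): writing a chunk
// with Writer and reading it back through NewDirReader. Error handling is
// elided; the XOR chunk and its appender come from the chunkenc package
// imported above. Note that WriteChunks fills in the Ref field on the metas
// it is given.
//
//	w, _ := NewWriter("data/chunks")
//	c := chunkenc.NewXORChunk()
//	app, _ := c.Appender()
//	app.Append(1000, 1.5) // timestamp (ms), value
//	metas := []Meta{{Chunk: c, MinTime: 1000, MaxTime: 1000}}
//	_ = w.WriteChunks(metas...) // sets metas[0].Ref
//	_ = w.Close()
//
//	r, _ := NewDirReader("data/chunks", nil)
//	chk, _ := r.Chunk(metas[0].Ref)
//	_ = chk // the same encoded data that was written
//	_ = r.Close()
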
func nextSequenceFile(dir string) (string, int, error) {
	files, err := ioutil.ReadDir(dir)
	if err != nil {
		return "", 0, err
	}

	i := uint64(0)
	for _, f := range files {
		j, err := strconv.ParseUint(f.Name(), 10, 64)
		if err != nil {
			continue
		}
		// The files are not necessarily found in number order:
		// for example, with '1000000' and '200000', '1000000' would come first.
		// Though this is a very rare case, we check for the max id anyway.
		if j > i {
			i = j
		}
	}
	return segmentFile(dir, int(i+1)), int(i + 1), nil
}

func segmentFile(baseDir string, index int) string {
	return filepath.Join(baseDir, fmt.Sprintf("%0.6d", index))
}

func sequenceFiles(dir string) ([]string, error) {
	files, err := ioutil.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	var res []string
	for _, fi := range files {
		if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil {
			continue
		}
		res = append(res, filepath.Join(dir, fi.Name()))
	}
	return res, nil
}
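
// Illustrative note (not part of the original file): segmentFile zero-pads
// sequence numbers to six digits, and nextSequenceFile returns one past the
// highest existing number, so a directory fills up as
//
//	000001, 000002, 000003, ...
//
// and the sequence keeps climbing even if older segments have been deleted
// in between.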