// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chunks

import (
	"bufio"
	"encoding/binary"
	"fmt"
	"hash"
	"hash/crc32"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"strconv"

	"github.com/pkg/errors"
	"github.com/prometheus/tsdb/chunkenc"
	tsdb_errors "github.com/prometheus/tsdb/errors"
	"github.com/prometheus/tsdb/fileutil"
)

const (
	// MagicChunks is 4 bytes at the head of a series file.
	MagicChunks = 0x85BD40DD
	// MagicChunksSize is the size in bytes of MagicChunks.
	MagicChunksSize = 4

	chunksFormatV1 = 1
	// ChunksFormatVersionSize is the size in bytes of the chunk format version.
	ChunksFormatVersionSize = 1

	chunkHeaderSize = MagicChunksSize + ChunksFormatVersionSize
)

// Meta holds information about a chunk of data.
type Meta struct {
	// Ref and Chunk hold either a reference that can be used to retrieve
	// chunk data or the data itself.
	// Generally, only one of them is set.
	Ref   uint64
	Chunk chunkenc.Chunk

	MinTime, MaxTime int64 // time range the data covers
}

// writeHash writes the chunk encoding and raw data into the provided hash.
func (cm *Meta) writeHash(h hash.Hash) error {
	if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil {
		return err
	}
	if _, err := h.Write(cm.Chunk.Bytes()); err != nil {
		return err
	}
	return nil
}

// OverlapsClosedInterval returns true if the chunk overlaps [mint, maxt].
func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool {
	// The chunk itself is a closed interval [cm.MinTime, cm.MaxTime].
	return cm.MinTime <= maxt && mint <= cm.MaxTime
}

var (
	errInvalidSize = fmt.Errorf("invalid size")
)

var castagnoliTable *crc32.Table

func init() {
	castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
}

// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the
// polynomial may be easily changed in one location at a later time, if necessary.
func newCRC32() hash.Hash32 {
	return crc32.New(castagnoliTable)
}

// Writer implements the ChunkWriter interface for the standard
// serialization format.
type Writer struct {
	dirFile *os.File
	files   []*os.File
	wbuf    *bufio.Writer
	n       int64
	crc32   hash.Hash

	segmentSize int64
}

const (
	defaultChunkSegmentSize = 512 * 1024 * 1024
)
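// Each segment file created by (*Writer).cut below starts with an 8 byte
// header: the 4 byte MagicChunks value in big-endian order, a single byte
// holding the format version (chunksFormatV1), and 3 bytes of zero padding.
// Chunk data is appended starting at byte offset 8.
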
// NewWriter returns a new writer against the given directory.
func NewWriter(dir string) (*Writer, error) {
	if err := os.MkdirAll(dir, 0777); err != nil {
		return nil, err
	}
	dirFile, err := fileutil.OpenDir(dir)
	if err != nil {
		return nil, err
	}
	cw := &Writer{
		dirFile:     dirFile,
		n:           0,
		crc32:       newCRC32(),
		segmentSize: defaultChunkSegmentSize,
	}
	return cw, nil
}

func (w *Writer) tail() *os.File {
	if len(w.files) == 0 {
		return nil
	}
	return w.files[len(w.files)-1]
}

// finalizeTail writes all pending data to the current tail file,
// truncates its size, and closes it.
func (w *Writer) finalizeTail() error {
	tf := w.tail()
	if tf == nil {
		return nil
	}

	if err := w.wbuf.Flush(); err != nil {
		return err
	}
	if err := tf.Sync(); err != nil {
		return err
	}
	// As the file was pre-allocated, we truncate any superfluous zero bytes.
	off, err := tf.Seek(0, io.SeekCurrent)
	if err != nil {
		return err
	}
	if err := tf.Truncate(off); err != nil {
		return err
	}

	return tf.Close()
}

func (w *Writer) cut() error {
	// Sync current tail to disk and close.
	if err := w.finalizeTail(); err != nil {
		return err
	}

	p, _, err := nextSequenceFile(w.dirFile.Name())
	if err != nil {
		return err
	}
	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
	if err != nil {
		return err
	}
	if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
		return err
	}
	if err = w.dirFile.Sync(); err != nil {
		return err
	}

	// Write header metadata for new file.
	metab := make([]byte, 8)
	binary.BigEndian.PutUint32(metab[:MagicChunksSize], MagicChunks)
	metab[4] = chunksFormatV1

	if _, err := f.Write(metab); err != nil {
		return err
	}

	w.files = append(w.files, f)
	if w.wbuf != nil {
		w.wbuf.Reset(f)
	} else {
		w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
	}
	w.n = 8

	return nil
}

func (w *Writer) write(b []byte) error {
	n, err := w.wbuf.Write(b)
	w.n += int64(n)
	return err
}

// MergeOverlappingChunks merges chunks whose time ranges overlap into a single
// chunk. If two overlapping chunks contain a sample with the same timestamp,
// the sample from the chunk appearing later in the slice is retained.
// This assumes that `chks []Meta` is sorted w.r.t. MinTime.
func MergeOverlappingChunks(chks []Meta) ([]Meta, error) {
	if len(chks) < 2 {
		return chks, nil
	}
	newChks := make([]Meta, 0, len(chks)) // Will contain the merged chunks.
	newChks = append(newChks, chks[0])
	last := 0
	for _, c := range chks[1:] {
		// We need to check only the last chunk in newChks.
		// Reason: (1) newChks[last-1].MaxTime < newChks[last].MinTime (non-overlapping).
		//         (2) As chks are sorted w.r.t. MinTime, newChks[last].MinTime < c.MinTime.
		// So c never overlaps with newChks[last-1] or anything before that.
		if c.MinTime > newChks[last].MaxTime {
			newChks = append(newChks, c)
			last++
			continue
		}
		nc := &newChks[last]
		if c.MaxTime > nc.MaxTime {
			nc.MaxTime = c.MaxTime
		}
		chk, err := MergeChunks(nc.Chunk, c.Chunk)
		if err != nil {
			return nil, err
		}
		nc.Chunk = chk
	}

	return newChks, nil
}
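// For illustration: given chunks covering [0,10], [5,15] and [20,30] (sorted
// by MinTime), MergeOverlappingChunks merges the first two into a single
// chunk covering [0,15] via MergeChunks and appends [20,30] unchanged,
// returning chunks covering [0,15] and [20,30].
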
// MergeChunks vertically merges a and b, i.e., if there is any sample
// with the same timestamp in both a and b, the sample in a is discarded.
func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
	newChunk := chunkenc.NewXORChunk()
	app, err := newChunk.Appender()
	if err != nil {
		return nil, err
	}
	ait := a.Iterator()
	bit := b.Iterator()
	aok, bok := ait.Next(), bit.Next()
	for aok && bok {
		at, av := ait.At()
		bt, bv := bit.At()
		if at < bt {
			app.Append(at, av)
			aok = ait.Next()
		} else if bt < at {
			app.Append(bt, bv)
			bok = bit.Next()
		} else {
			app.Append(bt, bv)
			aok = ait.Next()
			bok = bit.Next()
		}
	}
	for aok {
		at, av := ait.At()
		app.Append(at, av)
		aok = ait.Next()
	}
	for bok {
		bt, bv := bit.At()
		app.Append(bt, bv)
		bok = bit.Next()
	}
	if ait.Err() != nil {
		return nil, ait.Err()
	}
	if bit.Err() != nil {
		return nil, bit.Err()
	}
	return newChunk, nil
}

// WriteChunks writes the given chunks into the current segment file, cutting
// a new segment beforehand if they do not fit into the remaining space, and
// sets the Ref of each Meta to the reference of the written chunk.
func (w *Writer) WriteChunks(chks ...Meta) error {
	// Calculate the maximum space we need and cut a new segment in case
	// we don't fit into the current one.
	maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
	for _, c := range chks {
		maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding.
		maxLen += int64(len(c.Chunk.Bytes()))
		maxLen += 4 // The 4 bytes of crc32.
	}
	newsz := w.n + maxLen

	if w.wbuf == nil || w.n > w.segmentSize || (newsz > w.segmentSize && maxLen <= w.segmentSize) {
		if err := w.cut(); err != nil {
			return err
		}
	}

	var (
		b   = [binary.MaxVarintLen32]byte{}
		seq = uint64(w.seq()) << 32
	)
	for i := range chks {
		chk := &chks[i]

		chk.Ref = seq | uint64(w.n)

		n := binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes())))

		if err := w.write(b[:n]); err != nil {
			return err
		}
		b[0] = byte(chk.Chunk.Encoding())
		if err := w.write(b[:1]); err != nil {
			return err
		}
		if err := w.write(chk.Chunk.Bytes()); err != nil {
			return err
		}

		w.crc32.Reset()
		if err := chk.writeHash(w.crc32); err != nil {
			return err
		}
		if err := w.write(w.crc32.Sum(b[:0])); err != nil {
			return err
		}
	}

	return nil
}

func (w *Writer) seq() int {
	return len(w.files) - 1
}

func (w *Writer) Close() error {
	if err := w.finalizeTail(); err != nil {
		return err
	}

	// Close the directory file; if it is left open, renaming files in the
	// directory can fail on Windows.
	return w.dirFile.Close()
}

// ByteSlice abstracts a byte slice.
type ByteSlice interface {
	Len() int
	Range(start, end int) []byte
}

type realByteSlice []byte

func (b realByteSlice) Len() int {
	return len(b)
}

func (b realByteSlice) Range(start, end int) []byte {
	return b[start:end]
}

func (b realByteSlice) Sub(start, end int) ByteSlice {
	return b[start:end]
}
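// Within a segment, each chunk written by WriteChunks is laid out as: a
// uvarint holding the length of the chunk data, a single byte for the chunk
// encoding, the chunk data itself, and a 4 byte CRC32 (Castagnoli) checksum
// computed over the encoding byte and the data. The Reader below relies on
// this layout when resolving references.
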
// Reader implements a SeriesReader for a serialized byte stream
// of series data.
type Reader struct {
	bs   []ByteSlice // The underlying bytes holding the encoded series data.
	cs   []io.Closer // Closers for resources behind the byte slices.
	size int64       // The total size of bytes in the reader.
	pool chunkenc.Pool
}

func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
	cr := Reader{pool: pool, bs: bs, cs: cs}
	var totalSize int64

	for i, b := range cr.bs {
		if b.Len() < chunkHeaderSize {
			return nil, errors.Wrapf(errInvalidSize, "invalid chunk header in segment %d", i)
		}
		// Verify magic number.
		if m := binary.BigEndian.Uint32(b.Range(0, MagicChunksSize)); m != MagicChunks {
			return nil, errors.Errorf("invalid magic number %x", m)
		}

		// Verify chunk format version.
		if v := int(b.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
			return nil, errors.Errorf("invalid chunk format version %d", v)
		}
		totalSize += int64(b.Len())
	}
	cr.size = totalSize
	return &cr, nil
}

// NewDirReader returns a new Reader against sequentially numbered files in the
// given directory.
func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
	files, err := sequenceFiles(dir)
	if err != nil {
		return nil, err
	}
	if pool == nil {
		pool = chunkenc.NewPool()
	}

	var (
		bs   []ByteSlice
		cs   []io.Closer
		merr tsdb_errors.MultiError
	)
	for _, fn := range files {
		f, err := fileutil.OpenMmapFile(fn)
		if err != nil {
			merr.Add(errors.Wrap(err, "mmap files"))
			merr.Add(closeAll(cs))
			return nil, merr
		}
		cs = append(cs, f)
		bs = append(bs, realByteSlice(f.Bytes()))
	}

	reader, err := newReader(bs, cs, pool)
	if err != nil {
		merr.Add(err)
		merr.Add(closeAll(cs))
		return nil, merr
	}
	return reader, nil
}

func (s *Reader) Close() error {
	return closeAll(s.cs)
}

// Size returns the total size in bytes of the Reader's underlying byte slices.
func (s *Reader) Size() int64 {
	return s.size
}
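// Chunk references as produced by WriteChunks pack the segment sequence
// number into the upper 32 bits and the byte offset of the chunk's length
// varint within that segment into the lower 32 bits. For illustration:
//
//	seq := int(ref >> 32)         // index into the segment byte slices
//	off := int((ref << 32) >> 32) // offset of the chunk within that segment
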
// Chunk returns a chunk from a given reference.
func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
	var (
		sgmSeq    = int(ref >> 32)
		sgmOffset = int((ref << 32) >> 32)
	)
	if sgmSeq >= len(s.bs) {
		return nil, errors.Errorf("reference sequence %d out of range", sgmSeq)
	}
	chkS := s.bs[sgmSeq]

	if sgmOffset >= chkS.Len() {
		return nil, errors.Errorf("offset %d beyond data size %d", sgmOffset, chkS.Len())
	}
	// With the minimum chunk length this should never cause us reading
	// over the end of the slice.
	chk := chkS.Range(sgmOffset, sgmOffset+binary.MaxVarintLen32)

	chkLen, n := binary.Uvarint(chk)
	if n <= 0 {
		return nil, errors.Errorf("reading chunk length failed with %d", n)
	}
	chk = chkS.Range(sgmOffset+n, sgmOffset+n+1+int(chkLen))

	return s.pool.Get(chunkenc.Encoding(chk[0]), chk[1:1+chkLen])
}

func nextSequenceFile(dir string) (string, int, error) {
	names, err := fileutil.ReadDir(dir)
	if err != nil {
		return "", 0, err
	}

	i := uint64(0)
	for _, n := range names {
		j, err := strconv.ParseUint(n, 10, 64)
		if err != nil {
			continue
		}
		i = j
	}
	return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil
}

func sequenceFiles(dir string) ([]string, error) {
	files, err := ioutil.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	var res []string

	for _, fi := range files {
		if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil {
			continue
		}
		res = append(res, filepath.Join(dir, fi.Name()))
	}
	return res, nil
}

func closeAll(cs []io.Closer) (err error) {
	for _, c := range cs {
		if e := c.Close(); e != nil {
			err = e
		}
	}
	return err
}
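// Illustrative usage sketch (hypothetical directory name and metas slice,
// error handling elided): a Writer persists chunks and assigns references,
// which a Reader later resolves back to chunk data.
//
//	w, _ := NewWriter("chunks_dir")
//	_ = w.WriteChunks(metas...) // metas is a []Meta with Chunk set; Ref is filled in.
//	_ = w.Close()
//
//	r, _ := NewDirReader("chunks_dir", nil)
//	chk, _ := r.Chunk(metas[0].Ref)
//	_ = chk
//	_ = r.Close()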