// Copyright 2017 The Prometheus Authors

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"encoding/json"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/tsdb/chunkenc"
	"github.com/prometheus/tsdb/chunks"
	tsdb_errors "github.com/prometheus/tsdb/errors"
	"github.com/prometheus/tsdb/index"
	"github.com/prometheus/tsdb/labels"
)

// IndexWriter serializes the index for a block of series data.
// The methods must be called in the order in which they are specified.
type IndexWriter interface {
	// AddSymbols registers all string symbols that are encountered in series
	// and other indices.
	AddSymbols(sym map[string]struct{}) error

	// AddSeries populates the index writer with a series and its offsets
	// of chunks that the index can reference.
	// Implementations may require series to be inserted in increasing order by
	// their labels.
	// The reference numbers are used to resolve entries in postings lists that
	// are added later.
	AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error

	// WriteLabelIndex serializes an index from label names to values.
	// The passed-in values are chained tuples of strings, each tuple of the
	// same length as names.
	WriteLabelIndex(names []string, values []string) error

	// WritePostings writes a postings list for a single label pair.
	// The Postings here contain refs to the series that were added.
	WritePostings(name, value string, it index.Postings) error

	// Close writes any finalization and closes the resources associated with
	// the underlying writer.
	Close() error
}
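// exampleWriteIndex is an illustrative sketch and not part of the original
// file: it shows the call order IndexWriter expects (symbols first, then
// series in label order, then label indices, then postings, then Close).
// The symbol set, series reference, and label pair are hypothetical.
func exampleWriteIndex(iw IndexWriter) error {
	// All strings used in series labels must be registered first.
	if err := iw.AddSymbols(map[string]struct{}{"__name__": {}, "up": {}}); err != nil {
		return err
	}
	// Series must be added in increasing order of their label sets.
	if err := iw.AddSeries(1, labels.FromStrings("__name__", "up")); err != nil {
		return err
	}
	// Serialize the value index for the label name.
	if err := iw.WriteLabelIndex([]string{"__name__"}, []string{"up"}); err != nil {
		return err
	}
	// Postings reference the series added above by their ref numbers.
	if err := iw.WritePostings("__name__", "up", index.NewListPostings([]uint64{1})); err != nil {
		return err
	}
	return iw.Close()
}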
// IndexReader provides reading access of serialized index data.
type IndexReader interface {
	// Symbols returns a set of string symbols that may occur in series' labels
	// and indices.
	Symbols() (map[string]struct{}, error)

	// LabelValues returns the possible label values.
	LabelValues(names ...string) (index.StringTuples, error)

	// Postings returns the postings list iterator for the label pair.
	// The Postings here contain the offsets to the series inside the index.
	// Found IDs are not strictly required to point to a valid Series, e.g. during
	// background garbage collections.
	Postings(name, value string) (index.Postings, error)

	// SortedPostings returns a postings list that is reordered to be sorted
	// by the label set of the underlying series.
	SortedPostings(index.Postings) index.Postings

	// Series populates the given labels and chunk metas for the series identified
	// by the reference.
	// Returns ErrNotFound if the ref does not resolve to a known series.
	Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error

	// LabelIndices returns a list of string tuples for which a label value index exists.
	// NOTE: This is deprecated. Use `LabelNames()` instead.
	LabelIndices() ([][]string, error)

	// LabelNames returns all the unique label names present in the index in sorted order.
	LabelNames() ([]string, error)

	// Close releases the underlying resources of the reader.
	Close() error
}

// StringTuples provides access to a sorted list of string tuples.
type StringTuples interface {
	// Len returns the total number of tuples in the list.
	Len() int
	// At returns the tuple at position i.
	At(i int) ([]string, error)
}

// ChunkWriter serializes a time block of chunked series data.
type ChunkWriter interface {
	// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
	// must be populated.
	// After returning successfully, the Ref fields in the ChunkMetas
	// are set and can be used to retrieve the chunks from the written data.
	WriteChunks(chunks ...chunks.Meta) error

	// Close writes any required finalization and closes the resources
	// associated with the underlying writer.
	Close() error
}

// ChunkReader provides reading access of serialized time series data.
type ChunkReader interface {
	// Chunk returns the series data chunk with the given reference.
	Chunk(ref uint64) (chunkenc.Chunk, error)

	// Close releases all underlying resources of the reader.
	Close() error
}

// BlockReader provides reading access to a data block.
type BlockReader interface {
	// Index returns an IndexReader over the block's data.
	Index() (IndexReader, error)

	// Chunks returns a ChunkReader over the block's data.
	Chunks() (ChunkReader, error)

	// Tombstones returns a TombstoneReader over the block's deleted data.
	Tombstones() (TombstoneReader, error)

	// MinTime returns the min time of the block.
	MinTime() int64

	// MaxTime returns the max time of the block.
	MaxTime() int64
}

// Appendable defines an entity to which data can be appended.
type Appendable interface {
	// Appender returns a new Appender against an underlying store.
	Appender() Appender
}

// SizeReader reports the size of an object in bytes.
type SizeReader interface {
	// Size returns the size in bytes.
	Size() int64
}

// BlockMeta provides meta information about a block.
type BlockMeta struct {
	// Unique identifier for the block and its contents. Changes on compaction.
	ULID ulid.ULID `json:"ulid"`

	// MinTime and MaxTime specify the time range all samples
	// in the block are in.
	MinTime int64 `json:"minTime"`
	MaxTime int64 `json:"maxTime"`

	// Stats about the contents of the block.
	Stats BlockStats `json:"stats,omitempty"`

	// Information on compactions the block was created from.
	Compaction BlockMetaCompaction `json:"compaction"`

	// Version of the index format.
	Version int `json:"version"`
}

// BlockStats contains stats about contents of a block.
type BlockStats struct {
	NumSamples    uint64 `json:"numSamples,omitempty"`
	NumSeries     uint64 `json:"numSeries,omitempty"`
	NumChunks     uint64 `json:"numChunks,omitempty"`
	NumTombstones uint64 `json:"numTombstones,omitempty"`
	NumBytes      int64  `json:"numBytes,omitempty"`
}
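// For illustration (not part of the original file), a meta.json serialized
// from a BlockMeta by writeMetaFile below might look as follows; all values
// are hypothetical:
//
//	{
//		"ulid": "01D2V4CFSQ5ZYVZKWBHQ9SD9A2",
//		"minTime": 1546300800000,
//		"maxTime": 1546308000000,
//		"stats": {
//			"numSamples": 120000,
//			"numSeries": 1000,
//			"numChunks": 1000,
//			"numBytes": 524288
//		},
//		"compaction": {
//			"level": 1,
//			"sources": ["01D2V4CFSQ5ZYVZKWBHQ9SD9A2"]
//		},
//		"version": 1
//	}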
// BlockDesc describes a block by ULID and time range.
type BlockDesc struct {
	ULID    ulid.ULID `json:"ulid"`
	MinTime int64     `json:"minTime"`
	MaxTime int64     `json:"maxTime"`
}

// BlockMetaCompaction holds information about compactions a block went through.
type BlockMetaCompaction struct {
	// Maximum number of compaction cycles any source block has
	// gone through.
	Level int `json:"level"`
	// ULIDs of all source head blocks that went into the block.
	Sources []ulid.ULID `json:"sources,omitempty"`
	// Deletable indicates that compaction resulted in a block without any samples,
	// so the block should be deleted on the next reload.
	Deletable bool `json:"deletable,omitempty"`
	// Short descriptions of the direct blocks that were used to create
	// this block.
	Parents []BlockDesc `json:"parents,omitempty"`
	Failed  bool        `json:"failed,omitempty"`
}

const indexFilename = "index"
const metaFilename = "meta.json"

func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }

func readMetaFile(dir string) (*BlockMeta, error) {
	b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
	if err != nil {
		return nil, err
	}
	var m BlockMeta

	if err := json.Unmarshal(b, &m); err != nil {
		return nil, err
	}
	if m.Version != 1 {
		return nil, errors.Errorf("unexpected meta file version %d", m.Version)
	}

	return &m, nil
}

func writeMetaFile(dir string, meta *BlockMeta) error {
	meta.Version = 1

	// Make any changes to the file appear atomic by writing to a temporary
	// file and renaming it into place.
	path := filepath.Join(dir, metaFilename)
	tmp := path + ".tmp"

	f, err := os.Create(tmp)
	if err != nil {
		return err
	}

	enc := json.NewEncoder(f)
	enc.SetIndent("", "\t")

	var merr tsdb_errors.MultiError

	if merr.Add(enc.Encode(meta)); merr.Err() != nil {
		merr.Add(f.Close())
		return merr.Err()
	}
	// Force the kernel to persist the file on disk to avoid data loss if the host crashes.
	if merr.Add(f.Sync()); merr.Err() != nil {
		merr.Add(f.Close())
		return merr.Err()
	}
	if err := f.Close(); err != nil {
		return err
	}
	return renameFile(tmp, path)
}

// Block represents a directory of time series data covering a continuous time range.
type Block struct {
	mtx            sync.RWMutex
	closing        bool
	pendingReaders sync.WaitGroup

	dir  string
	meta BlockMeta

	// Symbol table size in bytes.
	// We maintain this variable to avoid recalculating it every time.
	symbolTableSize uint64

	chunkr     ChunkReader
	indexr     IndexReader
	tombstones TombstoneReader
}
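// exampleOpenBlock is an illustrative sketch and not part of the original
// file: it shows typical use of OpenBlock (defined below) with a chunk pool
// and deferred cleanup. The directory path is hypothetical.
func exampleOpenBlock(logger log.Logger) error {
	b, err := OpenBlock(logger, "/data/01D2V4CFSQ5ZYVZKWBHQ9SD9A2", chunkenc.NewPool())
	if err != nil {
		return err
	}
	defer b.Close()

	// Readers hold the block open; each must be closed so that a later
	// Close on the block does not block forever.
	ir, err := b.Index()
	if err != nil {
		return err
	}
	defer ir.Close()

	names, err := ir.LabelNames()
	if err != nil {
		return err
	}
	level.Info(logger).Log("msg", "labels in block", "count", len(names))
	return nil
}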
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs.
func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
	if logger == nil {
		logger = log.NewNopLogger()
	}
	var closers []io.Closer
	defer func() {
		if err != nil {
			var merr tsdb_errors.MultiError
			merr.Add(err)
			merr.Add(closeAll(closers))
			err = merr.Err()
		}
	}()
	meta, err := readMetaFile(dir)
	if err != nil {
		return nil, err
	}

	cr, err := chunks.NewDirReader(chunkDir(dir), pool)
	if err != nil {
		return nil, err
	}
	closers = append(closers, cr)

	ir, err := index.NewFileReader(filepath.Join(dir, indexFilename))
	if err != nil {
		return nil, err
	}
	closers = append(closers, ir)

	tr, tsr, err := readTombstones(dir)
	if err != nil {
		return nil, err
	}
	closers = append(closers, tr)

	// TODO: refactor to set this at block creation time, as that would be
	// the logical place for the block size to be calculated.
	bs := blockSize(cr, ir, tsr)
	meta.Stats.NumBytes = bs
	err = writeMetaFile(dir, meta)
	if err != nil {
		level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
	}

	pb = &Block{
		dir:             dir,
		meta:            *meta,
		chunkr:          cr,
		indexr:          ir,
		tombstones:      tr,
		symbolTableSize: ir.SymbolTableSize(),
	}
	return pb, nil
}

func blockSize(rr ...SizeReader) int64 {
	var total int64
	for _, r := range rr {
		if r != nil {
			total += r.Size()
		}
	}
	return total
}

// Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (pb *Block) Close() error {
	pb.mtx.Lock()
	pb.closing = true
	pb.mtx.Unlock()

	pb.pendingReaders.Wait()

	var merr tsdb_errors.MultiError

	merr.Add(pb.chunkr.Close())
	merr.Add(pb.indexr.Close())
	merr.Add(pb.tombstones.Close())

	return merr.Err()
}

func (pb *Block) String() string {
	return pb.meta.ULID.String()
}

// Dir returns the directory of the block.
func (pb *Block) Dir() string { return pb.dir }

// Meta returns meta information about the block.
func (pb *Block) Meta() BlockMeta { return pb.meta }

// MinTime returns the min time of the meta.
func (pb *Block) MinTime() int64 { return pb.meta.MinTime }

// MaxTime returns the max time of the meta.
func (pb *Block) MaxTime() int64 { return pb.meta.MaxTime }

// Size returns the number of bytes that the block takes up.
func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }

// ErrClosing is returned when a block is in the process of being closed.
var ErrClosing = errors.New("block is closing")

func (pb *Block) startRead() error {
	pb.mtx.RLock()
	defer pb.mtx.RUnlock()

	if pb.closing {
		return ErrClosing
	}
	pb.pendingReaders.Add(1)
	return nil
}

// Index returns a new IndexReader against the block data.
func (pb *Block) Index() (IndexReader, error) {
	if err := pb.startRead(); err != nil {
		return nil, err
	}
	return blockIndexReader{ir: pb.indexr, b: pb}, nil
}

// Chunks returns a new ChunkReader against the block data.
func (pb *Block) Chunks() (ChunkReader, error) {
	if err := pb.startRead(); err != nil {
		return nil, err
	}
	return blockChunkReader{ChunkReader: pb.chunkr, b: pb}, nil
}
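// exampleReaderLifecycle is an illustrative sketch and not part of the
// original file: it demonstrates that Close blocks until every reader
// obtained via Index, Chunks, or Tombstones has been closed, and that new
// readers fail with ErrClosing once closing has begun.
func exampleReaderLifecycle(pb *Block) error {
	ir, err := pb.Index()
	if err != nil {
		return err
	}

	done := make(chan error, 1)
	go func() {
		// Blocks on pendingReaders.Wait() until ir.Close() runs below.
		done <- pb.Close()
	}()

	// ... read from ir while the block is guaranteed to stay open ...

	if err := ir.Close(); err != nil { // Releases the pending reader.
		return err
	}
	return <-done
}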
// Tombstones returns a new TombstoneReader against the block data.
func (pb *Block) Tombstones() (TombstoneReader, error) {
	if err := pb.startRead(); err != nil {
		return nil, err
	}
	return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil
}

// GetSymbolTableSize returns the symbol table size in the index of this block.
func (pb *Block) GetSymbolTableSize() uint64 {
	return pb.symbolTableSize
}

func (pb *Block) setCompactionFailed() error {
	pb.meta.Compaction.Failed = true
	return writeMetaFile(pb.dir, &pb.meta)
}

type blockIndexReader struct {
	ir IndexReader
	b  *Block
}

func (r blockIndexReader) Symbols() (map[string]struct{}, error) {
	s, err := r.ir.Symbols()
	return s, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

func (r blockIndexReader) LabelValues(names ...string) (index.StringTuples, error) {
	st, err := r.ir.LabelValues(names...)
	return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

func (r blockIndexReader) Postings(name, value string) (index.Postings, error) {
	p, err := r.ir.Postings(name, value)
	return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
	return r.ir.SortedPostings(p)
}

func (r blockIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
	return errors.Wrapf(
		r.ir.Series(ref, lset, chks),
		"block: %s",
		r.b.Meta().ULID,
	)
}

func (r blockIndexReader) LabelIndices() ([][]string, error) {
	ss, err := r.ir.LabelIndices()
	return ss, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

func (r blockIndexReader) LabelNames() ([]string, error) {
	return r.b.LabelNames()
}

func (r blockIndexReader) Close() error {
	r.b.pendingReaders.Done()
	return nil
}

type blockTombstoneReader struct {
	TombstoneReader
	b *Block
}

func (r blockTombstoneReader) Close() error {
	r.b.pendingReaders.Done()
	return nil
}

type blockChunkReader struct {
	ChunkReader
	b *Block
}

func (r blockChunkReader) Close() error {
	r.b.pendingReaders.Done()
	return nil
}
// Delete matching series between mint and maxt in the block.
func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
	pb.mtx.Lock()
	defer pb.mtx.Unlock()

	if pb.closing {
		return ErrClosing
	}

	p, err := PostingsForMatchers(pb.indexr, ms...)
	if err != nil {
		return errors.Wrap(err, "select series")
	}

	ir := pb.indexr

	// Choose only valid postings which have chunks in the time-range.
	stones := newMemTombstones()

	var lset labels.Labels
	var chks []chunks.Meta

Outer:
	for p.Next() {
		err := ir.Series(p.At(), &lset, &chks)
		if err != nil {
			return err
		}

		for _, chk := range chks {
			if chk.OverlapsClosedInterval(mint, maxt) {
				// Delete only up to the current values and not beyond.
				tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
				stones.addInterval(p.At(), Interval{tmin, tmax})
				continue Outer
			}
		}
	}

	if p.Err() != nil {
		return p.Err()
	}

	err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
		for _, iv := range ivs {
			stones.addInterval(id, iv)
		}
		return nil
	})
	if err != nil {
		return err
	}
	pb.tombstones = stones
	pb.meta.Stats.NumTombstones = pb.tombstones.Total()

	if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
		return err
	}
	return writeMetaFile(pb.dir, &pb.meta)
}

// CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones).
// If there was a rewrite, then it returns the ULID of the new block written, else nil.
func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
	numStones := 0

	if err := pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
		numStones += len(ivs)
		return nil
	}); err != nil {
		// This should never happen, as the iteration function only returns nil.
		panic(err)
	}
	if numStones == 0 {
		return nil, nil
	}

	meta := pb.Meta()
	uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta)
	if err != nil {
		return nil, err
	}
	return &uid, nil
}

// Snapshot creates a snapshot of the block into dir.
func (pb *Block) Snapshot(dir string) error {
	blockDir := filepath.Join(dir, pb.meta.ULID.String())
	if err := os.MkdirAll(blockDir, 0777); err != nil {
		return errors.Wrap(err, "create snapshot block dir")
	}

	chunksDir := chunkDir(blockDir)
	if err := os.MkdirAll(chunksDir, 0777); err != nil {
		return errors.Wrap(err, "create snapshot chunk dir")
	}

	// Hardlink meta, index and tombstones.
	for _, fname := range []string{
		metaFilename,
		indexFilename,
		tombstoneFilename,
	} {
		if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil {
			return errors.Wrapf(err, "create snapshot %s", fname)
		}
	}

	// Hardlink the chunks.
	curChunkDir := chunkDir(pb.dir)
	files, err := ioutil.ReadDir(curChunkDir)
	if err != nil {
		return errors.Wrap(err, "ReadDir the current chunk dir")
	}

	for _, f := range files {
		err := os.Link(filepath.Join(curChunkDir, f.Name()), filepath.Join(chunksDir, f.Name()))
		if err != nil {
			return errors.Wrap(err, "hardlink a chunk")
		}
	}

	return nil
}

// OverlapsClosedInterval returns true if the block overlaps [mint, maxt].
func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool {
	// The block itself is a half-open interval
	// [pb.meta.MinTime, pb.meta.MaxTime).
	return pb.meta.MinTime <= maxt && mint < pb.meta.MaxTime
}

// LabelNames returns all the unique label names present in the Block in sorted order.
func (pb *Block) LabelNames() ([]string, error) {
	return pb.indexr.LabelNames()
}

// clampInterval limits [a, b] to the enclosing interval [mint, maxt].
func clampInterval(a, b, mint, maxt int64) (int64, int64) {
	if a < mint {
		a = mint
	}
	if b > maxt {
		b = maxt
	}
	return a, b
}
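// exampleDeleteAndClean is an illustrative sketch and not part of the
// original file: Delete only records tombstones, while CleanTombstones
// rewrites the block without the deleted data. The matcher, time range,
// and destination directory are hypothetical.
func exampleDeleteAndClean(pb *Block, c Compactor) error {
	// Mark samples of all series with job="node" in [1000, 2000] as deleted.
	if err := pb.Delete(1000, 2000, labels.NewEqualMatcher("job", "node")); err != nil {
		return err
	}
	// Rewrite the block without the tombstoned data. A non-nil ULID means a
	// new block was written and the old one can be dropped on the next reload.
	uid, err := pb.CleanTombstones("/data", c)
	if err != nil {
		return err
	}
	if uid != nil {
		// e.g. log the new block's ULID: uid.String()
		_ = uid
	}
	return nil
}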