// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"encoding/json"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync"

	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/tsdb/chunkenc"
	"github.com/prometheus/tsdb/chunks"
	"github.com/prometheus/tsdb/index"
	"github.com/prometheus/tsdb/labels"
)

// IndexWriter serializes the index for a block of series data.
// The methods must be called in the order in which they are specified.
type IndexWriter interface {
	// AddSymbols registers all string symbols that are encountered in series
	// and other indices.
	AddSymbols(sym map[string]struct{}) error

	// AddSeries populates the index writer with a series and the offsets
	// of chunks that the index can reference.
	// Implementations may require series to be inserted in increasing order by
	// their labels.
	// The reference numbers are used to resolve entries in postings lists that
	// are added later.
	AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error

	// WriteLabelIndex serializes an index from label names to values.
	// The passed-in values are chained tuples of strings of the length of names.
	WriteLabelIndex(names []string, values []string) error

	// WritePostings writes a postings list for a single label pair.
	// The Postings here contain refs to the series that were added.
	WritePostings(name, value string, it index.Postings) error

	// Close writes any finalization and closes the resources associated with
	// the underlying writer.
	Close() error
}
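
// A minimal sketch of the required IndexWriter call order, assuming an
// implementation `w` and pre-collected symbols, series, and postings
// (sortedSeries, jobValues, and postings are hypothetical names, not part
// of this package):
//
//	_ = w.AddSymbols(symbols) // 1. register all symbols first
//	for _, s := range sortedSeries { // 2. series, in increasing order by labels
//		_ = w.AddSeries(s.ref, s.labels, s.chunks...)
//	}
//	_ = w.WriteLabelIndex([]string{"job"}, jobValues) // 3. label value indices
//	_ = w.WritePostings("job", "api", postings) // 4. one postings list per label pair
//	_ = w.Close() // 5. finalize and release resources
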
// IndexReader provides reading access to serialized index data.
type IndexReader interface {
	// Symbols returns a set of string symbols that may occur in series' labels
	// and indices.
	Symbols() (map[string]struct{}, error)

	// LabelValues returns the possible label values.
	LabelValues(names ...string) (index.StringTuples, error)

	// Postings returns the postings list iterator for the label pair.
	// The Postings here contain the offsets to the series inside the index.
	// Found IDs are not strictly required to point to a valid Series, e.g.
	// during background garbage collections.
	Postings(name, value string) (index.Postings, error)

	// SortedPostings returns a postings list that is reordered to be sorted
	// by the label set of the underlying series.
	SortedPostings(index.Postings) index.Postings

	// Series populates the given labels and chunk metas for the series identified
	// by the reference.
	// Returns ErrNotFound if the ref does not resolve to a known series.
	Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error

	// LabelIndices returns a list of string tuples for which a label value index exists.
	// NOTE: This is deprecated. Use `LabelNames()` instead.
	LabelIndices() ([][]string, error)

	// LabelNames returns all the unique label names present in the index in sorted order.
	LabelNames() ([]string, error)

	// Close releases the underlying resources of the reader.
	Close() error
}

// StringTuples provides access to a sorted list of string tuples.
type StringTuples interface {
	// Len returns the total number of tuples in the list.
	Len() int
	// At returns the tuple at position i.
	At(i int) ([]string, error)
}

// ChunkWriter serializes a time block of chunked series data.
type ChunkWriter interface {
	// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
	// must be populated.
	// After returning successfully, the Ref fields in the ChunkMetas
	// are set and can be used to retrieve the chunks from the written data.
	WriteChunks(chunks ...chunks.Meta) error

	// Close writes any required finalization and closes the resources
	// associated with the underlying writer.
	Close() error
}

// ChunkReader provides reading access to serialized time series data.
type ChunkReader interface {
	// Chunk returns the series data chunk with the given reference.
	Chunk(ref uint64) (chunkenc.Chunk, error)

	// Close releases all underlying resources of the reader.
	Close() error
}

// BlockReader provides reading access to a data block.
type BlockReader interface {
	// Index returns an IndexReader over the block's data.
	Index() (IndexReader, error)

	// Chunks returns a ChunkReader over the block's data.
	Chunks() (ChunkReader, error)

	// Tombstones returns a TombstoneReader over the block's deleted data.
	Tombstones() (TombstoneReader, error)
}

// Appendable defines an entity to which data can be appended.
type Appendable interface {
	// Appender returns a new Appender against an underlying store.
	Appender() Appender
}

// BlockMeta provides meta information about a block.
type BlockMeta struct {
	// Unique identifier for the block and its contents. Changes on compaction.
	ULID ulid.ULID `json:"ulid"`

	// MinTime and MaxTime specify the time range all samples
	// in the block are in.
	MinTime int64 `json:"minTime"`
	MaxTime int64 `json:"maxTime"`

	// Stats about the contents of the block.
	Stats BlockStats `json:"stats,omitempty"`

	// Information on compactions the block was created from.
	Compaction BlockMetaCompaction `json:"compaction"`

	// Version of the index format.
	Version int `json:"version"`
}

// BlockStats contains stats about contents of a block.
type BlockStats struct {
	NumSamples    uint64 `json:"numSamples,omitempty"`
	NumSeries     uint64 `json:"numSeries,omitempty"`
	NumChunks     uint64 `json:"numChunks,omitempty"`
	NumTombstones uint64 `json:"numTombstones,omitempty"`
}

// BlockDesc describes a block by ULID and time range.
type BlockDesc struct {
	ULID    ulid.ULID `json:"ulid"`
	MinTime int64     `json:"minTime"`
	MaxTime int64     `json:"maxTime"`
}

// BlockMetaCompaction holds information about compactions a block went through.
type BlockMetaCompaction struct {
	// Maximum number of compaction cycles any source block has
	// gone through.
	Level int `json:"level"`
	// ULIDs of all source head blocks that went into the block.
	Sources []ulid.ULID `json:"sources,omitempty"`
	// Short descriptions of the direct blocks that were used to create
	// this block.
	Parents []BlockDesc `json:"parents,omitempty"`
	Failed  bool        `json:"failed,omitempty"`
}

const indexFilename = "index"
const metaFilename = "meta.json"

func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }

func readMetaFile(dir string) (*BlockMeta, error) {
	b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
	if err != nil {
		return nil, err
	}
	var m BlockMeta

	if err := json.Unmarshal(b, &m); err != nil {
		return nil, err
	}
	if m.Version != 1 {
		return nil, errors.Errorf("unexpected meta file version %d", m.Version)
	}

	return &m, nil
}

func writeMetaFile(dir string, meta *BlockMeta) error {
	meta.Version = 1

	// Make any changes to the file appear atomic.
	path := filepath.Join(dir, metaFilename)
	tmp := path + ".tmp"

	f, err := os.Create(tmp)
	if err != nil {
		return err
	}

	enc := json.NewEncoder(f)
	enc.SetIndent("", "\t")

	var merr MultiError

	if merr.Add(enc.Encode(meta)); merr.Err() != nil {
		merr.Add(f.Close())
		return merr.Err()
	}
	if err := f.Close(); err != nil {
		return err
	}
	return renameFile(tmp, path)
}
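
// An illustrative meta.json as written by writeMetaFile. The field values
// are made up for the example; the layout follows the BlockMeta JSON tags
// above:
//
//	{
//		"ulid": "01BKGV7JBM69T2G1BGBGM6KB12",
//		"minTime": 1500000000000,
//		"maxTime": 1500007200000,
//		"stats": {
//			"numSamples": 100,
//			"numSeries": 2,
//			"numChunks": 2
//		},
//		"compaction": {
//			"level": 1,
//			"sources": ["01BKGV7JBM69T2G1BGBGM6KB12"]
//		},
//		"version": 1
//	}
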
// Block represents a directory of time series data covering a continuous time range.
type Block struct {
	mtx            sync.RWMutex
	closing        bool
	pendingReaders sync.WaitGroup

	dir  string
	meta BlockMeta

	// Symbol table size in bytes.
	// We maintain this variable to avoid recalculating it every time.
	symbolTableSize uint64

	chunkr     ChunkReader
	indexr     IndexReader
	tombstones TombstoneReader
}

// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs.
func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
	meta, err := readMetaFile(dir)
	if err != nil {
		return nil, err
	}

	cr, err := chunks.NewDirReader(chunkDir(dir), pool)
	if err != nil {
		return nil, err
	}
	ir, err := index.NewFileReader(filepath.Join(dir, indexFilename))
	if err != nil {
		return nil, err
	}

	tr, err := readTombstones(dir)
	if err != nil {
		return nil, err
	}

	pb := &Block{
		dir:             dir,
		meta:            *meta,
		chunkr:          cr,
		indexr:          ir,
		tombstones:      tr,
		symbolTableSize: ir.SymbolTableSize(),
	}
	return pb, nil
}
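
// A minimal usage sketch for OpenBlock (error handling elided; the block
// directory path is hypothetical):
//
//	b, err := OpenBlock("data/01BKGV7JBM69T2G1BGBGM6KB12", chunkenc.NewPool())
//	if err != nil { ... }
//	defer b.Close()
//
//	ir, err := b.Index() // acquire a reader; it must be closed when done
//	if err != nil { ... }
//	defer ir.Close()
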
// Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (pb *Block) Close() error {
	pb.mtx.Lock()
	pb.closing = true
	pb.mtx.Unlock()

	pb.pendingReaders.Wait()

	var merr MultiError

	merr.Add(pb.chunkr.Close())
	merr.Add(pb.indexr.Close())
	merr.Add(pb.tombstones.Close())

	return merr.Err()
}

func (pb *Block) String() string {
	return pb.meta.ULID.String()
}

// Dir returns the directory of the block.
func (pb *Block) Dir() string { return pb.dir }

// Meta returns meta information about the block.
func (pb *Block) Meta() BlockMeta { return pb.meta }

// ErrClosing is returned when a block is in the process of being closed.
var ErrClosing = errors.New("block is closing")

func (pb *Block) startRead() error {
	pb.mtx.RLock()
	defer pb.mtx.RUnlock()

	if pb.closing {
		return ErrClosing
	}
	pb.pendingReaders.Add(1)
	return nil
}

// Index returns a new IndexReader against the block data.
func (pb *Block) Index() (IndexReader, error) {
	if err := pb.startRead(); err != nil {
		return nil, err
	}
	return blockIndexReader{ir: pb.indexr, b: pb}, nil
}

// Chunks returns a new ChunkReader against the block data.
func (pb *Block) Chunks() (ChunkReader, error) {
	if err := pb.startRead(); err != nil {
		return nil, err
	}
	return blockChunkReader{ChunkReader: pb.chunkr, b: pb}, nil
}

// Tombstones returns a new TombstoneReader against the block data.
func (pb *Block) Tombstones() (TombstoneReader, error) {
	if err := pb.startRead(); err != nil {
		return nil, err
	}
	return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil
}

// GetSymbolTableSize returns the symbol table size (in bytes) of this block's index.
func (pb *Block) GetSymbolTableSize() uint64 {
	return pb.symbolTableSize
}

func (pb *Block) setCompactionFailed() error {
	pb.meta.Compaction.Failed = true
	return writeMetaFile(pb.dir, &pb.meta)
}
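
// The wrapper readers below tie reader lifetimes to the block: every
// successful Index, Chunks, or Tombstones call increments pendingReaders
// via startRead, and the corresponding wrapper's Close decrements it, so
// Block.Close blocks until all outstanding readers are done. A minimal
// sketch (error handling elided):
//
//	ir, err := pb.Index() // pendingReaders.Add(1) on success
//	if err != nil { ... }
//	defer ir.Close() // pendingReaders.Done()
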
type blockIndexReader struct {
	ir IndexReader
	b  *Block
}

func (r blockIndexReader) Symbols() (map[string]struct{}, error) {
	s, err := r.ir.Symbols()
	return s, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

func (r blockIndexReader) LabelValues(names ...string) (index.StringTuples, error) {
	st, err := r.ir.LabelValues(names...)
	return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

func (r blockIndexReader) Postings(name, value string) (index.Postings, error) {
	p, err := r.ir.Postings(name, value)
	return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
	return r.ir.SortedPostings(p)
}

func (r blockIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
	return errors.Wrapf(
		r.ir.Series(ref, lset, chks),
		"block: %s",
		r.b.Meta().ULID,
	)
}

func (r blockIndexReader) LabelIndices() ([][]string, error) {
	ss, err := r.ir.LabelIndices()
	return ss, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

func (r blockIndexReader) LabelNames() ([]string, error) {
	return r.b.LabelNames()
}

func (r blockIndexReader) Close() error {
	r.b.pendingReaders.Done()
	return nil
}

type blockTombstoneReader struct {
	TombstoneReader
	b *Block
}

func (r blockTombstoneReader) Close() error {
	r.b.pendingReaders.Done()
	return nil
}

type blockChunkReader struct {
	ChunkReader
	b *Block
}

func (r blockChunkReader) Close() error {
	r.b.pendingReaders.Done()
	return nil
}

// Delete matching series between mint and maxt in the block.
func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
	pb.mtx.Lock()
	defer pb.mtx.Unlock()

	if pb.closing {
		return ErrClosing
	}

	p, err := PostingsForMatchers(pb.indexr, ms...)
	if err != nil {
		return errors.Wrap(err, "select series")
	}

	ir := pb.indexr

	// Choose only valid postings which have chunks in the time range.
	stones := newMemTombstones()

	var lset labels.Labels
	var chks []chunks.Meta

Outer:
	for p.Next() {
		err := ir.Series(p.At(), &lset, &chks)
		if err != nil {
			return err
		}

		for _, chk := range chks {
			if chk.OverlapsClosedInterval(mint, maxt) {
				// Clamp the deletion interval to the time range actually
				// covered by the series' chunks; do not delete beyond it.
				tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
				stones.addInterval(p.At(), Interval{tmin, tmax})
				continue Outer
			}
		}
	}

	if p.Err() != nil {
		return p.Err()
	}

	err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
		for _, iv := range ivs {
			stones.addInterval(id, iv)
		}
		return nil
	})
	if err != nil {
		return err
	}
	pb.tombstones = stones
	pb.meta.Stats.NumTombstones = pb.tombstones.Total()

	if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
		return err
	}
	return writeMetaFile(pb.dir, &pb.meta)
}
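
// A minimal sketch of a deletion call, assuming the matcher constructor
// labels.NewEqualMatcher from this repository's labels package (the label
// pair and time bounds are made up):
//
//	// Mark all samples of series with job="api" in [mint, maxt] as deleted.
//	if err := pb.Delete(mint, maxt, labels.NewEqualMatcher("job", "api")); err != nil {
//		// handle error
//	}
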
// CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones).
// If a rewrite took place, it returns the ULID of the newly written block; otherwise nil.
func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
	numStones := 0

	if err := pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
		numStones += len(ivs)
		return nil
	}); err != nil {
		// This should never happen, as the iteration function only returns nil.
		panic(err)
	}
	if numStones == 0 {
		return nil, nil
	}

	meta := pb.Meta()
	uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta)
	if err != nil {
		return nil, err
	}
	return &uid, nil
}

// Snapshot creates a snapshot of the block into dir.
func (pb *Block) Snapshot(dir string) error {
	blockDir := filepath.Join(dir, pb.meta.ULID.String())
	if err := os.MkdirAll(blockDir, 0777); err != nil {
		return errors.Wrap(err, "create snapshot block dir")
	}

	chunksDir := chunkDir(blockDir)
	if err := os.MkdirAll(chunksDir, 0777); err != nil {
		return errors.Wrap(err, "create snapshot chunk dir")
	}

	// Hard-link meta, index, and tombstones.
	for _, fname := range []string{
		metaFilename,
		indexFilename,
		tombstoneFilename,
	} {
		if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil {
			return errors.Wrapf(err, "create snapshot %s", fname)
		}
	}

	// Hard-link the chunks.
	curChunkDir := chunkDir(pb.dir)
	files, err := ioutil.ReadDir(curChunkDir)
	if err != nil {
		return errors.Wrap(err, "ReadDir the current chunk dir")
	}

	for _, f := range files {
		err := os.Link(filepath.Join(curChunkDir, f.Name()), filepath.Join(chunksDir, f.Name()))
		if err != nil {
			return errors.Wrap(err, "hardlink a chunk")
		}
	}

	return nil
}

// OverlapsClosedInterval returns true if the block overlaps [mint, maxt].
func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool {
	// The block itself is a half-open interval
	// [pb.meta.MinTime, pb.meta.MaxTime).
	return pb.meta.MinTime <= maxt && mint < pb.meta.MaxTime
}

// LabelNames returns all the unique label names present in the Block in sorted order.
func (pb *Block) LabelNames() ([]string, error) {
	return pb.indexr.LabelNames()
}

func clampInterval(a, b, mint, maxt int64) (int64, int64) {
	if a < mint {
		a = mint
	}
	if b > maxt {
		b = maxt
	}
	return a, b
}
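
// A sketch of rewriting a block after deletions (the compactor value and
// destination path are hypothetical):
//
//	newULID, err := b.CleanTombstones("data", compactor)
//	if err != nil {
//		// handle error
//	}
//	if newULID != nil {
//		// A tombstone-free copy of the block was written under "data".
//	}
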