1// Copyright 2017 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package tsdb 15 16import ( 17 "context" 18 "fmt" 19 "io" 20 "math" 21 "math/rand" 22 "os" 23 "path/filepath" 24 "sort" 25 "time" 26 27 "github.com/go-kit/kit/log" 28 "github.com/go-kit/kit/log/level" 29 "github.com/oklog/ulid" 30 "github.com/pkg/errors" 31 "github.com/prometheus/client_golang/prometheus" 32 "github.com/prometheus/tsdb/chunkenc" 33 "github.com/prometheus/tsdb/chunks" 34 tsdb_errors "github.com/prometheus/tsdb/errors" 35 "github.com/prometheus/tsdb/fileutil" 36 "github.com/prometheus/tsdb/index" 37 "github.com/prometheus/tsdb/labels" 38) 39 40// ExponentialBlockRanges returns the time ranges based on the stepSize. 41func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 { 42 ranges := make([]int64, 0, steps) 43 curRange := minSize 44 for i := 0; i < steps; i++ { 45 ranges = append(ranges, curRange) 46 curRange = curRange * int64(stepSize) 47 } 48 49 return ranges 50} 51 52// Compactor provides compaction against an underlying storage 53// of time series data. 54type Compactor interface { 55 // Plan returns a set of directories that can be compacted concurrently. 56 // The directories can be overlapping. 57 // Results returned when compactions are in progress are undefined. 58 Plan(dir string) ([]string, error) 59 60 // Write persists a Block into a directory. 
61 // No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}. 62 Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) 63 64 // Compact runs compaction against the provided directories. Must 65 // only be called concurrently with results of Plan(). 66 // Can optionally pass a list of already open blocks, 67 // to avoid having to reopen them. 68 // When resulting Block has 0 samples 69 // * No block is written. 70 // * The source dirs are marked Deletable. 71 // * Returns empty ulid.ULID{}. 72 Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) 73} 74 75// LeveledCompactor implements the Compactor interface. 76type LeveledCompactor struct { 77 metrics *compactorMetrics 78 logger log.Logger 79 ranges []int64 80 chunkPool chunkenc.Pool 81 ctx context.Context 82} 83 84type compactorMetrics struct { 85 ran prometheus.Counter 86 populatingBlocks prometheus.Gauge 87 failed prometheus.Counter 88 overlappingBlocks prometheus.Counter 89 duration prometheus.Histogram 90 chunkSize prometheus.Histogram 91 chunkSamples prometheus.Histogram 92 chunkRange prometheus.Histogram 93} 94 95func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { 96 m := &compactorMetrics{} 97 98 m.ran = prometheus.NewCounter(prometheus.CounterOpts{ 99 Name: "prometheus_tsdb_compactions_total", 100 Help: "Total number of compactions that were executed for the partition.", 101 }) 102 m.populatingBlocks = prometheus.NewGauge(prometheus.GaugeOpts{ 103 Name: "prometheus_tsdb_compaction_populating_block", 104 Help: "Set to 1 when a block is currently being written to the disk.", 105 }) 106 m.failed = prometheus.NewCounter(prometheus.CounterOpts{ 107 Name: "prometheus_tsdb_compactions_failed_total", 108 Help: "Total number of compactions that failed for the partition.", 109 }) 110 m.overlappingBlocks = prometheus.NewCounter(prometheus.CounterOpts{ 111 Name: "prometheus_tsdb_vertical_compactions_total", 112 Help: 
"Total number of compactions done on overlapping blocks.", 113 }) 114 m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{ 115 Name: "prometheus_tsdb_compaction_duration_seconds", 116 Help: "Duration of compaction runs", 117 Buckets: prometheus.ExponentialBuckets(1, 2, 10), 118 }) 119 m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{ 120 Name: "prometheus_tsdb_compaction_chunk_size_bytes", 121 Help: "Final size of chunks on their first compaction", 122 Buckets: prometheus.ExponentialBuckets(32, 1.5, 12), 123 }) 124 m.chunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{ 125 Name: "prometheus_tsdb_compaction_chunk_samples", 126 Help: "Final number of samples on their first compaction", 127 Buckets: prometheus.ExponentialBuckets(4, 1.5, 12), 128 }) 129 m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{ 130 Name: "prometheus_tsdb_compaction_chunk_range_seconds", 131 Help: "Final time range of chunks on their first compaction", 132 Buckets: prometheus.ExponentialBuckets(100, 4, 10), 133 }) 134 135 if r != nil { 136 r.MustRegister( 137 m.ran, 138 m.populatingBlocks, 139 m.failed, 140 m.overlappingBlocks, 141 m.duration, 142 m.chunkRange, 143 m.chunkSamples, 144 m.chunkSize, 145 ) 146 } 147 return m 148} 149 150// NewLeveledCompactor returns a LeveledCompactor. 151func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) { 152 if len(ranges) == 0 { 153 return nil, errors.Errorf("at least one range must be provided") 154 } 155 if pool == nil { 156 pool = chunkenc.NewPool() 157 } 158 if l == nil { 159 l = log.NewNopLogger() 160 } 161 return &LeveledCompactor{ 162 ranges: ranges, 163 chunkPool: pool, 164 logger: l, 165 metrics: newCompactorMetrics(r), 166 ctx: ctx, 167 }, nil 168} 169 170type dirMeta struct { 171 dir string 172 meta *BlockMeta 173} 174 175// Plan returns a list of compactable blocks in the provided directory. 
176func (c *LeveledCompactor) Plan(dir string) ([]string, error) { 177 dirs, err := blockDirs(dir) 178 if err != nil { 179 return nil, err 180 } 181 if len(dirs) < 1 { 182 return nil, nil 183 } 184 185 var dms []dirMeta 186 for _, dir := range dirs { 187 meta, err := readMetaFile(dir) 188 if err != nil { 189 return nil, err 190 } 191 dms = append(dms, dirMeta{dir, meta}) 192 } 193 return c.plan(dms) 194} 195 196func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) { 197 sort.Slice(dms, func(i, j int) bool { 198 return dms[i].meta.MinTime < dms[j].meta.MinTime 199 }) 200 201 res := c.selectOverlappingDirs(dms) 202 if len(res) > 0 { 203 return res, nil 204 } 205 // No overlapping blocks, do compaction the usual way. 206 // We do not include a recently created block with max(minTime), so the block which was just created from WAL. 207 // This gives users a window of a full block size to piece-wise backup new data without having to care about data overlap. 208 dms = dms[:len(dms)-1] 209 210 for _, dm := range c.selectDirs(dms) { 211 res = append(res, dm.dir) 212 } 213 if len(res) > 0 { 214 return res, nil 215 } 216 217 // Compact any blocks with big enough time range that have >5% tombstones. 218 for i := len(dms) - 1; i >= 0; i-- { 219 meta := dms[i].meta 220 if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] { 221 break 222 } 223 if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 { 224 return []string{dms[i].dir}, nil 225 } 226 } 227 228 return nil, nil 229} 230 231// selectDirs returns the dir metas that should be compacted into a single new block. 232// If only a single block range is configured, the result is always nil. 
func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
	// Nothing can be selected with fewer than two configured ranges
	// (c.ranges[1:] below would be empty) or without candidates.
	if len(c.ranges) < 2 || len(ds) < 1 {
		return nil
	}

	// highTime is the start of the most recent candidate block.
	highTime := ds[len(ds)-1].meta.MinTime

	for _, iv := range c.ranges[1:] {
		parts := splitByRange(ds, iv)
		if len(parts) == 0 {
			continue
		}

	Outer:
		for _, p := range parts {
			// Do not select the range if it has a block whose compaction failed.
			for _, dm := range p {
				if dm.meta.Compaction.Failed {
					continue Outer
				}
			}

			mint := p[0].meta.MinTime
			maxt := p[len(p)-1].meta.MaxTime
			// Pick the range of blocks if it spans the full range (potentially with gaps)
			// or is before the most recent block.
			// This ensures we don't compact blocks prematurely when another one of the same
			// size still fits in the range.
			if (maxt-mint == iv || maxt <= highTime) && len(p) > 1 {
				return p
			}
		}
	}

	return nil
}

// selectOverlappingDirs returns all dirs with overlapping time ranges.
// It expects sorted input by mint and returns the overlapping dirs in the same order as received.
func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string {
	if len(ds) < 2 {
		return nil
	}
	var overlappingDirs []string
	// globalMaxt tracks the largest MaxTime seen so far; a later block whose
	// MinTime is below it overlaps something before it.
	globalMaxt := ds[0].meta.MaxTime
	for i, d := range ds[1:] {
		if d.meta.MinTime < globalMaxt {
			if len(overlappingDirs) == 0 { // When it is the first overlap, need to add the last one as well.
				// Note: i indexes ds[1:], so ds[i] is the previous element.
				overlappingDirs = append(overlappingDirs, ds[i].dir)
			}
			overlappingDirs = append(overlappingDirs, d.dir)
		} else if len(overlappingDirs) > 0 {
			// First gap after an overlapping run; stop at one group per plan.
			break
		}
		if d.meta.MaxTime > globalMaxt {
			globalMaxt = d.meta.MaxTime
		}
	}
	return overlappingDirs
}

// splitByRange splits the directories by the time range. The range sequence starts at 0.
//
// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30
// it returns [0-10, 10-20], [50-60], [90-100].
func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
	var splitDirs [][]dirMeta

	for i := 0; i < len(ds); {
		var (
			group []dirMeta
			t0    int64
			m     = ds[i].meta
		)
		// Compute start of aligned time range of size tr closest to the current block's start.
		// Negative MinTime needs the adjusted division so t0 still rounds toward -inf.
		if m.MinTime >= 0 {
			t0 = tr * (m.MinTime / tr)
		} else {
			t0 = tr * ((m.MinTime - tr + 1) / tr)
		}
		// Skip blocks that don't fall into the range. This can happen via mis-alignment or
		// by being the multiple of the intended range.
		if m.MaxTime > t0+tr {
			i++
			continue
		}

		// Add all dirs to the current group that are within [t0, t0+tr].
		for ; i < len(ds); i++ {
			// Either the block falls into the next range or doesn't fit at all (checked above).
			if ds[i].meta.MaxTime > t0+tr {
				break
			}
			group = append(group, ds[i])
		}

		if len(group) > 0 {
			splitDirs = append(splitDirs, group)
		}
	}

	return splitDirs
}

// compactBlockMetas builds the meta for the block resulting from compacting
// the given blocks: union of sources/parents, max compaction level + 1, and
// the overall [MinTime, MaxTime] span.
func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
	res := &BlockMeta{
		ULID:    uid,
		MinTime: blocks[0].MinTime,
	}

	sources := map[ulid.ULID]struct{}{}
	// For overlapping blocks, the Maxt can be
	// in any block so we track it globally.
	maxt := int64(math.MinInt64)

	for _, b := range blocks {
		if b.MaxTime > maxt {
			maxt = b.MaxTime
		}
		if b.Compaction.Level > res.Compaction.Level {
			res.Compaction.Level = b.Compaction.Level
		}
		for _, s := range b.Compaction.Sources {
			sources[s] = struct{}{}
		}
		res.Compaction.Parents = append(res.Compaction.Parents, BlockDesc{
			ULID:    b.ULID,
			MinTime: b.MinTime,
			MaxTime: b.MaxTime,
		})
	}
	res.Compaction.Level++

	// Deduplicated sources, sorted for deterministic output.
	for s := range sources {
		res.Compaction.Sources = append(res.Compaction.Sources, s)
	}
	sort.Slice(res.Compaction.Sources, func(i, j int) bool {
		return res.Compaction.Sources[i].Compare(res.Compaction.Sources[j]) < 0
	})

	res.MaxTime = maxt
	return res
}

// Compact creates a new block in the compactor's directory from the blocks in the
// provided directories.
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
	var (
		blocks []BlockReader
		bs     []*Block
		metas  []*BlockMeta
		uids   []string
	)
	start := time.Now()

	for _, d := range dirs {
		meta, err := readMetaFile(d)
		if err != nil {
			return uid, err
		}

		var b *Block

		// Use already open blocks if we can, to avoid
		// having the index data in memory twice.
		for _, o := range open {
			if meta.ULID == o.Meta().ULID {
				b = o
				break
			}
		}

		if b == nil {
			var err error
			b, err = OpenBlock(c.logger, d, c.chunkPool)
			if err != nil {
				return uid, err
			}
			// Deliberate defer-in-loop: blocks we opened stay open until
			// Compact returns, since compaction reads them throughout.
			defer b.Close()
		}

		metas = append(metas, meta)
		blocks = append(blocks, b)
		bs = append(bs, b)
		uids = append(uids, meta.ULID.String())
	}

	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
	uid = ulid.MustNew(ulid.Now(), entropy)

	meta := compactBlockMetas(uid, metas...)
	err = c.write(dest, meta, blocks...)
	if err == nil {
		if meta.Stats.NumSamples == 0 {
			// Resulting block was empty: no block is written; mark the
			// sources Deletable and return the zero ULID (see Compactor docs).
			for _, b := range bs {
				b.meta.Compaction.Deletable = true
				if err = writeMetaFile(b.dir, &b.meta); err != nil {
					level.Error(c.logger).Log(
						"msg", "Failed to write 'Deletable' to meta file after compaction",
						"ulid", b.meta.ULID,
					)
				}
			}
			uid = ulid.ULID{}
			level.Info(c.logger).Log(
				"msg", "compact blocks resulted in empty block",
				"count", len(blocks),
				"sources", fmt.Sprintf("%v", uids),
				"duration", time.Since(start),
			)
		} else {
			level.Info(c.logger).Log(
				"msg", "compact blocks",
				"count", len(blocks),
				"mint", meta.MinTime,
				"maxt", meta.MaxTime,
				"ulid", meta.ULID,
				"sources", fmt.Sprintf("%v", uids),
				"duration", time.Since(start),
			)
		}
		return uid, nil
	}

	// Compaction failed: mark source blocks unless it was a cancellation.
	var merr tsdb_errors.MultiError
	merr.Add(err)
	if err != context.Canceled {
		for _, b := range bs {
			if err := b.setCompactionFailed(); err != nil {
				merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
			}
		}
	}

	return uid, merr
}

// Write persists the data of b as a single block covering [mint, maxt) into dest.
// Returns the zero ULID without writing a block when the result has no samples.
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
	start := time.Now()

	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
	uid := ulid.MustNew(ulid.Now(), entropy)

	meta := &BlockMeta{
		ULID:    uid,
		MinTime: mint,
		MaxTime: maxt,
	}
	meta.Compaction.Level = 1
	meta.Compaction.Sources = []ulid.ULID{uid}

	if parent != nil {
		meta.Compaction.Parents = []BlockDesc{
			{ULID: parent.ULID, MinTime: parent.MinTime, MaxTime: parent.MaxTime},
		}
	}

	err := c.write(dest, meta, b)
	if err != nil {
		return uid, err
	}

	if meta.Stats.NumSamples == 0 {
		return ulid.ULID{}, nil
	}

	level.Info(c.logger).Log(
		"msg", "write block",
		"mint", meta.MinTime,
		"maxt", meta.MaxTime,
		"ulid", meta.ULID,
		"duration", time.Since(start),
	)
	return uid, nil
}

// instrumentedChunkWriter is used for level 1 compactions to record statistics
// about compacted chunks.
type instrumentedChunkWriter struct {
	ChunkWriter

	size    prometheus.Histogram
	samples prometheus.Histogram
	trange  prometheus.Histogram
}

// WriteChunks observes size, sample count and time range of each chunk
// before delegating to the wrapped ChunkWriter.
func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
	for _, c := range chunks {
		w.size.Observe(float64(len(c.Chunk.Bytes())))
		w.samples.Observe(float64(c.Chunk.NumSamples()))
		w.trange.Observe(float64(c.MaxTime - c.MinTime))
	}
	return w.ChunkWriter.WriteChunks(chunks...)
}

// write creates a new block that is the union of the provided blocks into dir.
// It cleans up all files of the old blocks after completing successfully.
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
	dir := filepath.Join(dest, meta.ULID.String())
	// All output is staged in a .tmp sibling and renamed into place at the end.
	tmp := dir + ".tmp"
	var closers []io.Closer
	defer func(t time.Time) {
		var merr tsdb_errors.MultiError
		merr.Add(err)
		merr.Add(closeAll(closers))
		err = merr.Err()

		// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
		if err := os.RemoveAll(tmp); err != nil {
			level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
		}
		if err != nil {
			c.metrics.failed.Inc()
		}
		c.metrics.ran.Inc()
		c.metrics.duration.Observe(time.Since(t).Seconds())
	}(time.Now())

	if err = os.RemoveAll(tmp); err != nil {
		return err
	}

	if err = os.MkdirAll(tmp, 0777); err != nil {
		return err
	}

	// Populate chunk and index files into temporary directory with
	// data of all blocks.
	var chunkw ChunkWriter

	chunkw, err = chunks.NewWriter(chunkDir(tmp))
	if err != nil {
		return errors.Wrap(err, "open chunk writer")
	}
	closers = append(closers, chunkw)
	// Record written chunk sizes on level 1 compactions.
	if meta.Compaction.Level == 1 {
		chunkw = &instrumentedChunkWriter{
			ChunkWriter: chunkw,
			size:        c.metrics.chunkSize,
			samples:     c.metrics.chunkSamples,
			trange:      c.metrics.chunkRange,
		}
	}

	indexw, err := index.NewWriter(filepath.Join(tmp, indexFilename))
	if err != nil {
		return errors.Wrap(err, "open index writer")
	}
	closers = append(closers, indexw)

	if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
		return errors.Wrap(err, "write compaction")
	}

	// Abort before making anything visible if the compactor was cancelled.
	select {
	case <-c.ctx.Done():
		return c.ctx.Err()
	default:
	}

	// We are explicitly closing them here to check for error even
	// though these are covered under defer. This is because in Windows,
	// you cannot delete these unless they are closed and the defer is to
	// make sure they are closed if the function exits due to an error above.
	var merr tsdb_errors.MultiError
	for _, w := range closers {
		merr.Add(w.Close())
	}
	closers = closers[:0] // Avoid closing the writers twice in the defer.
	if merr.Err() != nil {
		return merr.Err()
	}

	// Populated block is empty, so exit early.
	if meta.Stats.NumSamples == 0 {
		return nil
	}

	if err = writeMetaFile(tmp, meta); err != nil {
		return errors.Wrap(err, "write merged meta")
	}

	// Create an empty tombstones file.
	if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil {
		return errors.Wrap(err, "write new tombstones file")
	}

	df, err := fileutil.OpenDir(tmp)
	if err != nil {
		return errors.Wrap(err, "open temporary block dir")
	}
	defer func() {
		if df != nil {
			df.Close()
		}
	}()

	if err := df.Sync(); err != nil {
		return errors.Wrap(err, "sync temporary dir file")
	}

	// Close temp dir before rename block dir (for windows platform).
	if err = df.Close(); err != nil {
		return errors.Wrap(err, "close temporary dir")
	}
	df = nil

	// Block successfully written, make visible and remove old ones.
	if err := renameFile(tmp, dir); err != nil {
		return errors.Wrap(err, "rename block dir")
	}

	return nil
}

// populateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block.
// It expects sorted blocks input by mint.
func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
	if len(blocks) == 0 {
		return errors.New("cannot populate block from no readers")
	}

	var (
		set         ChunkSeriesSet
		allSymbols  = make(map[string]struct{}, 1<<16)
		closers     = []io.Closer{}
		overlapping bool
	)
	defer func() {
		var merr tsdb_errors.MultiError
		merr.Add(err)
		merr.Add(closeAll(closers))
		err = merr.Err()
		c.metrics.populatingBlocks.Set(0)
	}()
	c.metrics.populatingBlocks.Set(1)

	// Build a merged series set over all input blocks and collect symbols.
	globalMaxt := blocks[0].MaxTime()
	for i, b := range blocks {
		select {
		case <-c.ctx.Done():
			return c.ctx.Err()
		default:
		}

		// Detect (once) whether any input blocks overlap in time; the
		// overlapping flag switches on extra sorting/merging below.
		if !overlapping {
			if i > 0 && b.MinTime() < globalMaxt {
				c.metrics.overlappingBlocks.Inc()
				overlapping = true
				level.Warn(c.logger).Log("msg", "found overlapping blocks during compaction", "ulid", meta.ULID)
			}
			if b.MaxTime() > globalMaxt {
				globalMaxt = b.MaxTime()
			}
		}

		indexr, err := b.Index()
		if err != nil {
			return errors.Wrapf(err, "open index reader for block %s", b)
		}
		closers = append(closers, indexr)

		chunkr, err := b.Chunks()
		if err != nil {
			return errors.Wrapf(err, "open chunk reader for block %s", b)
		}
		closers = append(closers, chunkr)

		tombsr, err := b.Tombstones()
		if err != nil {
			return errors.Wrapf(err, "open tombstone reader for block %s", b)
		}
		closers = append(closers, tombsr)

		symbols, err := indexr.Symbols()
		if err != nil {
			return errors.Wrap(err, "read symbols")
		}
		for s := range symbols {
			allSymbols[s] = struct{}{}
		}

		all, err := indexr.Postings(index.AllPostingsKey())
		if err != nil {
			return err
		}
		all = indexr.SortedPostings(all)

		s := newCompactionSeriesSet(indexr, chunkr, tombsr, all)

		// Chain each block's series set into a pairwise merger.
		if i == 0 {
			set = s
			continue
		}
		set, err = newCompactionMerger(set, s)
		if err != nil {
			return err
		}
	}

	// We fully rebuild the postings list index from merged series.
	var (
		postings = index.NewMemPostings()
		values   = map[string]stringset{}
		i        = uint64(0) // Series reference assigned to each written series.
	)

	if err := indexw.AddSymbols(allSymbols); err != nil {
		return errors.Wrap(err, "add symbols")
	}

	for set.Next() {
		select {
		case <-c.ctx.Done():
			return c.ctx.Err()
		default:
		}

		lset, chks, dranges := set.At() // The chunks here are not fully deleted.
		if overlapping {
			// If blocks are overlapping, it is possible to have unsorted chunks.
			sort.Slice(chks, func(i, j int) bool {
				return chks[i].MinTime < chks[j].MinTime
			})
		}

		// Skip the series with all deleted chunks.
		if len(chks) == 0 {
			continue
		}

		// NOTE: the loop variable i shadows the series-ref counter above.
		for i, chk := range chks {
			if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime {
				return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d",
					chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime)
			}

			if len(dranges) > 0 {
				// Re-encode the chunk to not have deleted values.
				if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) {
					continue
				}
				newChunk := chunkenc.NewXORChunk()
				app, err := newChunk.Appender()
				if err != nil {
					return err
				}

				it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
				for it.Next() {
					ts, v := it.At()
					app.Append(ts, v)
				}

				chks[i].Chunk = newChunk
			}
		}

		mergedChks := chks
		if overlapping {
			mergedChks, err = chunks.MergeOverlappingChunks(chks)
			if err != nil {
				return errors.Wrap(err, "merge overlapping chunks")
			}
		}
		if err := chunkw.WriteChunks(mergedChks...); err != nil {
			return errors.Wrap(err, "write chunks")
		}

		if err := indexw.AddSeries(i, lset, mergedChks...); err != nil {
			return errors.Wrap(err, "add series")
		}

		meta.Stats.NumChunks += uint64(len(mergedChks))
		meta.Stats.NumSeries++
		for _, chk := range mergedChks {
			meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
		}

		// Return chunk buffers to the pool once written.
		for _, chk := range mergedChks {
			if err := c.chunkPool.Put(chk.Chunk); err != nil {
				return errors.Wrap(err, "put chunk")
			}
		}

		// Track every observed value per label name for the label index.
		for _, l := range lset {
			valset, ok := values[l.Name]
			if !ok {
				valset = stringset{}
				values[l.Name] = valset
			}
			valset.set(l.Value)
		}
		postings.Add(i, lset)

		i++
	}
	if set.Err() != nil {
		return errors.Wrap(set.Err(), "iterate compaction set")
	}

	// Write label indices, reusing one scratch slice across label names.
	s := make([]string, 0, 256)
	for n, v := range values {
		s = s[:0]

		for x := range v {
			s = append(s, x)
		}
		if err := indexw.WriteLabelIndex([]string{n}, s); err != nil {
			return errors.Wrap(err, "write label index")
		}
	}

	for _, l := range postings.SortedKeys() {
		if err := indexw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)); err != nil {
			return errors.Wrap(err, "write postings")
		}
	}
	return nil
}

// compactionSeriesSet iterates the series of a single block in postings
// order, attaching each series' chunks and tombstone deletion intervals.
type compactionSeriesSet struct {
	p          index.Postings
	index      IndexReader
	chunks     ChunkReader
	tombstones TombstoneReader

	// Current element, valid after a successful Next().
	l         labels.Labels
	c         []chunks.Meta
	intervals Intervals
	err       error
}

// newCompactionSeriesSet returns a compactionSeriesSet over the given
// readers, driven by the postings iterator p.
func newCompactionSeriesSet(i IndexReader, c ChunkReader, t TombstoneReader, p index.Postings) *compactionSeriesSet {
	return &compactionSeriesSet{
		index:      i,
		chunks:     c,
		tombstones: t,
		p:          p,
	}
}

// Next advances to the next series, loading its labels, chunk metas,
// tombstone intervals and chunk data. Returns false on exhaustion or error.
func (c *compactionSeriesSet) Next() bool {
	if !c.p.Next() {
		return false
	}
	var err error

	c.intervals, err = c.tombstones.Get(c.p.At())
	if err != nil {
		c.err = errors.Wrap(err, "get tombstones")
		return false
	}

	if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil {
		c.err = errors.Wrapf(err, "get series %d", c.p.At())
		return false
	}

	// Remove completely deleted chunks.
	if len(c.intervals) > 0 {
		chks := make([]chunks.Meta, 0, len(c.c))
		for _, chk := range c.c {
			if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) {
				chks = append(chks, chk)
			}
		}

		c.c = chks
	}

	// Resolve chunk references to actual chunk data.
	for i := range c.c {
		chk := &c.c[i]

		chk.Chunk, err = c.chunks.Chunk(chk.Ref)
		if err != nil {
			c.err = errors.Wrapf(err, "chunk %d not found", chk.Ref)
			return false
		}
	}

	return true
}

// Err returns the first error encountered, from this set or its postings.
func (c *compactionSeriesSet) Err() error {
	if c.err != nil {
		return c.err
	}
	return c.p.Err()
}

// At returns the current series' labels, chunks and deletion intervals.
func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, Intervals) {
	return c.l, c.c, c.intervals
}

// compactionMerger merges two ChunkSeriesSets into one, combining chunks
// and deletion intervals when the same label set appears in both.
type compactionMerger struct {
	a, b ChunkSeriesSet

	// aok/bok indicate whether a/b currently hold a valid look-ahead element.
	aok, bok  bool
	l         labels.Labels
	c         []chunks.Meta
	intervals Intervals
}

// newCompactionMerger returns a merger over a and b, or the first error
// encountered while priming them.
func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
	c := &compactionMerger{
		a: a,
		b: b,
	}
	// Initialize first elements of both sets as Next() needs
	// one element look-ahead.
	c.aok = c.a.Next()
	c.bok = c.b.Next()

	return c, c.Err()
}

// compare orders the current elements of a and b by label set; an exhausted
// side always sorts after the other.
func (c *compactionMerger) compare() int {
	if !c.aok {
		return 1
	}
	if !c.bok {
		return -1
	}
	a, _, _ := c.a.At()
	b, _, _ := c.b.At()
	return labels.Compare(a, b)
}

// Next advances to the next merged series, consuming from whichever child
// set sorts first (or both, when they hold the same series).
func (c *compactionMerger) Next() bool {
	if !c.aok && !c.bok || c.Err() != nil {
		return false
	}
	// While advancing child iterators the memory used for labels and chunks
	// may be reused. When picking a series we have to store the result.
	var lset labels.Labels
	var chks []chunks.Meta

	d := c.compare()
	if d > 0 {
		lset, chks, c.intervals = c.b.At()
		c.l = append(c.l[:0], lset...)
		c.c = append(c.c[:0], chks...)

		c.bok = c.b.Next()
	} else if d < 0 {
		lset, chks, c.intervals = c.a.At()
		c.l = append(c.l[:0], lset...)
		c.c = append(c.c[:0], chks...)

		c.aok = c.a.Next()
	} else {
		// Both sets contain the current series. Chain them into a single one.
		l, ca, ra := c.a.At()
		_, cb, rb := c.b.At()

		for _, r := range rb {
			ra = ra.add(r)
		}

		c.l = append(c.l[:0], l...)
		c.c = append(append(c.c[:0], ca...), cb...)
		c.intervals = ra

		c.aok = c.a.Next()
		c.bok = c.b.Next()
	}

	return true
}

// Err returns the first error from either child set.
func (c *compactionMerger) Err() error {
	if c.a.Err() != nil {
		return c.a.Err()
	}
	return c.b.Err()
}

// At returns the current merged series' labels, chunks and deletion intervals.
func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, Intervals) {
	return c.l, c.c, c.intervals
}

// renameFile atomically replaces `to` with `from` and syncs the parent
// directory so the rename is persisted to disk.
func renameFile(from, to string) error {
	if err := os.RemoveAll(to); err != nil {
		return err
	}
	if err := os.Rename(from, to); err != nil {
		return err
	}

	// Directory was renamed; sync parent dir to persist rename.
	pdir, err := fileutil.OpenDir(filepath.Dir(to))
	if err != nil {
		return err
	}

	if err = pdir.Sync(); err != nil {
		pdir.Close()
		return err
	}
	return pdir.Close()
}