// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"context"
	"fmt"
	"io"
	"math"
	"math/rand"
	"os"
	"path/filepath"
	"sort"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
	"github.com/prometheus/prometheus/tsdb/fileutil"
	"github.com/prometheus/prometheus/tsdb/index"
	"github.com/prometheus/prometheus/tsdb/tombstones"
)

// ExponentialBlockRanges returns steps time ranges, starting at minSize and
// multiplying by stepSize at each step.
func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
	ranges := make([]int64, 0, steps)
	curRange := minSize
	for i := 0; i < steps; i++ {
		ranges = append(ranges, curRange)
		curRange = curRange * int64(stepSize)
	}

	return ranges
}

// Compactor provides compaction against an underlying storage
// of time series data.
type Compactor interface {
	// Plan returns a set of directories that can be compacted concurrently.
	// The directories can be overlapping.
	// Results returned when compactions are in progress are undefined.
	Plan(dir string) ([]string, error)

	// Write persists a Block into a directory.
	// No Block is written when the resulting Block has 0 samples; an empty
	// ulid.ULID{} is returned instead.
	Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)

	// Compact runs compaction against the provided directories. Must
	// only be called concurrently with results of Plan().
	// Can optionally pass a list of already open blocks,
	// to avoid having to reopen them.
	// When the resulting Block has 0 samples:
	//  * No block is written.
	//  * The source dirs are marked Deletable.
	//  * An empty ulid.ULID{} is returned.
	Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error)
}
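
// exampleBlockRangesSketch is an illustrative sketch, not part of the package
// API: with a 2h base block range (in milliseconds), 3 steps and a step size
// of 5, ExponentialBlockRanges yields ranges of 2h, 10h and 50h.
func exampleBlockRangesSketch() []int64 {
	// Returns []int64{7200000, 36000000, 180000000}.
	return ExponentialBlockRanges(2*60*60*1000, 3, 5)
}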

// LeveledCompactor implements the Compactor interface.
type LeveledCompactor struct {
	metrics   *compactorMetrics
	logger    log.Logger
	ranges    []int64
	chunkPool chunkenc.Pool
	ctx       context.Context
}

type compactorMetrics struct {
	ran               prometheus.Counter
	populatingBlocks  prometheus.Gauge
	overlappingBlocks prometheus.Counter
	duration          prometheus.Histogram
	chunkSize         prometheus.Histogram
	chunkSamples      prometheus.Histogram
	chunkRange        prometheus.Histogram
}

func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
	m := &compactorMetrics{}

	m.ran = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "prometheus_tsdb_compactions_total",
		Help: "Total number of compactions that were executed for the partition.",
	})
	m.populatingBlocks = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "prometheus_tsdb_compaction_populating_block",
		Help: "Set to 1 when a block is currently being written to the disk.",
	})
	m.overlappingBlocks = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "prometheus_tsdb_vertical_compactions_total",
		Help: "Total number of compactions done on overlapping blocks.",
	})
	m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "prometheus_tsdb_compaction_duration_seconds",
		Help:    "Duration of compaction runs.",
		Buckets: prometheus.ExponentialBuckets(1, 2, 10),
	})
	m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "prometheus_tsdb_compaction_chunk_size_bytes",
		Help:    "Final size of chunks on their first compaction.",
		Buckets: prometheus.ExponentialBuckets(32, 1.5, 12),
	})
	m.chunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "prometheus_tsdb_compaction_chunk_samples",
		Help:    "Final number of samples on their first compaction.",
		Buckets: prometheus.ExponentialBuckets(4, 1.5, 12),
	})
	m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "prometheus_tsdb_compaction_chunk_range_seconds",
		Help:    "Final time range of chunks on their first compaction.",
		Buckets: prometheus.ExponentialBuckets(100, 4, 10),
	})

	if r != nil {
		r.MustRegister(
			m.ran,
			m.populatingBlocks,
			m.overlappingBlocks,
			m.duration,
			m.chunkRange,
			m.chunkSamples,
			m.chunkSize,
		)
	}
	return m
}

// NewLeveledCompactor returns a LeveledCompactor.
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) {
	if len(ranges) == 0 {
		return nil, errors.Errorf("at least one range must be provided")
	}
	if pool == nil {
		pool = chunkenc.NewPool()
	}
	if l == nil {
		l = log.NewNopLogger()
	}
	return &LeveledCompactor{
		ranges:    ranges,
		chunkPool: pool,
		logger:    l,
		metrics:   newCompactorMetrics(r),
		ctx:       ctx,
	}, nil
}

type dirMeta struct {
	dir  string
	meta *BlockMeta
}
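
// exampleNewCompactorSketch is an illustrative sketch, not part of the
// package API: it wires up a LeveledCompactor with an exponential range grid
// and a fresh chunk pool.
func exampleNewCompactorSketch() (*LeveledCompactor, error) {
	return NewLeveledCompactor(
		context.Background(),
		prometheus.NewRegistry(), // metrics registry
		log.NewNopLogger(),       // discard log output
		ExponentialBlockRanges(2*60*60*1000, 3, 5), // 2h, 10h, 50h in ms
		chunkenc.NewPool(),
	)
}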

// Plan returns a list of compactable blocks in the provided directory.
func (c *LeveledCompactor) Plan(dir string) ([]string, error) {
	dirs, err := blockDirs(dir)
	if err != nil {
		return nil, err
	}
	if len(dirs) < 1 {
		return nil, nil
	}

	var dms []dirMeta
	for _, dir := range dirs {
		meta, _, err := readMetaFile(dir)
		if err != nil {
			return nil, err
		}
		dms = append(dms, dirMeta{dir, meta})
	}
	return c.plan(dms)
}

func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
	sort.Slice(dms, func(i, j int) bool {
		return dms[i].meta.MinTime < dms[j].meta.MinTime
	})

	res := c.selectOverlappingDirs(dms)
	if len(res) > 0 {
		return res, nil
	}
	// No overlapping blocks, do compaction the usual way.
	// We do not include the most recently created block (the one with
	// max(minTime)), i.e. the block just written from the WAL. This gives
	// users a window of a full block size to back up new data piece-wise
	// without having to care about data overlap.
	dms = dms[:len(dms)-1]

	for _, dm := range c.selectDirs(dms) {
		res = append(res, dm.dir)
	}
	if len(res) > 0 {
		return res, nil
	}

	// Compact any blocks with a big enough time range that have >5% tombstones.
	for i := len(dms) - 1; i >= 0; i-- {
		meta := dms[i].meta
		if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
			break
		}
		if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
			return []string{dms[i].dir}, nil
		}
	}

	return nil, nil
}

// selectDirs returns the dir metas that should be compacted into a single new block.
// If only a single block range is configured, the result is always nil.
func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
	if len(c.ranges) < 2 || len(ds) < 1 {
		return nil
	}

	highTime := ds[len(ds)-1].meta.MinTime

	for _, iv := range c.ranges[1:] {
		parts := splitByRange(ds, iv)
		if len(parts) == 0 {
			continue
		}

	Outer:
		for _, p := range parts {
			// Do not select the range if it has a block whose compaction failed.
			for _, dm := range p {
				if dm.meta.Compaction.Failed {
					continue Outer
				}
			}

			mint := p[0].meta.MinTime
			maxt := p[len(p)-1].meta.MaxTime
			// Pick the range of blocks if it spans the full range (potentially with gaps)
			// or is before the most recent block.
			// This ensures we don't compact blocks prematurely when another one of the same
			// size still fits in the range.
			if (maxt-mint == iv || maxt <= highTime) && len(p) > 1 {
				return p
			}
		}
	}

	return nil
}

// selectOverlappingDirs returns all dirs with overlapping time ranges.
// It expects the input sorted by mint and returns the overlapping dirs in the
// same order as received.
func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string {
	if len(ds) < 2 {
		return nil
	}
	var overlappingDirs []string
	globalMaxt := ds[0].meta.MaxTime
	for i, d := range ds[1:] {
		if d.meta.MinTime < globalMaxt {
			if len(overlappingDirs) == 0 { // When it is the first overlap, need to add the last one as well.
				overlappingDirs = append(overlappingDirs, ds[i].dir)
			}
			overlappingDirs = append(overlappingDirs, d.dir)
		} else if len(overlappingDirs) > 0 {
			break
		}
		if d.meta.MaxTime > globalMaxt {
			globalMaxt = d.meta.MaxTime
		}
	}
	return overlappingDirs
}
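
// exampleOverlapSketch is an illustrative sketch, not part of the package
// API (real block dirs are ULIDs; "01".."03" are placeholders): given blocks
// sorted by MinTime covering [0,20), [10,30) and [30,40),
// selectOverlappingDirs returns the first two dirs, because the second block
// starts before the first one ends.
func exampleOverlapSketch(c *LeveledCompactor) []string {
	ds := []dirMeta{
		{dir: "01", meta: &BlockMeta{MinTime: 0, MaxTime: 20}},
		{dir: "02", meta: &BlockMeta{MinTime: 10, MaxTime: 30}},
		{dir: "03", meta: &BlockMeta{MinTime: 30, MaxTime: 40}},
	}
	return c.selectOverlappingDirs(ds) // ["01", "02"]
}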

// splitByRange splits the directories by the time range. The range sequence
// starts at 0.
//
// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split
// range tr is 30, it returns [0-10, 10-20], [50-60], [90-100].
func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
	var splitDirs [][]dirMeta

	for i := 0; i < len(ds); {
		var (
			group []dirMeta
			t0    int64
			m     = ds[i].meta
		)
		// Compute the start of the aligned time range of size tr closest to
		// the current block's start.
		if m.MinTime >= 0 {
			t0 = tr * (m.MinTime / tr)
		} else {
			t0 = tr * ((m.MinTime - tr + 1) / tr)
		}
		// Skip blocks that don't fall into the range. This can happen through
		// misalignment or when the block already spans a multiple of the
		// intended range.
		if m.MaxTime > t0+tr {
			i++
			continue
		}

		// Add all dirs to the current group that are within [t0, t0+tr].
		for ; i < len(ds); i++ {
			// Either the block falls into the next range or doesn't fit at all (checked above).
			if ds[i].meta.MaxTime > t0+tr {
				break
			}
			group = append(group, ds[i])
		}

		if len(group) > 0 {
			splitDirs = append(splitDirs, group)
		}
	}

	return splitDirs
}

func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
	res := &BlockMeta{
		ULID:    uid,
		MinTime: blocks[0].MinTime,
	}

	sources := map[ulid.ULID]struct{}{}
	// For overlapping blocks, the Maxt can be
	// in any block so we track it globally.
	maxt := int64(math.MinInt64)

	for _, b := range blocks {
		if b.MaxTime > maxt {
			maxt = b.MaxTime
		}
		if b.Compaction.Level > res.Compaction.Level {
			res.Compaction.Level = b.Compaction.Level
		}
		for _, s := range b.Compaction.Sources {
			sources[s] = struct{}{}
		}
		res.Compaction.Parents = append(res.Compaction.Parents, BlockDesc{
			ULID:    b.ULID,
			MinTime: b.MinTime,
			MaxTime: b.MaxTime,
		})
	}
	res.Compaction.Level++

	for s := range sources {
		res.Compaction.Sources = append(res.Compaction.Sources, s)
	}
	sort.Slice(res.Compaction.Sources, func(i, j int) bool {
		return res.Compaction.Sources[i].Compare(res.Compaction.Sources[j]) < 0
	})

	res.MaxTime = maxt
	return res
}

// Compact creates a new block in the compactor's directory from the blocks in the
// provided directories.
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
	var (
		blocks []BlockReader
		bs     []*Block
		metas  []*BlockMeta
		uids   []string
	)
	start := time.Now()

	for _, d := range dirs {
		meta, _, err := readMetaFile(d)
		if err != nil {
			return uid, err
		}

		var b *Block

		// Use already open blocks if we can, to avoid
		// having the index data in memory twice.
		for _, o := range open {
			if meta.ULID == o.Meta().ULID {
				b = o
				break
			}
		}

		if b == nil {
			var err error
			b, err = OpenBlock(c.logger, d, c.chunkPool)
			if err != nil {
				return uid, err
			}
			defer b.Close()
		}

		metas = append(metas, meta)
		blocks = append(blocks, b)
		bs = append(bs, b)
		uids = append(uids, meta.ULID.String())
	}

	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
	uid = ulid.MustNew(ulid.Now(), entropy)

	meta := compactBlockMetas(uid, metas...)
	err = c.write(dest, meta, blocks...)
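	// From here on we only do bookkeeping on the outcome of c.write: an empty
	// result marks the sources deletable instead of keeping a block, and any
	// error other than context cancellation marks the sources as failed.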
	if err == nil {
		if meta.Stats.NumSamples == 0 {
			for _, b := range bs {
				b.meta.Compaction.Deletable = true
				n, err := writeMetaFile(c.logger, b.dir, &b.meta)
				if err != nil {
					level.Error(c.logger).Log(
						"msg", "Failed to write 'Deletable' to meta file after compaction",
						"ulid", b.meta.ULID,
						"err", err,
					)
				}
				b.numBytesMeta = n
			}
			uid = ulid.ULID{}
			level.Info(c.logger).Log(
				"msg", "compact blocks resulted in empty block",
				"count", len(blocks),
				"sources", fmt.Sprintf("%v", uids),
				"duration", time.Since(start),
			)
		} else {
			level.Info(c.logger).Log(
				"msg", "compact blocks",
				"count", len(blocks),
				"mint", meta.MinTime,
				"maxt", meta.MaxTime,
				"ulid", meta.ULID,
				"sources", fmt.Sprintf("%v", uids),
				"duration", time.Since(start),
			)
		}
		return uid, nil
	}

	var merr tsdb_errors.MultiError
	merr.Add(err)
	if err != context.Canceled {
		for _, b := range bs {
			if err := b.setCompactionFailed(); err != nil {
				merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
			}
		}
	}

	return uid, merr
}

func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
	start := time.Now()

	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
	uid := ulid.MustNew(ulid.Now(), entropy)

	meta := &BlockMeta{
		ULID:    uid,
		MinTime: mint,
		MaxTime: maxt,
	}
	meta.Compaction.Level = 1
	meta.Compaction.Sources = []ulid.ULID{uid}

	if parent != nil {
		meta.Compaction.Parents = []BlockDesc{
			{ULID: parent.ULID, MinTime: parent.MinTime, MaxTime: parent.MaxTime},
		}
	}

	err := c.write(dest, meta, b)
	if err != nil {
		return uid, err
	}

	if meta.Stats.NumSamples == 0 {
		return ulid.ULID{}, nil
	}

	level.Info(c.logger).Log(
		"msg", "write block",
		"mint", meta.MinTime,
		"maxt", meta.MaxTime,
		"ulid", meta.ULID,
		"duration", time.Since(start),
	)
	return uid, nil
}

// instrumentedChunkWriter is used for level 1 compactions to record statistics
// about compacted chunks.
type instrumentedChunkWriter struct {
	ChunkWriter

	size    prometheus.Histogram
	samples prometheus.Histogram
	trange  prometheus.Histogram
}

func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
	for _, c := range chunks {
		w.size.Observe(float64(len(c.Chunk.Bytes())))
		w.samples.Observe(float64(c.Chunk.NumSamples()))
		w.trange.Observe(float64(c.MaxTime - c.MinTime))
	}
	return w.ChunkWriter.WriteChunks(chunks...)
}

// write creates a new block that is the union of the provided blocks into dir.
// It cleans up all files of the old blocks after completing successfully.
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
	dir := filepath.Join(dest, meta.ULID.String())
	tmp := dir + ".tmp"
	var closers []io.Closer
	defer func(t time.Time) {
		var merr tsdb_errors.MultiError
		merr.Add(err)
		merr.Add(closeAll(closers))
		err = merr.Err()

		// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
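		// On success tmp has already been renamed into place, so this is a
		// no-op; on failure it deletes the partially written block.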
		if err := os.RemoveAll(tmp); err != nil {
			level.Error(c.logger).Log("msg", "failed to remove tmp folder after failed compaction", "err", err.Error())
		}
		c.metrics.ran.Inc()
		c.metrics.duration.Observe(time.Since(t).Seconds())
	}(time.Now())

	if err = os.RemoveAll(tmp); err != nil {
		return err
	}

	if err = os.MkdirAll(tmp, 0777); err != nil {
		return err
	}

	// Populate chunk and index files into temporary directory with
	// data of all blocks.
	var chunkw ChunkWriter

	chunkw, err = chunks.NewWriter(chunkDir(tmp))
	if err != nil {
		return errors.Wrap(err, "open chunk writer")
	}
	closers = append(closers, chunkw)
	// Record written chunk sizes on level 1 compactions.
	if meta.Compaction.Level == 1 {
		chunkw = &instrumentedChunkWriter{
			ChunkWriter: chunkw,
			size:        c.metrics.chunkSize,
			samples:     c.metrics.chunkSamples,
			trange:      c.metrics.chunkRange,
		}
	}

	indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
	if err != nil {
		return errors.Wrap(err, "open index writer")
	}
	closers = append(closers, indexw)

	if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
		return errors.Wrap(err, "write compaction")
	}

	select {
	case <-c.ctx.Done():
		return c.ctx.Err()
	default:
	}

	// We explicitly close the writers here to check for errors even though
	// they are covered by the defer. On Windows the files cannot be deleted
	// unless they are closed; the defer only ensures they are closed if the
	// function exits due to an error above.
	var merr tsdb_errors.MultiError
	for _, w := range closers {
		merr.Add(w.Close())
	}
	closers = closers[:0] // Avoid closing the writers twice in the defer.
	if merr.Err() != nil {
		return merr.Err()
	}

	// Populated block is empty, so exit early.
	if meta.Stats.NumSamples == 0 {
		return nil
	}

	if _, err = writeMetaFile(c.logger, tmp, meta); err != nil {
		return errors.Wrap(err, "write merged meta")
	}

	// Create an empty tombstones file.
	if _, err := tombstones.WriteFile(c.logger, tmp, tombstones.NewMemTombstones()); err != nil {
		return errors.Wrap(err, "write new tombstones file")
	}

	df, err := fileutil.OpenDir(tmp)
	if err != nil {
		return errors.Wrap(err, "open temporary block dir")
	}
	defer func() {
		if df != nil {
			df.Close()
		}
	}()

	if err := df.Sync(); err != nil {
		return errors.Wrap(err, "sync temporary dir file")
	}

	// Close the temporary dir before renaming the block dir (required on Windows).
	if err = df.Close(); err != nil {
		return errors.Wrap(err, "close temporary dir")
	}
	df = nil

	// Block successfully written, make it visible and remove old ones.
	if err := fileutil.Replace(tmp, dir); err != nil {
		return errors.Wrap(err, "rename block dir")
	}

	return nil
}
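
// exampleCompactSketch is an illustrative sketch, not part of the package
// API: a typical compaction pass plans a set of compactable dirs under the
// database dir and compacts them into a new block.
func exampleCompactSketch(c *LeveledCompactor, dbDir string) error {
	dirs, err := c.Plan(dbDir)
	if err != nil || len(dirs) == 0 {
		return err
	}
	// Passing nil for open blocks forces Compact to open them from disk.
	_, err = c.Compact(dbDir, dirs, nil)
	return err
}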

// populateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block.
// It expects the input blocks sorted by mint.
func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
	if len(blocks) == 0 {
		return errors.New("cannot populate block from no readers")
	}

	var (
		set         ChunkSeriesSet
		symbols     index.StringIter
		closers     = []io.Closer{}
		overlapping bool
	)
	defer func() {
		var merr tsdb_errors.MultiError
		merr.Add(err)
		merr.Add(closeAll(closers))
		err = merr.Err()
		c.metrics.populatingBlocks.Set(0)
	}()
	c.metrics.populatingBlocks.Set(1)

	globalMaxt := blocks[0].Meta().MaxTime
	for i, b := range blocks {
		select {
		case <-c.ctx.Done():
			return c.ctx.Err()
		default:
		}

		if !overlapping {
			if i > 0 && b.Meta().MinTime < globalMaxt {
				c.metrics.overlappingBlocks.Inc()
				overlapping = true
				level.Warn(c.logger).Log("msg", "found overlapping blocks during compaction", "ulid", meta.ULID)
			}
			if b.Meta().MaxTime > globalMaxt {
				globalMaxt = b.Meta().MaxTime
			}
		}

		indexr, err := b.Index()
		if err != nil {
			return errors.Wrapf(err, "open index reader for block %s", b)
		}
		closers = append(closers, indexr)

		chunkr, err := b.Chunks()
		if err != nil {
			return errors.Wrapf(err, "open chunk reader for block %s", b)
		}
		closers = append(closers, chunkr)

		tombsr, err := b.Tombstones()
		if err != nil {
			return errors.Wrapf(err, "open tombstone reader for block %s", b)
		}
		closers = append(closers, tombsr)

		k, v := index.AllPostingsKey()
		all, err := indexr.Postings(k, v)
		if err != nil {
			return err
		}
		all = indexr.SortedPostings(all)

		s := newCompactionSeriesSet(indexr, chunkr, tombsr, all)
		syms := indexr.Symbols()

		if i == 0 {
			set = s
			symbols = syms
			continue
		}
		set, err = newCompactionMerger(set, s)
		if err != nil {
			return err
		}
		symbols = newMergedStringIter(symbols, syms)
	}

	for symbols.Next() {
		if err := indexw.AddSymbol(symbols.At()); err != nil {
			return errors.Wrap(err, "add symbol")
		}
	}
	if symbols.Err() != nil {
		return errors.Wrap(symbols.Err(), "next symbol")
	}

	delIter := &deletedIterator{}
	ref := uint64(0)
	for set.Next() {
		select {
		case <-c.ctx.Done():
			return c.ctx.Err()
		default:
		}

		lset, chks, dranges := set.At() // The chunks here are not fully deleted.
		if overlapping {
			// If blocks are overlapping, it is possible to have unsorted chunks.
			sort.Slice(chks, func(i, j int) bool {
				return chks[i].MinTime < chks[j].MinTime
			})
		}

		// Skip series whose chunks were all deleted.
		if len(chks) == 0 {
			continue
		}

		for i, chk := range chks {
			// Re-encode head chunks that are still open (being appended to) or
			// outside the compacted MaxTime range.
			// The chunk.Bytes() method is not safe for open chunks hence the re-encoding.
			// This happens when snapshotting the head block.
			//
			// Block time range is half-open: [meta.MinTime, meta.MaxTime) and
			// chunks are closed hence the chk.MaxTime >= meta.MaxTime check.
			//
			// TODO: think how to avoid the typecasting to verify when it is a head block.
			if _, isHeadChunk := chk.Chunk.(*safeChunk); isHeadChunk && chk.MaxTime >= meta.MaxTime {
				dranges = append(dranges, tombstones.Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64})
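				// The added interval forces the re-encoding below, which drops
				// all samples at or after meta.MaxTime from the open head chunk.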
			} else if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime {
				// Sanity check for disk blocks.
				// chk.MaxTime == meta.MaxTime shouldn't happen either, but
				// checking for it would break many users, so we don't.
				return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d",
					chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime)
			}

			if len(dranges) > 0 {
				// Re-encode the chunk to not have deleted values.
				if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) {
					continue
				}
				newChunk := chunkenc.NewXORChunk()
				app, err := newChunk.Appender()
				if err != nil {
					return err
				}

				delIter.it = chk.Chunk.Iterator(delIter.it)
				delIter.intervals = dranges

				var (
					t int64
					v float64
				)
				for delIter.Next() {
					t, v = delIter.At()
					app.Append(t, v)
				}
				if err := delIter.Err(); err != nil {
					return errors.Wrap(err, "iterate chunk while re-encoding")
				}

				chks[i].Chunk = newChunk
				chks[i].MaxTime = t
			}
		}

		mergedChks := chks
		if overlapping {
			mergedChks, err = chunks.MergeOverlappingChunks(chks)
			if err != nil {
				return errors.Wrap(err, "merge overlapping chunks")
			}
		}
		if err := chunkw.WriteChunks(mergedChks...); err != nil {
			return errors.Wrap(err, "write chunks")
		}

		if err := indexw.AddSeries(ref, lset, mergedChks...); err != nil {
			return errors.Wrap(err, "add series")
		}

		meta.Stats.NumChunks += uint64(len(mergedChks))
		meta.Stats.NumSeries++
		for _, chk := range mergedChks {
			meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
		}

		for _, chk := range mergedChks {
			if err := c.chunkPool.Put(chk.Chunk); err != nil {
				return errors.Wrap(err, "put chunk")
			}
		}

		ref++
	}
	if set.Err() != nil {
		return errors.Wrap(set.Err(), "iterate compaction set")
	}

	return nil
}

type compactionSeriesSet struct {
	p          index.Postings
	index      IndexReader
	chunks     ChunkReader
	tombstones tombstones.Reader

	l         labels.Labels
	c         []chunks.Meta
	intervals tombstones.Intervals
	err       error
}

func newCompactionSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings) *compactionSeriesSet {
	return &compactionSeriesSet{
		index:      i,
		chunks:     c,
		tombstones: t,
		p:          p,
	}
}

func (c *compactionSeriesSet) Next() bool {
	if !c.p.Next() {
		return false
	}
	var err error

	c.intervals, err = c.tombstones.Get(c.p.At())
	if err != nil {
		c.err = errors.Wrap(err, "get tombstones")
		return false
	}

	if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil {
		c.err = errors.Wrapf(err, "get series %d", c.p.At())
		return false
	}

	// Remove completely deleted chunks.
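	// A chunk is dropped here only when its whole [MinTime, MaxTime] range is
	// covered by tombstone intervals; partially deleted chunks are kept and
	// re-encoded later in populateBlock.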
	if len(c.intervals) > 0 {
		chks := make([]chunks.Meta, 0, len(c.c))
		for _, chk := range c.c {
			if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(c.intervals)) {
				chks = append(chks, chk)
			}
		}

		c.c = chks
	}

	for i := range c.c {
		chk := &c.c[i]

		chk.Chunk, err = c.chunks.Chunk(chk.Ref)
		if err != nil {
			c.err = errors.Wrapf(err, "chunk %d not found", chk.Ref)
			return false
		}
	}

	return true
}

func (c *compactionSeriesSet) Err() error {
	if c.err != nil {
		return c.err
	}
	return c.p.Err()
}

func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
	return c.l, c.c, c.intervals
}

type compactionMerger struct {
	a, b ChunkSeriesSet

	aok, bok  bool
	l         labels.Labels
	c         []chunks.Meta
	intervals tombstones.Intervals
}

func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
	c := &compactionMerger{
		a: a,
		b: b,
	}
	// Initialize first elements of both sets as Next() needs
	// one element look-ahead.
	c.aok = c.a.Next()
	c.bok = c.b.Next()

	return c, c.Err()
}

func (c *compactionMerger) compare() int {
	if !c.aok {
		return 1
	}
	if !c.bok {
		return -1
	}
	a, _, _ := c.a.At()
	b, _, _ := c.b.At()
	return labels.Compare(a, b)
}

func (c *compactionMerger) Next() bool {
	if !c.aok && !c.bok || c.Err() != nil {
		return false
	}
	// While advancing child iterators the memory used for labels and chunks
	// may be reused. When picking a series we have to store the result.
	var lset labels.Labels
	var chks []chunks.Meta

	d := c.compare()
	if d > 0 {
		lset, chks, c.intervals = c.b.At()
		c.l = append(c.l[:0], lset...)
		c.c = append(c.c[:0], chks...)

		c.bok = c.b.Next()
	} else if d < 0 {
		lset, chks, c.intervals = c.a.At()
		c.l = append(c.l[:0], lset...)
		c.c = append(c.c[:0], chks...)

		c.aok = c.a.Next()
	} else {
		// Both sets contain the current series. Chain them into a single one.
		l, ca, ra := c.a.At()
		_, cb, rb := c.b.At()

		for _, r := range rb {
			ra = ra.Add(r)
		}

		c.l = append(c.l[:0], l...)
		c.c = append(append(c.c[:0], ca...), cb...)
		c.intervals = ra

		c.aok = c.a.Next()
		c.bok = c.b.Next()
	}

	return true
}

func (c *compactionMerger) Err() error {
	if c.a.Err() != nil {
		return c.a.Err()
	}
	return c.b.Err()
}

func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
	return c.l, c.c, c.intervals
}

func newMergedStringIter(a index.StringIter, b index.StringIter) index.StringIter {
	return &mergedStringIter{a: a, b: b, aok: a.Next(), bok: b.Next()}
}

type mergedStringIter struct {
	a        index.StringIter
	b        index.StringIter
	aok, bok bool
	cur      string
}

func (m *mergedStringIter) Next() bool {
	if (!m.aok && !m.bok) || (m.Err() != nil) {
		return false
	}

	if !m.aok {
		m.cur = m.b.At()
		m.bok = m.b.Next()
	} else if !m.bok {
		m.cur = m.a.At()
		m.aok = m.a.Next()
	} else if m.b.At() > m.a.At() {
		m.cur = m.a.At()
		m.aok = m.a.Next()
	} else if m.a.At() > m.b.At() {
		m.cur = m.b.At()
		m.bok = m.b.Next()
	} else { // Equal.
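		// Both iterators are positioned on the same string; emit it once and
		// advance both, deduplicating the merged stream.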
		m.cur = m.b.At()
		m.aok = m.a.Next()
		m.bok = m.b.Next()
	}

	return true
}

func (m mergedStringIter) At() string { return m.cur }
func (m mergedStringIter) Err() error {
	if m.a.Err() != nil {
		return m.a.Err()
	}
	return m.b.Err()
}
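
// stringSliceIter is a minimal index.StringIter over a sorted slice; it
// exists only for the illustrative sketch below and is not part of the
// package API.
type stringSliceIter struct {
	s []string
	i int
}

func (it *stringSliceIter) Next() bool { it.i++; return it.i <= len(it.s) }
func (it *stringSliceIter) At() string { return it.s[it.i-1] }
func (it *stringSliceIter) Err() error { return nil }

// exampleMergedSymbolsSketch is an illustrative sketch: merging two sorted
// symbol streams with newMergedStringIter yields a sorted, deduplicated
// stream, which is how populateBlock combines per-block symbol tables.
func exampleMergedSymbolsSketch() []string {
	it := newMergedStringIter(
		&stringSliceIter{s: []string{"a", "b", "d"}},
		&stringSliceIter{s: []string{"b", "c"}},
	)
	var out []string
	for it.Next() {
		out = append(out, it.At())
	}
	return out // ["a", "b", "c", "d"]
}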