// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package compact

import (
	"context"
	"fmt"
	"math"
	"path/filepath"

	"github.com/go-kit/kit/log"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// tsdbBasedPlanner plans compactions using the same grouping rules as
// Prometheus' TSDB compactor, but works purely on block metadata (no
// filesystem access) and can exclude blocks marked for no compaction.
type tsdbBasedPlanner struct {
	logger log.Logger

	// ranges holds the configured compaction time ranges, expected in the
	// same form TSDB uses (ascending; ranges[1:] are the candidate target
	// intervals for merging).
	ranges []int64

	// noCompBlocksFunc returns the current set of blocks that must be
	// skipped by the planner (keyed by block ULID).
	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark
}

var _ Planner = &tsdbBasedPlanner{}

// NewTSDBBasedPlanner is a planner with the same functionality as Prometheus' TSDB.
// TODO(bwplotka): Consider upstreaming this to Prometheus.
// It's the same functionality, just without accessing the filesystem.
func NewTSDBBasedPlanner(logger log.Logger, ranges []int64) *tsdbBasedPlanner {
	return &tsdbBasedPlanner{
		logger: logger,
		ranges: ranges,
		// Default: no blocks are excluded from compaction.
		noCompBlocksFunc: func() map[ulid.ULID]*metadata.NoCompactMark {
			return make(map[ulid.ULID]*metadata.NoCompactMark)
		},
	}
}

// NewPlanner is the default Thanos planner: Prometheus' TSDB planning logic
// plus special handling of blocks excluded via no-compact marks.
func NewPlanner(logger log.Logger, ranges []int64, noCompBlocks *GatherNoCompactionMarkFilter) *tsdbBasedPlanner {
	return &tsdbBasedPlanner{logger: logger, ranges: ranges, noCompBlocksFunc: noCompBlocks.NoCompactMarkedBlocks}
}

// Plan returns the next set of blocks (sorted by min time) that should be
// compacted into one, or nil when there is nothing to do.
// TODO(bwplotka): Consider smarter algorithm, this prefers smaller iterative compactions vs big single one: https://github.com/thanos-io/thanos/issues/3405
func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
	return p.plan(p.noCompBlocksFunc(), metasByMinTime)
}

// plan implements the TSDB selection order: overlapping blocks first, then
// range-based grouping, then tombstone-heavy blocks.
// NOTE(review): it indexes metasByMinTime[len-1] unconditionally — callers
// appear expected to pass a non-empty slice; confirm against call sites.
func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
	// Filter out blocks that are marked for no compaction.
	notExcludedMetasByMinTime := make([]*metadata.Meta, 0, len(metasByMinTime))
	for _, meta := range metasByMinTime {
		if _, excluded := noCompactMarked[meta.ULID]; excluded {
			continue
		}
		notExcludedMetasByMinTime = append(notExcludedMetasByMinTime, meta)
	}

	// Overlapping blocks take priority: they must be merged first.
	res := selectOverlappingMetas(notExcludedMetasByMinTime)
	if len(res) > 0 {
		return res, nil
	}
	// No overlapping blocks, do compaction the usual way.

	// We do not include a recently produced block with max(minTime), i.e. the block that was just uploaded to the bucket.
	// This gives users a window of a full block size for maintenance if needed.
	// If the most recent block is itself excluded it is already absent from
	// notExcludedMetasByMinTime, so only the full list needs truncating then.
	if _, excluded := noCompactMarked[metasByMinTime[len(metasByMinTime)-1].ULID]; !excluded {
		notExcludedMetasByMinTime = notExcludedMetasByMinTime[:len(notExcludedMetasByMinTime)-1]
	}
	metasByMinTime = metasByMinTime[:len(metasByMinTime)-1]
	res = append(res, selectMetas(p.ranges, noCompactMarked, metasByMinTime)...)
	if len(res) > 0 {
		return res, nil
	}

	// Compact any blocks with big enough time range that have >5% tombstones.
	// Scanned from newest to oldest; stops at the first block smaller than the
	// middle configured range (same heuristic as TSDB).
	for i := len(notExcludedMetasByMinTime) - 1; i >= 0; i-- {
		meta := notExcludedMetasByMinTime[i]
		if meta.MaxTime-meta.MinTime < p.ranges[len(p.ranges)/2] {
			break
		}
		// +1 guards against division by zero for blocks with no series.
		if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
			return []*metadata.Meta{notExcludedMetasByMinTime[i]}, nil
		}
	}

	return nil, nil
}

// selectMetas returns the dir metas that should be compacted into a single new block.
// If only a single block range is configured, the result is always nil.
// Copied and adjusted from https://github.com/prometheus/prometheus/blob/3d8826a3d42566684283a9b7f7e812e412c24407/tsdb/compact.go#L229.
func selectMetas(ranges []int64, noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) []*metadata.Meta {
	if len(ranges) < 2 || len(metasByMinTime) < 1 {
		return nil
	}
	// Start time of the most recent block; used below to avoid compacting a
	// partially-filled range prematurely.
	highTime := metasByMinTime[len(metasByMinTime)-1].MinTime

	// Try target ranges from smallest to largest (ranges[0] is the raw block size).
	for _, iv := range ranges[1:] {
		parts := splitByRange(metasByMinTime, iv)
		if len(parts) == 0 {
			continue
		}
	Outer:
		for _, p := range parts {
			// Do not select the range if it has a block whose compaction failed.
			for _, m := range p {
				if m.Compaction.Failed {
					continue Outer
				}
			}

			if len(p) < 2 {
				continue
			}

			mint := p[0].MinTime
			maxt := p[len(p)-1].MaxTime

			// Pick the range of blocks if it spans the full range (potentially with gaps) or is before the most recent block.
			// This ensures we don't compact blocks prematurely when another one of the same size would still fit in the range
			// after upload.
			if maxt-mint != iv && maxt > highTime {
				continue
			}

			// Check if any of the resulting blocks are excluded. Exclude them in a way that does not introduce gaps to the system
			// as well as preserve the ranges that would be used if they were not excluded.
			// This is meant as a short-term workaround to create the ability for marking some blocks to not be touched for compaction.
			// Return the first contiguous run (>= 2 blocks) between excluded blocks.
			lastExcluded := 0
			for i, id := range p {
				if _, excluded := noCompactMarked[id.ULID]; !excluded {
					continue
				}
				if len(p[lastExcluded:i]) > 1 {
					return p[lastExcluded:i]
				}
				lastExcluded = i + 1
			}
			if len(p[lastExcluded:]) > 1 {
				return p[lastExcluded:]
			}
		}
	}

	return nil
}

// selectOverlappingMetas returns all dirs with overlapping time ranges.
// It expects sorted input by mint and returns the overlapping dirs in the same order as received.
// Only the first contiguous group of overlaps is returned.
// Copied and adjusted from https://github.com/prometheus/prometheus/blob/3d8826a3d42566684283a9b7f7e812e412c24407/tsdb/compact.go#L268.
func selectOverlappingMetas(metasByMinTime []*metadata.Meta) []*metadata.Meta {
	if len(metasByMinTime) < 2 {
		return nil
	}
	var overlappingMetas []*metadata.Meta
	globalMaxt := metasByMinTime[0].MaxTime
	// Note: i indexes metasByMinTime (not the subslice), i.e. the element
	// preceding m.
	for i, m := range metasByMinTime[1:] {
		if m.MinTime < globalMaxt {
			if len(overlappingMetas) == 0 {
				// When it is the first overlap, need to add the last one as well.
				overlappingMetas = append(overlappingMetas, metasByMinTime[i])
			}
			overlappingMetas = append(overlappingMetas, m)
		} else if len(overlappingMetas) > 0 {
			break
		}

		if m.MaxTime > globalMaxt {
			globalMaxt = m.MaxTime
		}
	}
	return overlappingMetas
}

// splitByRange splits the directories by the time range. The range sequence starts at 0.
//
// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30
// it returns [0-10, 10-20], [50-60], [90-100].
// Copied and adjusted from: https://github.com/prometheus/prometheus/blob/3d8826a3d42566684283a9b7f7e812e412c24407/tsdb/compact.go#L294.
186func splitByRange(metasByMinTime []*metadata.Meta, tr int64) [][]*metadata.Meta { 187 var splitDirs [][]*metadata.Meta 188 189 for i := 0; i < len(metasByMinTime); { 190 var ( 191 group []*metadata.Meta 192 t0 int64 193 m = metasByMinTime[i] 194 ) 195 // Compute start of aligned time range of size tr closest to the current block's start. 196 if m.MinTime >= 0 { 197 t0 = tr * (m.MinTime / tr) 198 } else { 199 t0 = tr * ((m.MinTime - tr + 1) / tr) 200 } 201 202 // Skip blocks that don't fall into the range. This can happen via mis-alignment or 203 // by being the multiple of the intended range. 204 if m.MaxTime > t0+tr { 205 i++ 206 continue 207 } 208 209 // Add all metas to the current group that are within [t0, t0+tr]. 210 for ; i < len(metasByMinTime); i++ { 211 // Either the block falls into the next range or doesn't fit at all (checked above). 212 if metasByMinTime[i].MaxTime > t0+tr { 213 break 214 } 215 group = append(group, metasByMinTime[i]) 216 } 217 218 if len(group) > 0 { 219 splitDirs = append(splitDirs, group) 220 } 221 } 222 223 return splitDirs 224} 225 226type largeTotalIndexSizeFilter struct { 227 *tsdbBasedPlanner 228 229 bkt objstore.Bucket 230 markedForNoCompact prometheus.Counter 231 totalMaxIndexSizeBytes int64 232} 233 234var _ Planner = &largeTotalIndexSizeFilter{} 235 236// WithLargeTotalIndexSizeFilter wraps Planner with largeTotalIndexSizeFilter that checks the given plans and estimates total index size. 237// When found, it marks block for no compaction by placing no-compact-mark.json and updating cache. 238// NOTE: The estimation is very rough as it assumes extreme cases of indexes sharing no bytes, thus summing all source index sizes. 239// Adjust limit accordingly reducing to some % of actual limit you want to give. 240// TODO(bwplotka): This is short term fix for https://github.com/thanos-io/thanos/issues/1424, replace with vertical block sharding https://github.com/thanos-io/thanos/pull/3390. 
func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket, totalMaxIndexSizeBytes int64, markedForNoCompact prometheus.Counter) *largeTotalIndexSizeFilter {
	return &largeTotalIndexSizeFilter{tsdbBasedPlanner: with, bkt: bkt, totalMaxIndexSizeBytes: totalMaxIndexSizeBytes, markedForNoCompact: markedForNoCompact}
}

// Plan delegates to the wrapped planner and re-plans until the estimated total
// index size of the proposed compaction fits under the limit, marking the
// block with the biggest index for no compaction on each oversized attempt.
func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
	noCompactMarked := t.noCompBlocksFunc()
	// Copy so that marks added during this planning pass don't mutate the
	// shared map returned by noCompBlocksFunc.
	copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked))
	for k, v := range noCompactMarked {
		copiedNoCompactMarked[k] = v
	}

PlanLoop:
	for {
		plan, err := t.plan(copiedNoCompactMarked, metasByMinTime)
		if err != nil {
			return nil, err
		}
		var totalIndexBytes, maxIndexSize int64 = 0, math.MinInt64
		var biggestIndex int
		for i, p := range plan {
			// Prefer the index size recorded in block metadata, if present.
			indexSize := int64(-1)
			for _, f := range p.Thanos.Files {
				if f.RelPath == block.IndexFilename {
					indexSize = f.SizeBytes
				}
			}
			if indexSize <= 0 {
				// Get size from bkt instead.
				// NOTE(review): filepath.Join builds an object-store key; keys are
				// conventionally slash-separated, while filepath.Join is
				// OS-specific — confirm behavior on Windows builds.
				attr, err := t.bkt.Attributes(ctx, filepath.Join(p.ULID.String(), block.IndexFilename))
				if err != nil {
					return nil, errors.Wrapf(err, "get attr of %v", filepath.Join(p.ULID.String(), block.IndexFilename))
				}
				indexSize = attr.Size
			}

			// Track the plan entry with the largest index; it is the one that
			// gets excluded if the running total exceeds the limit.
			if maxIndexSize < indexSize {
				maxIndexSize = indexSize
				biggestIndex = i
			}
			totalIndexBytes += indexSize
			// Leave 15% headroom for index compaction bloat.
			if totalIndexBytes >= int64(float64(t.totalMaxIndexSizeBytes)*0.85) {
				// Marking blocks for no compact to limit size.
				// TODO(bwplotka): Make sure to reset cache once this is done: https://github.com/thanos-io/thanos/issues/3408
				if err := block.MarkForNoCompact(
					ctx,
					t.logger,
					t.bkt,
					plan[biggestIndex].ULID,
					metadata.IndexSizeExceedingNoCompactReason,
					fmt.Sprintf("largeTotalIndexSizeFilter: Total compacted block's index size could exceed: %v with this block. See https://github.com/thanos-io/thanos/issues/1424", t.totalMaxIndexSizeBytes),
					t.markedForNoCompact,
				); err != nil {
					return nil, errors.Wrapf(err, "mark %v for no compaction", plan[biggestIndex].ULID.String())
				}
				// Make sure wrapped planner excludes this block on the next pass.
				copiedNoCompactMarked[plan[biggestIndex].ULID] = &metadata.NoCompactMark{ID: plan[biggestIndex].ULID, Version: metadata.NoCompactMarkVersion1}
				continue PlanLoop
			}
		}
		// Planned blocks should not exceed limit.
		return plan, nil
	}
}