1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4package compact
5
6import (
7	"context"
8	"fmt"
9	"math"
10	"path/filepath"
11
12	"github.com/go-kit/kit/log"
13	"github.com/oklog/ulid"
14	"github.com/pkg/errors"
15	"github.com/prometheus/client_golang/prometheus"
16	"github.com/thanos-io/thanos/pkg/block"
17	"github.com/thanos-io/thanos/pkg/block/metadata"
18	"github.com/thanos-io/thanos/pkg/objstore"
19)
20
// tsdbBasedPlanner selects blocks for compaction using the same logic as
// Prometheus' TSDB compactor, driven purely by block metadata (no filesystem
// access), with optional exclusion of blocks marked for no compaction.
type tsdbBasedPlanner struct {
	logger log.Logger

	// ranges holds the configured compaction time ranges, smallest first,
	// as in Prometheus' TSDB (compared against meta MinTime/MaxTime).
	ranges []int64

	// noCompBlocksFunc returns the current set of blocks that must be
	// excluded from any compaction plan.
	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark
}

var _ Planner = &tsdbBasedPlanner{}
30
31// NewTSDBBasedPlanner is planner with the same functionality as Prometheus' TSDB.
32// TODO(bwplotka): Consider upstreaming this to Prometheus.
33// It's the same functionality just without accessing filesystem.
34func NewTSDBBasedPlanner(logger log.Logger, ranges []int64) *tsdbBasedPlanner {
35	return &tsdbBasedPlanner{
36		logger: logger,
37		ranges: ranges,
38		noCompBlocksFunc: func() map[ulid.ULID]*metadata.NoCompactMark {
39			return make(map[ulid.ULID]*metadata.NoCompactMark)
40		},
41	}
42}
43
44// NewPlanner is a default Thanos planner with the same functionality as Prometheus' TSDB plus special handling of excluded blocks.
45// It's the same functionality just without accessing filesystem, and special handling of excluded blocks.
46func NewPlanner(logger log.Logger, ranges []int64, noCompBlocks *GatherNoCompactionMarkFilter) *tsdbBasedPlanner {
47	return &tsdbBasedPlanner{logger: logger, ranges: ranges, noCompBlocksFunc: noCompBlocks.NoCompactMarkedBlocks}
48}
49
50// TODO(bwplotka): Consider smarter algorithm, this prefers smaller iterative compactions vs big single one: https://github.com/thanos-io/thanos/issues/3405
51func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
52	return p.plan(p.noCompBlocksFunc(), metasByMinTime)
53}
54
// plan implements the TSDB compaction selection over the given metas while
// honoring the provided set of blocks marked for no compaction.
// metasByMinTime must be sorted by min time and non-empty (the trimming logic
// below indexes the last element unconditionally).
func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
	// Filter out excluded blocks up front; the original slice is kept for
	// range alignment in selectMetas below.
	notExcludedMetasByMinTime := make([]*metadata.Meta, 0, len(metasByMinTime))
	for _, meta := range metasByMinTime {
		if _, excluded := noCompactMarked[meta.ULID]; excluded {
			continue
		}
		notExcludedMetasByMinTime = append(notExcludedMetasByMinTime, meta)
	}

	// Vertical compaction of overlapping blocks takes precedence over everything else.
	res := selectOverlappingMetas(notExcludedMetasByMinTime)
	if len(res) > 0 {
		return res, nil
	}
	// No overlapping blocks, do compaction the usual way.

	// We do not include a recently produced block with max(minTime), so the block which was just uploaded to bucket.
	// This gives users a window of a full block size maintenance if needed.
	// If the most recent block is itself excluded, it is already absent from
	// notExcludedMetasByMinTime, so only trim when it was not excluded.
	if _, excluded := noCompactMarked[metasByMinTime[len(metasByMinTime)-1].ULID]; !excluded {
		notExcludedMetasByMinTime = notExcludedMetasByMinTime[:len(notExcludedMetasByMinTime)-1]
	}
	metasByMinTime = metasByMinTime[:len(metasByMinTime)-1]
	res = append(res, selectMetas(p.ranges, noCompactMarked, metasByMinTime)...)
	if len(res) > 0 {
		return res, nil
	}

	// Compact any blocks with big enough time range that have >5% tombstones.
	for i := len(notExcludedMetasByMinTime) - 1; i >= 0; i-- {
		meta := notExcludedMetasByMinTime[i]
		if meta.MaxTime-meta.MinTime < p.ranges[len(p.ranges)/2] {
			break
		}
		// +1 guards against division by zero for blocks with no series.
		if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
			return []*metadata.Meta{notExcludedMetasByMinTime[i]}, nil
		}
	}

	return nil, nil
}
94
// selectMetas returns the dir metas that should be compacted into a single new block.
// If only a single block range is configured, the result is always nil.
// Copied and adjusted from https://github.com/prometheus/prometheus/blob/3d8826a3d42566684283a9b7f7e812e412c24407/tsdb/compact.go#L229.
func selectMetas(ranges []int64, noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) []*metadata.Meta {
	if len(ranges) < 2 || len(metasByMinTime) < 1 {
		return nil
	}
	// Min time of the most recent block; used below to avoid compacting a
	// range that is still "open" for new uploads.
	highTime := metasByMinTime[len(metasByMinTime)-1].MinTime

	// Try each configured range from the second-smallest upwards; the first
	// range is the size blocks already arrive at.
	for _, iv := range ranges[1:] {
		parts := splitByRange(metasByMinTime, iv)
		if len(parts) == 0 {
			continue
		}
	Outer:
		for _, p := range parts {
			// Do not select the range if it has a block whose compaction failed.
			for _, m := range p {
				if m.Compaction.Failed {
					continue Outer
				}
			}

			// A single block per range means nothing to merge.
			if len(p) < 2 {
				continue
			}

			mint := p[0].MinTime
			maxt := p[len(p)-1].MaxTime

			// Pick the range of blocks if it spans the full range (potentially with gaps) or is before the most recent block.
			// This ensures we don't compact blocks prematurely when another one of the same size would still fit in the range
			// after upload.
			if maxt-mint != iv && maxt > highTime {
				continue
			}

			// Check if any of resulted blocks are excluded. Exclude them in a way that does not introduce gaps to the system
			// as well as preserve the ranges that would be used if they were not excluded.
			// This is meant as short-term workaround to create ability for marking some blocks to not be touched for compaction.
			lastExcluded := 0
			for i, id := range p {
				if _, excluded := noCompactMarked[id.ULID]; !excluded {
					continue
				}
				// Return the contiguous run of non-excluded blocks before
				// this excluded one, if it is still compactable (>1 block).
				if len(p[lastExcluded:i]) > 1 {
					return p[lastExcluded:i]
				}
				lastExcluded = i + 1
			}
			// Remaining tail after the last excluded block.
			if len(p[lastExcluded:]) > 1 {
				return p[lastExcluded:]
			}
		}
	}

	return nil
}
153
154// selectOverlappingMetas returns all dirs with overlapping time ranges.
155// It expects sorted input by mint and returns the overlapping dirs in the same order as received.
156// Copied and adjusted from https://github.com/prometheus/prometheus/blob/3d8826a3d42566684283a9b7f7e812e412c24407/tsdb/compact.go#L268.
157func selectOverlappingMetas(metasByMinTime []*metadata.Meta) []*metadata.Meta {
158	if len(metasByMinTime) < 2 {
159		return nil
160	}
161	var overlappingMetas []*metadata.Meta
162	globalMaxt := metasByMinTime[0].MaxTime
163	for i, m := range metasByMinTime[1:] {
164		if m.MinTime < globalMaxt {
165			if len(overlappingMetas) == 0 {
166				// When it is the first overlap, need to add the last one as well.
167				overlappingMetas = append(overlappingMetas, metasByMinTime[i])
168			}
169			overlappingMetas = append(overlappingMetas, m)
170		} else if len(overlappingMetas) > 0 {
171			break
172		}
173
174		if m.MaxTime > globalMaxt {
175			globalMaxt = m.MaxTime
176		}
177	}
178	return overlappingMetas
179}
180
181// splitByRange splits the directories by the time range. The range sequence starts at 0.
182//
183// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30
184// it returns [0-10, 10-20], [50-60], [90-100].
185// Copied and adjusted from: https://github.com/prometheus/prometheus/blob/3d8826a3d42566684283a9b7f7e812e412c24407/tsdb/compact.go#L294.
186func splitByRange(metasByMinTime []*metadata.Meta, tr int64) [][]*metadata.Meta {
187	var splitDirs [][]*metadata.Meta
188
189	for i := 0; i < len(metasByMinTime); {
190		var (
191			group []*metadata.Meta
192			t0    int64
193			m     = metasByMinTime[i]
194		)
195		// Compute start of aligned time range of size tr closest to the current block's start.
196		if m.MinTime >= 0 {
197			t0 = tr * (m.MinTime / tr)
198		} else {
199			t0 = tr * ((m.MinTime - tr + 1) / tr)
200		}
201
202		// Skip blocks that don't fall into the range. This can happen via mis-alignment or
203		// by being the multiple of the intended range.
204		if m.MaxTime > t0+tr {
205			i++
206			continue
207		}
208
209		// Add all metas to the current group that are within [t0, t0+tr].
210		for ; i < len(metasByMinTime); i++ {
211			// Either the block falls into the next range or doesn't fit at all (checked above).
212			if metasByMinTime[i].MaxTime > t0+tr {
213				break
214			}
215			group = append(group, metasByMinTime[i])
216		}
217
218		if len(group) > 0 {
219			splitDirs = append(splitDirs, group)
220		}
221	}
222
223	return splitDirs
224}
225
// largeTotalIndexSizeFilter wraps a tsdbBasedPlanner and rejects (by marking
// for no compaction) plans whose estimated total index size would exceed a
// configured limit.
type largeTotalIndexSizeFilter struct {
	*tsdbBasedPlanner

	// bkt is used to fetch index file sizes when block metadata lacks them,
	// and to write no-compact marks.
	bkt objstore.Bucket
	// markedForNoCompact counts blocks this filter marks for no compaction.
	markedForNoCompact prometheus.Counter
	// totalMaxIndexSizeBytes is the upper bound for the summed index sizes
	// of blocks in a single plan.
	totalMaxIndexSizeBytes int64
}

var _ Planner = &largeTotalIndexSizeFilter{}
235
236// WithLargeTotalIndexSizeFilter wraps Planner with largeTotalIndexSizeFilter that checks the given plans and estimates total index size.
237// When found, it marks block for no compaction by placing no-compact-mark.json and updating cache.
238// NOTE: The estimation is very rough as it assumes extreme cases of indexes sharing no bytes, thus summing all source index sizes.
239// Adjust limit accordingly reducing to some % of actual limit you want to give.
240// TODO(bwplotka): This is short term fix for https://github.com/thanos-io/thanos/issues/1424, replace with vertical block sharding https://github.com/thanos-io/thanos/pull/3390.
241func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket, totalMaxIndexSizeBytes int64, markedForNoCompact prometheus.Counter) *largeTotalIndexSizeFilter {
242	return &largeTotalIndexSizeFilter{tsdbBasedPlanner: with, bkt: bkt, totalMaxIndexSizeBytes: totalMaxIndexSizeBytes, markedForNoCompact: markedForNoCompact}
243}
244
// Plan produces a compaction plan whose estimated total index size stays under
// the configured limit. It repeatedly asks the wrapped planner for a plan and,
// whenever the summed index sizes would exceed the limit, marks the block with
// the biggest index for no compaction and replans without it.
func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
	// Copy the shared map so local exclusions do not mutate the filter's state.
	noCompactMarked := t.noCompBlocksFunc()
	copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked))
	for k, v := range noCompactMarked {
		copiedNoCompactMarked[k] = v
	}

PlanLoop:
	for {
		plan, err := t.plan(copiedNoCompactMarked, metasByMinTime)
		if err != nil {
			return nil, err
		}
		var totalIndexBytes, maxIndexSize int64 = 0, math.MinInt64
		var biggestIndex int
		for i, p := range plan {
			// Prefer the index size recorded in block metadata, if present.
			indexSize := int64(-1)
			for _, f := range p.Thanos.Files {
				if f.RelPath == block.IndexFilename {
					indexSize = f.SizeBytes
				}
			}
			if indexSize <= 0 {
				// Get size from bkt instead.
				// NOTE(review): filepath.Join uses the OS path separator; object
				// storage keys are slash-separated, so path.Join looks intended
				// here — confirm this never runs on Windows before changing.
				attr, err := t.bkt.Attributes(ctx, filepath.Join(p.ULID.String(), block.IndexFilename))
				if err != nil {
					return nil, errors.Wrapf(err, "get attr of %v", filepath.Join(p.ULID.String(), block.IndexFilename))
				}
				indexSize = attr.Size
			}

			// Track the block with the biggest index; it is the one marked if
			// the plan exceeds the limit.
			if maxIndexSize < indexSize {
				maxIndexSize = indexSize
				biggestIndex = i
			}
			totalIndexBytes += indexSize
			// Leave 15% headroom for index compaction bloat.
			if totalIndexBytes >= int64(float64(t.totalMaxIndexSizeBytes)*0.85) {
				// Marking blocks for no compact to limit size.
				// TODO(bwplotka): Make sure to reset cache once this is done: https://github.com/thanos-io/thanos/issues/3408
				if err := block.MarkForNoCompact(
					ctx,
					t.logger,
					t.bkt,
					plan[biggestIndex].ULID,
					metadata.IndexSizeExceedingNoCompactReason,
					fmt.Sprintf("largeTotalIndexSizeFilter: Total compacted block's index size could exceed: %v with this block. See https://github.com/thanos-io/thanos/issues/1424", t.totalMaxIndexSizeBytes),
					t.markedForNoCompact,
				); err != nil {
					return nil, errors.Wrapf(err, "mark %v for no compaction", plan[biggestIndex].ULID.String())
				}
				// Make sure wrapped planner exclude this block.
				copiedNoCompactMarked[plan[biggestIndex].ULID] = &metadata.NoCompactMark{ID: plan[biggestIndex].ULID, Version: metadata.NoCompactMarkVersion1}
				continue PlanLoop
			}
		}
		// Planned blocks should not exceed limit.
		return plan, nil
	}
}
305