1// This file and its contents are licensed under the Apache License 2.0.
2// Please see the included NOTICE for copyright information and
3// LICENSE for a copy of the license.
4
5package planner
6
7import (
8	"context"
9	"fmt"
10	"os"
11	"sync"
12	"sync/atomic"
13	"time"
14
15	"github.com/prometheus/common/config"
16	"github.com/prometheus/prometheus/pkg/labels"
17	"github.com/prometheus/prometheus/pkg/timestamp"
18	"github.com/schollz/progressbar/v3"
19	"github.com/timescale/promscale/pkg/log"
20	"github.com/timescale/promscale/pkg/migration-tool/utils"
21)
22
const numStepsWriter = 5 // Number of progress steps for the progress-bar of the writer.

var (
	// Millisecond counts of a second and a minute. Timestamps in this package
	// are in milliseconds, so these are used to convert ranges into
	// human-readable seconds/minutes in logs and progress descriptions.
	second = time.Second.Milliseconds()
	minute = time.Minute.Milliseconds()
)
29
// Config represents configuration for the planner.
type Config struct {
	Mint                 int64 // Inclusive lower bound (milliseconds) of the migration time-range.
	Maxt                 int64 // Upper bound (milliseconds) of the migration time-range.
	SlabSizeLimitBytes   int64 // Max in-memory size of a slab; fetch time-ranges shrink when a fetch exceeds this.
	NumStores            int   // Number of stores allocated per slab (see createSlab).
	ProgressEnabled      bool  // When true, resume from the last pushed maxt recorded by the progress metric.
	JobName              string
	ProgressMetricName   string // Name for progress metric.
	ProgressClientConfig utils.ClientConfig
	HTTPConfig           config.HTTPClientConfig
	LaIncrement          time.Duration // Linear additive increment applied to the time-range delta (see determineTimeDelta).
	MaxReadDuration      time.Duration // Cap on the time-range delta of a single fetch (see clampTimeDelta).
	HumanReadableTime    bool          // Render slab start/end as timestamps instead of raw epoch seconds.
}
45
// Plan represents the plannings done by the planner.
type Plan struct {
	config *Config

	// Slab configs.
	slabCounts         int64 // Used in maintaining the ID of the in-memory slabs. Incremented/decremented atomically.
	pbarMux            *sync.Mutex // Shared with every slab to serialize progress-bar updates.
	nextMint           int64 // Start (ms) of the next slab's time-range; advanced by NextSlab.
	lastNumBytes       int64 // Size of the last fetch; written via update(), read atomically.
	lastTimeRangeDelta int64 // Time-range delta used for the previous slab.
	deltaIncRegion     int64 // Time region for which the time-range delta can continue to increase by laIncrement.

	// Test configs.
	Quiet         bool   // Avoid progress-bars during logs.
	TestCheckFunc func() // Helps peek into planner during testing. It is called at createSlab() to check the stats of the last slab.
}
62
63// Init creates an in-memory planner and initializes it. It is responsible for fetching the last pushed maxt and based on that, updates
64// the mint for the provided migration.
65func Init(config *Config) (*Plan, bool, error) {
66	var found bool
67	if config.ProgressEnabled {
68		if config.ProgressClientConfig.URL == "" {
69			return nil, false, fmt.Errorf("read url for remote-write storage should be provided when progress metric is enabled")
70		}
71
72		lastPushedMaxt, found, err := config.fetchLastPushedMaxt()
73		if err != nil {
74			return nil, false, fmt.Errorf("init plan: %w", err)
75		}
76		if found && lastPushedMaxt > config.Mint && lastPushedMaxt <= config.Maxt {
77			config.Mint = lastPushedMaxt
78			timeRange := fmt.Sprintf("time-range: %d mins", (config.Maxt-lastPushedMaxt+1)/minute)
79			log.Warn("msg", fmt.Sprintf("Resuming from where we left off. Last push was on %d. "+
80				"Resuming from mint: %d to maxt: %d %s", lastPushedMaxt, config.Mint, config.Maxt, timeRange))
81		}
82	} else {
83		log.Info("msg", "Resuming from where we left off is turned off. Starting at the beginning of the provided time-range.")
84	}
85	if config.Mint >= config.Maxt && found {
86		log.Info("msg", "mint greater than or equal to maxt. Migration is already complete.")
87		return nil, false, nil
88	} else if config.Mint >= config.Maxt && !found {
89		// Extra sanitary check, even though this will be caught by validateConf().
90		return nil, false, fmt.Errorf("mint cannot be greater than maxt: mint %d maxt %d", config.Mint, config.Mint)
91	}
92	plan := &Plan{
93		config:         config,
94		pbarMux:        new(sync.Mutex),
95		nextMint:       config.Mint,
96		deltaIncRegion: config.SlabSizeLimitBytes / 2, // 50% of the total slab size limit.
97	}
98	return plan, true, nil
99}
100
101// fetchLastPushedMaxt fetches the maxt of the last slab pushed to remote-write storage. At present, this is developed
102// for a single migration job (i.e., not supporting multiple migration metrics and successive migrations).
103func (c *Config) fetchLastPushedMaxt() (lastPushedMaxt int64, found bool, err error) {
104	query, err := utils.CreatePrombQuery(c.Mint, c.Maxt, []*labels.Matcher{
105		labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, c.ProgressMetricName),
106		labels.MustNewMatcher(labels.MatchEqual, utils.LabelJob, c.JobName),
107	})
108	if err != nil {
109		return -1, false, fmt.Errorf("fetch-last-pushed-maxt create promb query: %w", err)
110	}
111	readClient, err := utils.NewClient("reader-last-maxt-pushed", c.ProgressClientConfig, c.HTTPConfig)
112	if err != nil {
113		return -1, false, fmt.Errorf("create fetch-last-pushed-maxt reader: %w", err)
114	}
115	result, _, _, err := readClient.Read(context.Background(), query, "")
116	if err != nil {
117		return -1, false, fmt.Errorf("fetch-last-pushed-maxt query result: %w", err)
118	}
119	ts := result.Timeseries
120	if len(ts) == 0 {
121		return -1, false, nil
122	}
123	for _, series := range ts {
124		for i := len(series.Samples) - 1; i >= 0; i-- {
125			if series.Samples[i].Timestamp > lastPushedMaxt {
126				lastPushedMaxt = series.Samples[i].Timestamp
127			}
128		}
129	}
130	if lastPushedMaxt == 0 {
131		return -1, false, nil
132	}
133	return lastPushedMaxt, true, nil
134}
135
// DecrementSlabCount atomically decrements the slab counter, undoing the
// increment performed when a slab is created in createSlab().
func (p *Plan) DecrementSlabCount() {
	atomic.AddInt64(&p.slabCounts, -1)
}
139
// ShouldProceed reports whether the fetching process should proceed further. If any time-range is left to be
// fetched from the provided time-range, it returns true, else false.
func (p *Plan) ShouldProceed() bool {
	return p.nextMint < p.config.Maxt
}
145
// update updates the details of the planner that are dependent on previous fetch stats.
// numBytes is the in-memory size of the most recent fetch; it is stored atomically
// and later read by NextSlab (via determineTimeDelta) and LastMemoryFootprint.
func (p *Plan) update(numBytes int) {
	atomic.StoreInt64(&p.lastNumBytes, int64(numBytes))
}
150
// LastMemoryFootprint returns the size in bytes of the most recent fetch,
// as recorded by update().
func (p *Plan) LastMemoryFootprint() int64 {
	return atomic.LoadInt64(&p.lastNumBytes)
}
154
155// NextSlab returns a new slab after allocating the time-range for fetch.
156func (p *Plan) NextSlab() (reference *Slab, err error) {
157	timeDelta := p.config.determineTimeDelta(atomic.LoadInt64(&p.lastNumBytes), p.config.SlabSizeLimitBytes, p.lastTimeRangeDelta)
158	mint := p.nextMint
159	maxt := mint + timeDelta
160	if maxt > p.config.Maxt {
161		maxt = p.config.Maxt
162	}
163	p.nextMint = maxt
164	p.lastTimeRangeDelta = timeDelta
165	bRef, err := p.createSlab(mint, maxt)
166	if err != nil {
167		return nil, fmt.Errorf("next-slab: %w", err)
168	}
169	return bRef, nil
170}
171
// createSlab creates a new slab and returns reference to the slab for faster write and read operations.
// Slabs are recycled through slabPool, so every field is (re)initialized here before use.
func (p *Plan) createSlab(mint, maxt int64) (ref *Slab, err error) {
	if err = p.validateT(mint, maxt); err != nil {
		return nil, fmt.Errorf("create-slab: %w", err)
	}
	// Slab IDs grow monotonically; DecrementSlabCount() reverses this on release.
	id := atomic.AddInt64(&p.slabCounts, 1)
	timeRangeInMinutes := (maxt - mint) / minute
	// Percentage of the global [Mint, Maxt] range covered once this slab is done.
	percent := float64(maxt-p.config.Mint) * 100 / float64(p.config.Maxt-p.config.Mint)
	if percent > 100 {
		percent = 100
	}
	baseDescription := fmt.Sprintf("progress: %.2f%% | slab-%d time-range: %d mins | start: %d | end: %d", percent, id, timeRangeInMinutes, mint/second, maxt/second)
	if p.config.HumanReadableTime {
		baseDescription = fmt.Sprintf("progress: %.2f%% | slab-%d time-range: %d mins | start: %s | end: %s", percent, id, timeRangeInMinutes, timestamp.Time(mint), timestamp.Time(maxt))
	}
	ref = slabPool.Get().(*Slab)
	ref.id = id
	ref.pbar = progressbar.NewOptions(
		numStepsWriter,
		progressbar.OptionOnCompletion(func() {
			_, _ = fmt.Fprint(os.Stdout, "\n")
		}),
	)
	ref.pbarDescriptionPrefix = baseDescription
	ref.numStores = p.config.NumStores
	if cap(ref.stores) < p.config.NumStores {
		// This is expected to allocate only for the first slab, then the allocated space is reused.
		// On the edge case, we allocate again, which was the previous behaviour.
		ref.stores = make([]store, p.config.NumStores)
	} else {
		ref.stores = ref.stores[:p.config.NumStores]
	}
	ref.mint = mint
	ref.maxt = maxt
	ref.pbarMux = p.pbarMux
	ref.plan = p

	ref.initStores()
	if p.Quiet {
		// Quiet mode (tests): suppress all progress-bar output.
		ref.pbar = nil
		ref.pbarDescriptionPrefix = ""
	}
	if p.TestCheckFunc != nil {
		// This runs only during integration tests. It enables the tests to access and test the internal
		// state of the planner.
		p.TestCheckFunc()
	}
	return
}
221
222func (p *Plan) validateT(mint, maxt int64) error {
223	switch {
224	case p.config.Mint > mint || p.config.Maxt < mint:
225		return fmt.Errorf("invalid mint: %d: global-mint: %d and global-maxt: %d", mint, p.config.Mint, p.config.Maxt)
226	case p.config.Mint > maxt || p.config.Maxt < maxt:
227		return fmt.Errorf("invalid maxt: %d: global-mint: %d and global-maxt: %d", mint, p.config.Mint, p.config.Maxt)
228	case mint > maxt:
229		return fmt.Errorf("mint cannot be greater than maxt: mint: %d and maxt: %d", mint, maxt)
230	}
231	return nil
232}
233
234func (c *Config) determineTimeDelta(numBytes, limit int64, prevTimeDelta int64) int64 {
235	switch {
236	case numBytes <= limit/2:
237		// deltaIncreaseRegion.
238		// We increase the time-range linearly for the next fetch if the current time-range fetch resulted in size that is
239		// less than half the max read size. This continues till we reach the maximum time-range delta.
240		return c.clampTimeDelta(prevTimeDelta + c.LaIncrement.Milliseconds())
241	case numBytes > limit:
242		// Down size the time exponentially so that bytes size can be controlled (preventing OOM).
243		log.Info("msg", fmt.Sprintf("decreasing time-range delta to %d minute(s) since size beyond permittable limits", prevTimeDelta/(2*minute)))
244		return prevTimeDelta / 2
245	}
246	// Here, the numBytes is between the max increment-time size limit (i.e., limit/2) and the max read limit
247	// (i.e., increment-time size limit < numBytes <= max read limit). This region is an ideal case of
248	// balance between migration speed and memory utilization.
249	//
250	// Example: If the limit is 500MB, then the max increment-time size limit will be 250MB. This means that till the numBytes is below
251	// 250MB, the time-range for next fetch will continue to increase by 1 minute (on the previous fetch time-range). However,
252	// the moment any slab comes between 250MB and 500MB, we stop to increment the time-range delta further. This helps
253	// keeping the migration tool in safe memory limits.
254	return prevTimeDelta
255}
256
257func (c *Config) clampTimeDelta(t int64) int64 {
258	if t > c.MaxReadDuration.Milliseconds() {
259		log.Info("msg", "Exceeded 'max-read-duration', setting back to 'max-read-duration' value", "max-read-duration", c.MaxReadDuration.String())
260		return c.MaxReadDuration.Milliseconds()
261	}
262	return t
263}
264