// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package planner

import (
	"context"
	"fmt"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"github.com/prometheus/common/config"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/timestamp"
	"github.com/schollz/progressbar/v3"
	"github.com/timescale/promscale/pkg/log"
	"github.com/timescale/promscale/pkg/migration-tool/utils"
)

const numStepsWriter = 5 // Number of progress steps for the progress-bar of the writer.

// Millisecond counts for common durations; used throughout for time-range arithmetic,
// since mint/maxt are Unix-millisecond timestamps.
var (
	second = time.Second.Milliseconds()
	minute = time.Minute.Milliseconds()
)

// Config represents configuration for the planner.
type Config struct {
	Mint                 int64
	Maxt                 int64
	SlabSizeLimitBytes   int64
	NumStores            int
	ProgressEnabled      bool
	JobName              string
	ProgressMetricName   string // Name for progress metric.
	ProgressClientConfig utils.ClientConfig
	HTTPConfig           config.HTTPClientConfig
	LaIncrement          time.Duration
	MaxReadDuration      time.Duration
	HumanReadableTime    bool
}

// Plan represents the plannings done by the planner.
type Plan struct {
	config *Config

	// Slab configs.
	slabCounts         int64 // Used in maintaining the ID of the in-memory slabs. Accessed atomically.
	pbarMux            *sync.Mutex
	nextMint           int64 // Mint of the next slab to be created.
	lastNumBytes       int64 // Byte size of the last fetched slab. Accessed atomically.
	lastTimeRangeDelta int64
	deltaIncRegion     int64 // Time region for which the time-range delta can continue to increase by laIncrement.

	// Test configs.
	Quiet         bool   // Avoid progress-bars during logs.
	TestCheckFunc func() // Helps peek into planner during testing. It is called at createSlab() to check the stats of the last slab.
}

// Init creates an in-memory planner and initializes it. It is responsible for fetching the last pushed maxt and based on that, updates
// the mint for the provided migration.
65func Init(config *Config) (*Plan, bool, error) { 66 var found bool 67 if config.ProgressEnabled { 68 if config.ProgressClientConfig.URL == "" { 69 return nil, false, fmt.Errorf("read url for remote-write storage should be provided when progress metric is enabled") 70 } 71 72 lastPushedMaxt, found, err := config.fetchLastPushedMaxt() 73 if err != nil { 74 return nil, false, fmt.Errorf("init plan: %w", err) 75 } 76 if found && lastPushedMaxt > config.Mint && lastPushedMaxt <= config.Maxt { 77 config.Mint = lastPushedMaxt 78 timeRange := fmt.Sprintf("time-range: %d mins", (config.Maxt-lastPushedMaxt+1)/minute) 79 log.Warn("msg", fmt.Sprintf("Resuming from where we left off. Last push was on %d. "+ 80 "Resuming from mint: %d to maxt: %d %s", lastPushedMaxt, config.Mint, config.Maxt, timeRange)) 81 } 82 } else { 83 log.Info("msg", "Resuming from where we left off is turned off. Starting at the beginning of the provided time-range.") 84 } 85 if config.Mint >= config.Maxt && found { 86 log.Info("msg", "mint greater than or equal to maxt. Migration is already complete.") 87 return nil, false, nil 88 } else if config.Mint >= config.Maxt && !found { 89 // Extra sanitary check, even though this will be caught by validateConf(). 90 return nil, false, fmt.Errorf("mint cannot be greater than maxt: mint %d maxt %d", config.Mint, config.Mint) 91 } 92 plan := &Plan{ 93 config: config, 94 pbarMux: new(sync.Mutex), 95 nextMint: config.Mint, 96 deltaIncRegion: config.SlabSizeLimitBytes / 2, // 50% of the total slab size limit. 97 } 98 return plan, true, nil 99} 100 101// fetchLastPushedMaxt fetches the maxt of the last slab pushed to remote-write storage. At present, this is developed 102// for a single migration job (i.e., not supporting multiple migration metrics and successive migrations). 
103func (c *Config) fetchLastPushedMaxt() (lastPushedMaxt int64, found bool, err error) { 104 query, err := utils.CreatePrombQuery(c.Mint, c.Maxt, []*labels.Matcher{ 105 labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, c.ProgressMetricName), 106 labels.MustNewMatcher(labels.MatchEqual, utils.LabelJob, c.JobName), 107 }) 108 if err != nil { 109 return -1, false, fmt.Errorf("fetch-last-pushed-maxt create promb query: %w", err) 110 } 111 readClient, err := utils.NewClient("reader-last-maxt-pushed", c.ProgressClientConfig, c.HTTPConfig) 112 if err != nil { 113 return -1, false, fmt.Errorf("create fetch-last-pushed-maxt reader: %w", err) 114 } 115 result, _, _, err := readClient.Read(context.Background(), query, "") 116 if err != nil { 117 return -1, false, fmt.Errorf("fetch-last-pushed-maxt query result: %w", err) 118 } 119 ts := result.Timeseries 120 if len(ts) == 0 { 121 return -1, false, nil 122 } 123 for _, series := range ts { 124 for i := len(series.Samples) - 1; i >= 0; i-- { 125 if series.Samples[i].Timestamp > lastPushedMaxt { 126 lastPushedMaxt = series.Samples[i].Timestamp 127 } 128 } 129 } 130 if lastPushedMaxt == 0 { 131 return -1, false, nil 132 } 133 return lastPushedMaxt, true, nil 134} 135 136func (p *Plan) DecrementSlabCount() { 137 atomic.AddInt64(&p.slabCounts, -1) 138} 139 140// ShouldProceed reports whether the fetching process should proceeds further. If any time-range is left to be 141// fetched from the provided time-range, it returns true, else false. 142func (p *Plan) ShouldProceed() bool { 143 return p.nextMint < p.config.Maxt 144} 145 146// update updates the details of the planner that are dependent on previous fetch stats. 147func (p *Plan) update(numBytes int) { 148 atomic.StoreInt64(&p.lastNumBytes, int64(numBytes)) 149} 150 151func (p *Plan) LastMemoryFootprint() int64 { 152 return atomic.LoadInt64(&p.lastNumBytes) 153} 154 155// NextSlab returns a new slab after allocating the time-range for fetch. 
156func (p *Plan) NextSlab() (reference *Slab, err error) { 157 timeDelta := p.config.determineTimeDelta(atomic.LoadInt64(&p.lastNumBytes), p.config.SlabSizeLimitBytes, p.lastTimeRangeDelta) 158 mint := p.nextMint 159 maxt := mint + timeDelta 160 if maxt > p.config.Maxt { 161 maxt = p.config.Maxt 162 } 163 p.nextMint = maxt 164 p.lastTimeRangeDelta = timeDelta 165 bRef, err := p.createSlab(mint, maxt) 166 if err != nil { 167 return nil, fmt.Errorf("next-slab: %w", err) 168 } 169 return bRef, nil 170} 171 172// createSlab creates a new slab and returns reference to the slab for faster write and read operations. 173func (p *Plan) createSlab(mint, maxt int64) (ref *Slab, err error) { 174 if err = p.validateT(mint, maxt); err != nil { 175 return nil, fmt.Errorf("create-slab: %w", err) 176 } 177 id := atomic.AddInt64(&p.slabCounts, 1) 178 timeRangeInMinutes := (maxt - mint) / minute 179 percent := float64(maxt-p.config.Mint) * 100 / float64(p.config.Maxt-p.config.Mint) 180 if percent > 100 { 181 percent = 100 182 } 183 baseDescription := fmt.Sprintf("progress: %.2f%% | slab-%d time-range: %d mins | start: %d | end: %d", percent, id, timeRangeInMinutes, mint/second, maxt/second) 184 if p.config.HumanReadableTime { 185 baseDescription = fmt.Sprintf("progress: %.2f%% | slab-%d time-range: %d mins | start: %s | end: %s", percent, id, timeRangeInMinutes, timestamp.Time(mint), timestamp.Time(maxt)) 186 } 187 ref = slabPool.Get().(*Slab) 188 ref.id = id 189 ref.pbar = progressbar.NewOptions( 190 numStepsWriter, 191 progressbar.OptionOnCompletion(func() { 192 _, _ = fmt.Fprint(os.Stdout, "\n") 193 }), 194 ) 195 ref.pbarDescriptionPrefix = baseDescription 196 ref.numStores = p.config.NumStores 197 if cap(ref.stores) < p.config.NumStores { 198 // This is expect to allocate only for the first slab, then the allocated space is reused. 199 // On the edge case, we allocate again, which was the previous behaviour. 
200 ref.stores = make([]store, p.config.NumStores) 201 } else { 202 ref.stores = ref.stores[:p.config.NumStores] 203 } 204 ref.mint = mint 205 ref.maxt = maxt 206 ref.pbarMux = p.pbarMux 207 ref.plan = p 208 209 ref.initStores() 210 if p.Quiet { 211 ref.pbar = nil 212 ref.pbarDescriptionPrefix = "" 213 } 214 if p.TestCheckFunc != nil { 215 // This runs only during integration tests. It enables the tests to access and test the internal 216 // state of the planner. 217 p.TestCheckFunc() 218 } 219 return 220} 221 222func (p *Plan) validateT(mint, maxt int64) error { 223 switch { 224 case p.config.Mint > mint || p.config.Maxt < mint: 225 return fmt.Errorf("invalid mint: %d: global-mint: %d and global-maxt: %d", mint, p.config.Mint, p.config.Maxt) 226 case p.config.Mint > maxt || p.config.Maxt < maxt: 227 return fmt.Errorf("invalid maxt: %d: global-mint: %d and global-maxt: %d", mint, p.config.Mint, p.config.Maxt) 228 case mint > maxt: 229 return fmt.Errorf("mint cannot be greater than maxt: mint: %d and maxt: %d", mint, maxt) 230 } 231 return nil 232} 233 234func (c *Config) determineTimeDelta(numBytes, limit int64, prevTimeDelta int64) int64 { 235 switch { 236 case numBytes <= limit/2: 237 // deltaIncreaseRegion. 238 // We increase the time-range linearly for the next fetch if the current time-range fetch resulted in size that is 239 // less than half the max read size. This continues till we reach the maximum time-range delta. 240 return c.clampTimeDelta(prevTimeDelta + c.LaIncrement.Milliseconds()) 241 case numBytes > limit: 242 // Down size the time exponentially so that bytes size can be controlled (preventing OOM). 
243 log.Info("msg", fmt.Sprintf("decreasing time-range delta to %d minute(s) since size beyond permittable limits", prevTimeDelta/(2*minute))) 244 return prevTimeDelta / 2 245 } 246 // Here, the numBytes is between the max increment-time size limit (i.e., limit/2) and the max read limit 247 // (i.e., increment-time size limit < numBytes <= max read limit). This region is an ideal case of 248 // balance between migration speed and memory utilization. 249 // 250 // Example: If the limit is 500MB, then the max increment-time size limit will be 250MB. This means that till the numBytes is below 251 // 250MB, the time-range for next fetch will continue to increase by 1 minute (on the previous fetch time-range). However, 252 // the moment any slab comes between 250MB and 500MB, we stop to increment the time-range delta further. This helps 253 // keeping the migration tool in safe memory limits. 254 return prevTimeDelta 255} 256 257func (c *Config) clampTimeDelta(t int64) int64 { 258 if t > c.MaxReadDuration.Milliseconds() { 259 log.Info("msg", "Exceeded 'max-read-duration', setting back to 'max-read-duration' value", "max-read-duration", c.MaxReadDuration.String()) 260 return c.MaxReadDuration.Milliseconds() 261 } 262 return t 263} 264