// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retrieval

import (
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/expfmt"
	"github.com/prometheus/common/log"
	"github.com/prometheus/common/model"
	"github.com/prometheus/common/version"
	"golang.org/x/net/context"
	"golang.org/x/net/context/ctxhttp"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/local"
	"github.com/prometheus/prometheus/util/httputil"
)

const (
	scrapeHealthMetricName       = "up"
	scrapeDurationMetricName     = "scrape_duration_seconds"
	scrapeSamplesMetricName      = "scrape_samples_scraped"
	samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling"
)

var (
	targetIntervalLength = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "prometheus_target_interval_length_seconds",
			Help:       "Actual intervals between scrapes.",
			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
		},
		[]string{"interval"},
	)
	targetSkippedScrapes = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_skipped_scrapes_total",
			Help: "Total number of scrapes that were skipped because the metric storage was throttled.",
		},
	)
	targetReloadIntervalLength = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "prometheus_target_reload_length_seconds",
			Help:       "Actual interval to reload the scrape pool with a given configuration.",
			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
		},
		[]string{"interval"},
	)
	targetSyncIntervalLength = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Name:       "prometheus_target_sync_length_seconds",
			Help:       "Actual interval to sync the scrape pool.",
			Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
		},
		[]string{"scrape_job"},
	)
	targetScrapePoolSyncsCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrape_pool_sync_total",
			Help: "Total number of syncs that were executed on a scrape pool.",
		},
		[]string{"scrape_job"},
	)
	targetScrapeSampleLimit = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_exceeded_sample_limit_total",
			Help: "Total number of scrapes that hit the sample limit and were rejected.",
		},
	)
)

func init() {
	prometheus.MustRegister(targetIntervalLength)
	prometheus.MustRegister(targetSkippedScrapes)
	prometheus.MustRegister(targetReloadIntervalLength)
	prometheus.MustRegister(targetSyncIntervalLength)
	prometheus.MustRegister(targetScrapePoolSyncsCounter)
	prometheus.MustRegister(targetScrapeSampleLimit)
}

// scrapePool manages scrapes for sets of targets.
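// A pool is created per scrape configuration and owns one scrape loop per
// target, all sharing the pool's HTTP client and sample appender.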
type scrapePool struct {
	appender storage.SampleAppender

	ctx context.Context

	mtx    sync.RWMutex
	config *config.ScrapeConfig
	client *http.Client
	// Targets and loops must always be synchronized to have the same
	// set of hashes.
	targets map[uint64]*Target
	loops   map[uint64]loop

	// Constructor for new scrape loops. This is settable for testing convenience.
	newLoop func(context.Context, scraper, storage.SampleAppender, model.LabelSet, *config.ScrapeConfig) loop
}

// newScrapePool creates a scrape pool for the given scrape configuration
// that sends its samples to the provided appender.
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app storage.SampleAppender) *scrapePool {
	client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
	if err != nil {
		// Any errors that could occur here should be caught during config validation.
		log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
	}
	return &scrapePool{
		appender: app,
		config:   cfg,
		ctx:      ctx,
		client:   client,
		targets:  map[uint64]*Target{},
		loops:    map[uint64]loop{},
		newLoop:  newScrapeLoop,
	}
}

// stop terminates all scrape loops and returns after they have all terminated.
func (sp *scrapePool) stop() {
	var wg sync.WaitGroup

	sp.mtx.Lock()
	defer sp.mtx.Unlock()

	for fp, l := range sp.loops {
		wg.Add(1)

		go func(l loop) {
			l.stop()
			wg.Done()
		}(l)

		delete(sp.loops, fp)
		delete(sp.targets, fp)
	}

	wg.Wait()
}

// reload the scrape pool with the given scrape configuration. The target state is preserved
// but all scrape loops are restarted with the new scrape configuration.
// This method returns after all scrape loops that were stopped have fully terminated.
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
	start := time.Now()

	sp.mtx.Lock()
	defer sp.mtx.Unlock()

	client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
	if err != nil {
		// Any errors that could occur here should be caught during config validation.
		log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
	}
	sp.config = cfg
	sp.client = client

	var (
		wg       sync.WaitGroup
		interval = time.Duration(sp.config.ScrapeInterval)
		timeout  = time.Duration(sp.config.ScrapeTimeout)
	)

	for fp, oldLoop := range sp.loops {
		var (
			t = sp.targets[fp]
			s = &targetScraper{
				Target:  t,
				client:  sp.client,
				timeout: timeout,
			}
			newLoop = sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)
		)
		wg.Add(1)

		go func(oldLoop, newLoop loop) {
			oldLoop.stop()
			wg.Done()

			go newLoop.run(interval, timeout, nil)
		}(oldLoop, newLoop)

		sp.loops[fp] = newLoop
	}

	wg.Wait()
	targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
		time.Since(start).Seconds(),
	)
}

// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scrapers with the resulting set.
func (sp *scrapePool) Sync(tgs []*config.TargetGroup) {
	start := time.Now()

	var all []*Target
	for _, tg := range tgs {
		targets, err := targetsFromGroup(tg, sp.config)
		if err != nil {
			log.With("err", err).Error("creating targets failed")
			continue
		}
		all = append(all, targets...)
	}
	sp.sync(all)

	targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
		time.Since(start).Seconds(),
	)
	targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}

// sync takes a list of potentially duplicated targets, deduplicates them, starts
// scrape loops for new targets, and stops scrape loops for disappeared targets.
// It returns after all stopped scrape loops have terminated.
func (sp *scrapePool) sync(targets []*Target) {
	sp.mtx.Lock()
	defer sp.mtx.Unlock()

	var (
		uniqueTargets = map[uint64]struct{}{}
		interval      = time.Duration(sp.config.ScrapeInterval)
		timeout       = time.Duration(sp.config.ScrapeTimeout)
	)

	for _, t := range targets {
		hash := t.hash()
		uniqueTargets[hash] = struct{}{}

		if _, ok := sp.targets[hash]; !ok {
			s := &targetScraper{
				Target:  t,
				client:  sp.client,
				timeout: timeout,
			}

			l := sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)

			sp.targets[hash] = t
			sp.loops[hash] = l

			go l.run(interval, timeout, nil)
		}
	}

	var wg sync.WaitGroup

	// Stop and remove old targets and scraper loops.
	for hash := range sp.targets {
		if _, ok := uniqueTargets[hash]; !ok {
			wg.Add(1)
			go func(l loop) {
				l.stop()
				wg.Done()
			}(sp.loops[hash])

			delete(sp.loops, hash)
			delete(sp.targets, hash)
		}
	}

	// Wait for all potentially stopped scrapers to terminate.
	// This covers the case of flapping targets. If the server is under high load, a new
	// scraper may already be active and trying to insert while the old scraper, which has
	// not yet terminated, could still be inserting a previous sample set.
	wg.Wait()
}

// A scraper retrieves samples and accepts a status report at the end.
type scraper interface {
	scrape(ctx context.Context, ts time.Time) (model.Samples, error)
	report(start time.Time, dur time.Duration, err error)
	offset(interval time.Duration) time.Duration
}

// targetScraper implements the scraper interface for a target.
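// It issues an HTTP GET against the target's URL and decodes the response
// body into samples.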
type targetScraper struct {
	*Target
	client  *http.Client
	timeout time.Duration
}

const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1`

var userAgentHeader = fmt.Sprintf("Prometheus/%s", version.Version)

// scrape performs a single HTTP scrape of the target and decodes the response
// into samples stamped with the given timestamp.
func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (model.Samples, error) {
	req, err := http.NewRequest("GET", s.URL().String(), nil)
	if err != nil {
		return nil, err
	}
	req.Header.Add("Accept", acceptHeader)
	req.Header.Set("User-Agent", userAgentHeader)
	req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", fmt.Sprintf("%f", s.timeout.Seconds()))

	resp, err := ctxhttp.Do(ctx, s.client, req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("server returned HTTP status %s", resp.Status)
	}

	var (
		allSamples = make(model.Samples, 0, 200)
		decSamples = make(model.Vector, 0, 50)
	)
	sdec := expfmt.SampleDecoder{
		Dec: expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)),
		Opts: &expfmt.DecodeOptions{
			Timestamp: model.TimeFromUnixNano(ts.UnixNano()),
		},
	}

	for {
		if err = sdec.Decode(&decSamples); err != nil {
			break
		}
		allSamples = append(allSamples, decSamples...)
		decSamples = decSamples[:0]
	}

	if err == io.EOF {
		// Set err to nil since it is used in the scrape health recording.
		err = nil
	}
	return allSamples, err
}

// A loop can run and be stopped again. It must not be reused after it was stopped.
type loop interface {
	run(interval, timeout time.Duration, errc chan<- error)
	stop()
}

// scrapeLoop periodically scrapes a single target and appends the resulting samples.
type scrapeLoop struct {
	scraper scraper

	// Where samples are ultimately sent.
	appender storage.SampleAppender

	targetLabels         model.LabelSet
	metricRelabelConfigs []*config.RelabelConfig
	honorLabels          bool
	sampleLimit          uint

	done   chan struct{}
	ctx    context.Context
	cancel func()
}

// newScrapeLoop creates a scrape loop for the given scraper, appender,
// target labels, and scrape configuration.
func newScrapeLoop(
	ctx context.Context,
	sc scraper,
	appender storage.SampleAppender,
	targetLabels model.LabelSet,
	config *config.ScrapeConfig,
) loop {
	sl := &scrapeLoop{
		scraper:              sc,
		appender:             appender,
		targetLabels:         targetLabels,
		metricRelabelConfigs: config.MetricRelabelConfigs,
		honorLabels:          config.HonorLabels,
		sampleLimit:          config.SampleLimit,
		done:                 make(chan struct{}),
	}
	sl.ctx, sl.cancel = context.WithCancel(ctx)

	return sl
}

// run scrapes the target at the given interval until the loop's context is
// canceled, sending scrape and append errors to errc if it is non-nil.
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
	defer close(sl.done)

	select {
	case <-time.After(sl.scraper.offset(interval)):
		// Continue after a scraping offset.
	case <-sl.ctx.Done():
		return
	}

	var last time.Time

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-sl.ctx.Done():
			return
		default:
		}

		if !sl.appender.NeedsThrottling() {
			var (
				start                 = time.Now()
				scrapeCtx, cancel     = context.WithTimeout(sl.ctx, timeout)
				numPostRelabelSamples = 0
			)

			// Only record after the first scrape.
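			// The interval length is measured as the time since the previous
			// scrape, which does not exist on the very first iteration.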
			if !last.IsZero() {
				targetIntervalLength.WithLabelValues(interval.String()).Observe(
					time.Since(last).Seconds(),
				)
			}

			samples, err := sl.scraper.scrape(scrapeCtx, start)
			cancel()
			if err == nil {
				numPostRelabelSamples, err = sl.append(samples)
			}
			if err != nil && errc != nil {
				errc <- err
			}
			sl.report(start, time.Since(start), len(samples), numPostRelabelSamples, err)
			last = start
		} else {
			targetSkippedScrapes.Inc()
		}

		select {
		case <-sl.ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

func (sl *scrapeLoop) stop() {
	sl.cancel()
	<-sl.done
}

// wrapAppender wraps a SampleAppender for relabeling. It returns the wrapped
// appender and an innermost countingAppender that counts the samples actually
// appended in the end.
func (sl *scrapeLoop) wrapAppender(app storage.SampleAppender) (storage.SampleAppender, *countingAppender) {
	// Innermost appender is a countingAppender to count how many samples
	// are left in the end.
	countingAppender := &countingAppender{
		SampleAppender: app,
	}
	app = countingAppender

	// The relabelAppender has to be inside the label-modifying appenders so
	// the relabeling rules are applied to the correct label set.
	if len(sl.metricRelabelConfigs) > 0 {
		app = relabelAppender{
			SampleAppender: app,
			relabelings:    sl.metricRelabelConfigs,
		}
	}

	if sl.honorLabels {
		app = honorLabelsAppender{
			SampleAppender: app,
			labels:         sl.targetLabels,
		}
	} else {
		app = ruleLabelsAppender{
			SampleAppender: app,
			labels:         sl.targetLabels,
		}
	}
	return app, countingAppender
}

// append sends the scraped samples through the relabeling appender chain to storage.
// It returns the number of samples remaining after metric relabeling and enforces
// the configured sample limit if one is set.
func (sl *scrapeLoop) append(samples model.Samples) (int, error) {
	var (
		numOutOfOrder = 0
		numDuplicates = 0
		app           = sl.appender
		countingApp   *countingAppender
	)

	if sl.sampleLimit > 0 {
		// We need to check for the sample limit, so append everything
		// to a wrapped bufferAppender first. Then point samples to the
		// result.
		bufApp := &bufferAppender{buffer: make(model.Samples, 0, len(samples))}
		var wrappedBufApp storage.SampleAppender
		wrappedBufApp, countingApp = sl.wrapAppender(bufApp)
		for _, s := range samples {
			// Ignore errors as the bufferAppender always succeeds.
			wrappedBufApp.Append(s)
		}
		samples = bufApp.buffer
		if uint(countingApp.count) > sl.sampleLimit {
			targetScrapeSampleLimit.Inc()
			return countingApp.count, fmt.Errorf(
				"%d samples exceeded limit of %d", countingApp.count, sl.sampleLimit,
			)
		}
	} else {
		// No need to check for the sample limit. Wrap sl.appender directly.
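		// Samples are relabeled and appended straight through to storage
		// in the loop below.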
		app, countingApp = sl.wrapAppender(sl.appender)
	}

	for _, s := range samples {
		if err := app.Append(s); err != nil {
			switch err {
			case local.ErrOutOfOrderSample:
				numOutOfOrder++
				log.With("sample", s).With("error", err).Debug("Sample discarded")
			case local.ErrDuplicateSampleForTimestamp:
				numDuplicates++
				log.With("sample", s).With("error", err).Debug("Sample discarded")
			default:
				log.With("sample", s).With("error", err).Warn("Sample discarded")
			}
		}
	}
	if numOutOfOrder > 0 {
		log.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples")
	}
	if numDuplicates > 0 {
		log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
	}
	return countingApp.count, nil
}

// report records the synthetic scrape health and statistics samples for a completed scrape.
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples, postRelabelSamples int, err error) {
	sl.scraper.report(start, duration, err)

	ts := model.TimeFromUnixNano(start.UnixNano())

	var health model.SampleValue
	if err == nil {
		health = 1
	}

	healthSample := &model.Sample{
		Metric: model.Metric{
			model.MetricNameLabel: scrapeHealthMetricName,
		},
		Timestamp: ts,
		Value:     health,
	}
	durationSample := &model.Sample{
		Metric: model.Metric{
			model.MetricNameLabel: scrapeDurationMetricName,
		},
		Timestamp: ts,
		Value:     model.SampleValue(duration.Seconds()),
	}
	countSample := &model.Sample{
		Metric: model.Metric{
			model.MetricNameLabel: scrapeSamplesMetricName,
		},
		Timestamp: ts,
		Value:     model.SampleValue(scrapedSamples),
	}
	postRelabelSample := &model.Sample{
		Metric: model.Metric{
			model.MetricNameLabel: samplesPostRelabelMetricName,
		},
		Timestamp: ts,
		Value:     model.SampleValue(postRelabelSamples),
	}

	reportAppender := ruleLabelsAppender{
		SampleAppender: sl.appender,
		labels:         sl.targetLabels,
	}

	if err := reportAppender.Append(healthSample); err != nil {
		log.With("sample", healthSample).With("error", err).Warn("Scrape health sample discarded")
	}
	if err := reportAppender.Append(durationSample); err != nil {
		log.With("sample", durationSample).With("error", err).Warn("Scrape duration sample discarded")
	}
	if err := reportAppender.Append(countSample); err != nil {
		log.With("sample", countSample).With("error", err).Warn("Scrape sample count sample discarded")
	}
	if err := reportAppender.Append(postRelabelSample); err != nil {
		log.With("sample", postRelabelSample).With("error", err).Warn("Scrape sample count post-relabeling sample discarded")
	}
}