// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package local contains the local time series storage used by Prometheus.
package local

import (
	"container/list"
	"errors"
	"fmt"
	"math/rand"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	opentracing "github.com/opentracing/opentracing-go"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/log"
	"github.com/prometheus/common/model"
	"golang.org/x/net/context"

	"github.com/prometheus/prometheus/storage/local/chunk"
	"github.com/prometheus/prometheus/storage/metric"
)

const (
	evictRequestsCap      = 1024
	quarantineRequestsCap = 1024

	// See waitForNextFP.
	fpMaxSweepTime    = 6 * time.Hour
	fpMaxWaitDuration = 10 * time.Second

	// See handleEvictList. This should be clearly shorter than the usual
	// GC interval. On the other hand, each evict check calls ReadMemStats,
	// which involves stopping the world (at least up to Go1.8). Hence,
	// don't just set this to a very short interval.
	evictInterval = time.Second

	// Constants to control the hysteresis of entering and leaving "rushed
	// mode". In rushed mode, the dirty series count is ignored for
	// checkpointing, series are maintained as frequently as possible, and
	// series files are not synced if the adaptive sync strategy is used.
	persintenceUrgencyScoreForEnteringRushedMode = 0.8
	persintenceUrgencyScoreForLeavingRushedMode  = 0.7

	// This factor times -storage.local.memory-chunks is the number of
	// memory chunks we tolerate before throttling the storage. It is also a
	// basis for calculating the persistenceUrgencyScore.
	toleranceFactorMemChunks = 1.1
	// This factor times -storage.local.max-chunks-to-persist is the minimum
	// required number of chunks waiting for persistence before the number
	// of chunks in memory may influence the persistenceUrgencyScore. (In
	// other words: if there are no chunks to persist, it doesn't help chunk
	// eviction if we speed up persistence.)
	factorMinChunksToPersist = 0.2

	// Threshold for when to stop using LabelMatchers to retrieve and
	// intersect fingerprints. The rationale here is that looking up more
	// fingerprints has diminishing returns if we have already narrowed down
	// the possible fingerprints significantly. It is then easier to simply
	// look up the metrics for all the fingerprints and directly compare them
	// to the matchers. Since a fingerprint lookup for an Equal matcher is
	// much less expensive, there is a lower threshold for that case.
	// TODO(beorn7): These numbers need to be tweaked, probably a bit lower.
	// 5x higher numbers have resulted in slightly worse performance in a
	// real-life production scenario.
	fpEqualMatchThreshold = 1000
	fpOtherMatchThreshold = 10000

	selectorsTag = "selectors"
	fromTag      = "from"
	throughTag   = "through"
	tsTag        = "ts"
	numSeries    = "num_series"
)

type quarantineRequest struct {
	fp     model.Fingerprint
	metric model.Metric
	reason error
}

// SyncStrategy is an enum to select a sync strategy for series files.
type SyncStrategy int

// String implements flag.Value.
func (ss SyncStrategy) String() string {
	switch ss {
	case Adaptive:
		return "adaptive"
	case Always:
		return "always"
	case Never:
		return "never"
	}
	return "<unknown>"
}

// Set implements flag.Value.
func (ss *SyncStrategy) Set(s string) error {
	switch s {
	case "adaptive":
		*ss = Adaptive
	case "always":
		*ss = Always
	case "never":
		*ss = Never
	default:
		return fmt.Errorf("invalid sync strategy: %s", s)
	}
	return nil
}

// Possible values for SyncStrategy.
const (
	_ SyncStrategy = iota
	Never
	Always
	Adaptive
)
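
// Because SyncStrategy implements flag.Value, it can be registered directly
// as a command-line flag. A minimal sketch with the standard library flag
// package (the flag name shown here is an assumption, not necessarily the one
// registered by the Prometheus binary):
//
//	var strategy = Adaptive // Default if the flag is not set.
//
//	func init() {
//		flag.Var(&strategy, "storage.local.series-sync-strategy",
//			"When to sync series files: 'never', 'always', or 'adaptive'.")
//	}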

// A syncStrategy is a function that returns whether series files should be
// synced or not. It does not need to be goroutine safe.
type syncStrategy func() bool

// A MemorySeriesStorage manages series in memory over time, while also
// interfacing with a persistence layer to make time series data persistent
// across restarts and evictable from memory.
type MemorySeriesStorage struct {
	// archiveHighWatermark, numChunksToPersist, persistUrgency have to be aligned for atomic operations.
	archiveHighWatermark model.Time    // No archived series has samples after this time.
	numChunksToPersist   int64         // The number of chunks waiting for persistence.
	persistUrgency       int32         // Persistence urgency score * 1000, int32 allows atomic operations.
	rushed               bool          // Whether the storage is in rushed mode.
	rushedMtx            sync.Mutex    // Protects rushed.
	lastNumGC            uint32        // To detect if a GC cycle has run.
	throttled            chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging).

	fpLocker   *fingerprintLocker
	fpToSeries *seriesMap

	options *MemorySeriesStorageOptions

	loopStopping, loopStopped  chan struct{}
	logThrottlingStopped       chan struct{}
	targetHeapSize             uint64
	dropAfter                  time.Duration
	headChunkTimeout           time.Duration
	checkpointInterval         time.Duration
	checkpointDirtySeriesLimit int

	persistence *persistence
	mapper      *fpMapper

	evictList                   *list.List
	evictRequests               chan chunk.EvictRequest
	evictStopping, evictStopped chan struct{}

	quarantineRequests                    chan quarantineRequest
	quarantineStopping, quarantineStopped chan struct{}

	persistErrors            prometheus.Counter
	queuedChunksToPersist    prometheus.Counter
	chunksToPersist          prometheus.GaugeFunc
	memorySeries             prometheus.Gauge
	headChunks               prometheus.Gauge
	dirtySeries              prometheus.Gauge
	seriesOps                *prometheus.CounterVec
	ingestedSamples          prometheus.Counter
	discardedSamples         *prometheus.CounterVec
	nonExistentSeriesMatches prometheus.Counter
	memChunks                prometheus.GaugeFunc
	maintainSeriesDuration   *prometheus.SummaryVec
	persistenceUrgencyScore  prometheus.GaugeFunc
	rushedMode               prometheus.GaugeFunc
	targetHeapSizeBytes      prometheus.GaugeFunc
}

// MemorySeriesStorageOptions contains options needed by
// NewMemorySeriesStorage. It is not safe to leave any of those at their zero
// values.
type MemorySeriesStorageOptions struct {
	TargetHeapSize             uint64        // Desired maximum heap size.
	PersistenceStoragePath     string        // Location of persistence files.
	PersistenceRetentionPeriod time.Duration // Chunks at least that old are dropped.
	HeadChunkTimeout           time.Duration // Head chunks idle for at least that long may be closed.
	CheckpointInterval         time.Duration // How often to checkpoint the series map and head chunks.
	CheckpointDirtySeriesLimit int           // How many dirty series will trigger an early checkpoint.
	Dirty                      bool          // Force the storage to consider itself dirty on startup.
	PedanticChecks             bool          // If dirty, perform crash-recovery checks on each series file.
	SyncStrategy               SyncStrategy  // Which sync strategy to apply to series files.
	MinShrinkRatio             float64       // Minimum ratio a series file has to shrink during truncation.
	NumMutexes                 int           // Number of mutexes used for stochastic fingerprint locking.
}

// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Start
// still has to be called to start the storage.
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage {
	s := &MemorySeriesStorage{
		fpLocker: newFingerprintLocker(o.NumMutexes),

		options: o,

		loopStopping:               make(chan struct{}),
		loopStopped:                make(chan struct{}),
		logThrottlingStopped:       make(chan struct{}),
		throttled:                  make(chan struct{}, 1),
		targetHeapSize:             o.TargetHeapSize,
		dropAfter:                  o.PersistenceRetentionPeriod,
		headChunkTimeout:           o.HeadChunkTimeout,
		checkpointInterval:         o.CheckpointInterval,
		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
		archiveHighWatermark:       model.Now().Add(-o.HeadChunkTimeout),

		evictList:     list.New(),
		evictRequests: make(chan chunk.EvictRequest, evictRequestsCap),
		evictStopping: make(chan struct{}),
		evictStopped:  make(chan struct{}),

		quarantineRequests: make(chan quarantineRequest, quarantineRequestsCap),
		quarantineStopping: make(chan struct{}),
		quarantineStopped:  make(chan struct{}),

		persistErrors: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "persist_errors_total",
			Help:      "The total number of errors while writing to the persistence layer.",
		}),
		queuedChunksToPersist: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "queued_chunks_to_persist_total",
			Help:      "The total number of chunks queued for persistence.",
		}),
		memorySeries: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "memory_series",
			Help:      "The current number of series in memory.",
		}),
		headChunks: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "open_head_chunks",
			Help:      "The current number of open head chunks.",
		}),
		dirtySeries: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "memory_dirty_series",
			Help:      "The current number of series that would require a disk seek during crash recovery.",
		}),
		seriesOps: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Namespace: namespace,
				Subsystem: subsystem,
				Name:      "series_ops_total",
				Help:      "The total number of series operations by their type.",
			},
			[]string{opTypeLabel},
		),
		ingestedSamples: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "ingested_samples_total",
			Help:      "The total number of samples ingested.",
		}),
		discardedSamples: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Namespace: namespace,
				Subsystem: subsystem,
				Name:      "out_of_order_samples_total",
				Help:      "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.",
			},
			[]string{discardReasonLabel},
		),
		nonExistentSeriesMatches: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "non_existent_series_matches_total",
			Help:      "How often a non-existent series was referred to during label matching or chunk preloading. This is an indication of outdated label indexes.",
		}),
		memChunks: prometheus.NewGaugeFunc(
			prometheus.GaugeOpts{
				Namespace: namespace,
				Subsystem: subsystem,
				Name:      "memory_chunks",
				Help:      "The current number of chunks in memory. The number does not include cloned chunks (i.e. chunks without a descriptor).",
			},
			func() float64 { return float64(atomic.LoadInt64(&chunk.NumMemChunks)) },
		),
		maintainSeriesDuration: prometheus.NewSummaryVec(
			prometheus.SummaryOpts{
				Namespace: namespace,
				Subsystem: subsystem,
				Name:      "maintain_series_duration_seconds",
				Help:      "The duration in seconds it took to perform maintenance on a series.",
			},
			[]string{seriesLocationLabel},
		),
	}

	s.chunksToPersist = prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "chunks_to_persist",
			Help:      "The current number of chunks waiting for persistence.",
		},
		func() float64 {
			return float64(s.getNumChunksToPersist())
		},
	)
	s.rushedMode = prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "rushed_mode",
			Help:      "1 if the storage is in rushed mode, 0 otherwise.",
		},
		func() float64 {
			s.rushedMtx.Lock()
			defer s.rushedMtx.Unlock()
			if s.rushed {
				return 1
			}
			return 0
		},
	)
	s.persistenceUrgencyScore = prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "persistence_urgency_score",
			Help:      "A score of urgency to persist chunks, 0 is least urgent, 1 most.",
		},
		func() float64 {
			score, _ := s.getPersistenceUrgencyScore()
			return score
		},
	)
	s.targetHeapSizeBytes = prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "target_heap_size_bytes",
			Help:      "The configured target heap size in bytes.",
		},
		func() float64 {
			return float64(s.targetHeapSize)
		},
	)

	// Initialize metric vectors.
	// TODO(beorn7): Rework once we have a utility function for it in client_golang.
	s.discardedSamples.WithLabelValues(outOfOrderTimestamp)
	s.discardedSamples.WithLabelValues(duplicateSample)
	s.maintainSeriesDuration.WithLabelValues(maintainInMemory)
	s.maintainSeriesDuration.WithLabelValues(maintainArchived)
	s.seriesOps.WithLabelValues(create)
	s.seriesOps.WithLabelValues(archive)
	s.seriesOps.WithLabelValues(unarchive)
	s.seriesOps.WithLabelValues(memoryPurge)
	s.seriesOps.WithLabelValues(archivePurge)
	s.seriesOps.WithLabelValues(requestedPurge)
	s.seriesOps.WithLabelValues(memoryMaintenance)
	s.seriesOps.WithLabelValues(archiveMaintenance)
	s.seriesOps.WithLabelValues(completedQurantine)
	s.seriesOps.WithLabelValues(droppedQuarantine)
	s.seriesOps.WithLabelValues(failedQuarantine)

	return s
}
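
// A minimal usage sketch (the paths and sizes are made-up values): allocate
// the storage, start it, and make sure it is stopped on the way out so that a
// final checkpoint is written.
//
//	storage := NewMemorySeriesStorage(&MemorySeriesStorageOptions{
//		TargetHeapSize:             2 << 30, // 2 GiB.
//		PersistenceStoragePath:     "/var/lib/prometheus",
//		PersistenceRetentionPeriod: 15 * 24 * time.Hour,
//		HeadChunkTimeout:           5 * time.Minute,
//		CheckpointInterval:         5 * time.Minute,
//		CheckpointDirtySeriesLimit: 5000,
//		SyncStrategy:               Adaptive,
//		MinShrinkRatio:             0.1,
//		NumMutexes:                 4096,
//	})
//	if err := storage.Start(); err != nil {
//		log.Fatal(err)
//	}
//	defer storage.Stop()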

// Start implements Storage.
func (s *MemorySeriesStorage) Start() (err error) {
	var syncStrategy syncStrategy
	switch s.options.SyncStrategy {
	case Never:
		syncStrategy = func() bool { return false }
	case Always:
		syncStrategy = func() bool { return true }
	case Adaptive:
		syncStrategy = func() bool {
			_, rushed := s.getPersistenceUrgencyScore()
			return !rushed
		}
	default:
		panic("unknown sync strategy")
	}

	var p *persistence
	p, err = newPersistence(
		s.options.PersistenceStoragePath,
		s.options.Dirty, s.options.PedanticChecks,
		syncStrategy,
		s.options.MinShrinkRatio,
	)
	if err != nil {
		return err
	}
	s.persistence = p
	// Persistence must start running before loadSeriesMapAndHeads() is called.
	go s.persistence.run()

	defer func() {
		if err != nil {
			if e := p.close(); e != nil {
				log.Errorln("Error closing persistence:", e)
			}
		}
	}()

	log.Info("Loading series map and head chunks...")
	s.fpToSeries, s.numChunksToPersist, err = p.loadSeriesMapAndHeads()
	for _, series := range s.fpToSeries.m {
		if !series.headChunkClosed {
			s.headChunks.Inc()
		}
	}

	if err != nil {
		return err
	}
	log.Infof("%d series loaded.", s.fpToSeries.length())
	s.memorySeries.Set(float64(s.fpToSeries.length()))

	s.mapper, err = newFPMapper(s.fpToSeries, p)
	if err != nil {
		return err
	}

	go s.handleEvictList()
	go s.handleQuarantine()
	go s.logThrottling()
	go s.loop()

	return nil
}

// Stop implements Storage.
func (s *MemorySeriesStorage) Stop() error {
	log.Info("Stopping local storage...")

	log.Info("Stopping maintenance loop...")
	close(s.loopStopping)
	<-s.loopStopped

	log.Info("Stopping series quarantining...")
	close(s.quarantineStopping)
	<-s.quarantineStopped

	log.Info("Stopping chunk eviction...")
	close(s.evictStopping)
	<-s.evictStopped

	// One final checkpoint of the series map and the head chunks.
	if err := s.persistence.checkpointSeriesMapAndHeads(
		context.Background(), s.fpToSeries, s.fpLocker,
	); err != nil {
		return err
	}
	if err := s.mapper.checkpoint(); err != nil {
		return err
	}

	if err := s.persistence.close(); err != nil {
		return err
	}
	log.Info("Local storage stopped.")
	return nil
}

type memorySeriesStorageQuerier struct {
	*MemorySeriesStorage
}

func (memorySeriesStorageQuerier) Close() error {
	return nil
}

// Querier implements the storage interface.
func (s *MemorySeriesStorage) Querier() (Querier, error) {
	return memorySeriesStorageQuerier{s}, nil
}

// WaitForIndexing implements Storage.
func (s *MemorySeriesStorage) WaitForIndexing() {
	s.persistence.waitForIndexing()
}

// LastSampleForLabelMatchers implements Storage.
func (s *MemorySeriesStorage) LastSampleForLabelMatchers(_ context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
	mergedFPs := map[model.Fingerprint]struct{}{}
	for _, matchers := range matcherSets {
		fps, err := s.fpsForLabelMatchers(cutoff, model.Latest, matchers...)
		if err != nil {
			return nil, err
		}
		for fp := range fps {
			mergedFPs[fp] = struct{}{}
		}
	}

	res := make(model.Vector, 0, len(mergedFPs))
	for fp := range mergedFPs {
		s.fpLocker.Lock(fp)

		series, ok := s.fpToSeries.get(fp)
		if !ok {
			// A series could have disappeared between resolving label matchers and here.
			s.fpLocker.Unlock(fp)
			continue
		}
		sp := series.lastSamplePair()
		res = append(res, &model.Sample{
			Metric:    series.metric,
			Value:     sp.Value,
			Timestamp: sp.Timestamp,
		})
		s.fpLocker.Unlock(fp)
	}
	return res, nil
}

// boundedIterator wraps a SeriesIterator and does not allow fetching
// data from earlier than the configured start time.
type boundedIterator struct {
	it    SeriesIterator
	start model.Time
}

// ValueAtOrBeforeTime implements the SeriesIterator interface.
func (bit *boundedIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair {
	if ts < bit.start {
		return model.ZeroSamplePair
	}
	return bit.it.ValueAtOrBeforeTime(ts)
}

// RangeValues implements the SeriesIterator interface.
func (bit *boundedIterator) RangeValues(interval metric.Interval) []model.SamplePair {
	if interval.NewestInclusive < bit.start {
		return []model.SamplePair{}
	}
	if interval.OldestInclusive < bit.start {
		interval.OldestInclusive = bit.start
	}
	return bit.it.RangeValues(interval)
}

// Metric implements SeriesIterator.
func (bit *boundedIterator) Metric() metric.Metric {
	return bit.it.Metric()
}

// Close implements SeriesIterator.
func (bit *boundedIterator) Close() {
	bit.it.Close()
}

// QueryRange implements Storage.
func (s *MemorySeriesStorage) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
	span, _ := opentracing.StartSpanFromContext(ctx, "QueryRange")
	span.SetTag(selectorsTag, metric.LabelMatchers(matchers).String())
	span.SetTag(fromTag, int64(from))
	span.SetTag(throughTag, int64(through))
	defer span.Finish()

	if through.Before(from) {
		// In that case, nothing will match.
		return nil, nil
	}
	fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...)
	if err != nil {
		return nil, err
	}
	span.SetTag(numSeries, len(fpSeriesPairs))
	iterators := make([]SeriesIterator, 0, len(fpSeriesPairs))
	for _, pair := range fpSeriesPairs {
		it := s.preloadChunksForRange(pair, from, through)
		iterators = append(iterators, it)
	}
	return iterators, nil
}
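
// A sketch of how a caller might use QueryRange (metric.NewLabelMatcher and
// the iterator methods are part of this repository; the surrounding loop is
// illustrative only):
//
//	m, _ := metric.NewLabelMatcher(metric.Equal, model.MetricNameLabel, "up")
//	its, err := s.QueryRange(ctx, from, through, m)
//	if err != nil {
//		return err
//	}
//	for _, it := range its {
//		samples := it.RangeValues(metric.Interval{
//			OldestInclusive: from,
//			NewestInclusive: through,
//		})
//		_ = samples // Process the samples here.
//		it.Close()
//	}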

// QueryInstant implements Storage.
func (s *MemorySeriesStorage) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
	span, _ := opentracing.StartSpanFromContext(ctx, "QueryInstant")
	span.SetTag(selectorsTag, metric.LabelMatchers(matchers).String())
	span.SetTag(tsTag, ts)
	defer span.Finish()

	if stalenessDelta < 0 {
		panic("negative staleness delta")
	}
	from := ts.Add(-stalenessDelta)
	through := ts

	fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...)
	if err != nil {
		return nil, err
	}
	iterators := make([]SeriesIterator, 0, len(fpSeriesPairs))
	for _, pair := range fpSeriesPairs {
		it := s.preloadChunksForInstant(pair, from, through)
		iterators = append(iterators, it)
	}
	return iterators, nil
}

// fingerprintsForLabelPair returns the fingerprints with the given
// LabelPair. If intersectWith is non-nil, the method will only return
// fingerprints that are also contained in intersectWith. If mergeWith is
// non-nil, the found fingerprints are added to the given map. The returned map
// is the same as the given one.
func (s *MemorySeriesStorage) fingerprintsForLabelPair(
	pair model.LabelPair,
	mergeWith map[model.Fingerprint]struct{},
	intersectWith map[model.Fingerprint]struct{},
) map[model.Fingerprint]struct{} {
	if mergeWith == nil {
		mergeWith = map[model.Fingerprint]struct{}{}
	}
	for _, fp := range s.persistence.fingerprintsForLabelPair(pair) {
		if intersectWith == nil {
			mergeWith[fp] = struct{}{}
			continue
		}
		if _, ok := intersectWith[fp]; ok {
			mergeWith[fp] = struct{}{}
		}
	}
	return mergeWith
}

// MetricsForLabelMatchers implements Storage.
func (s *MemorySeriesStorage) MetricsForLabelMatchers(
	_ context.Context,
	from, through model.Time,
	matcherSets ...metric.LabelMatchers,
) ([]metric.Metric, error) {
	fpToMetric := map[model.Fingerprint]metric.Metric{}
	for _, matchers := range matcherSets {
		metrics, err := s.metricsForLabelMatchers(from, through, matchers...)
		if err != nil {
			return nil, err
		}
		for fp, m := range metrics {
			fpToMetric[fp] = m
		}
	}

	metrics := make([]metric.Metric, 0, len(fpToMetric))
	for _, m := range fpToMetric {
		metrics = append(metrics, m)
	}
	return metrics, nil
}

// candidateFPsForLabelMatchers returns candidate FPs for the given matchers
// and the remaining matchers still to be checked against each candidate.
func (s *MemorySeriesStorage) candidateFPsForLabelMatchers(
	matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]struct{}, []*metric.LabelMatcher, error) {
	sort.Sort(metric.LabelMatchers(matchers))

	if len(matchers) == 0 || matchers[0].MatchesEmptyString() {
		// No matchers at all, or even the best matcher matches the empty string.
		return nil, nil, nil
	}

	var (
		matcherIdx   int
		candidateFPs map[model.Fingerprint]struct{}
	)

	// Equal matchers.
	for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpEqualMatchThreshold); matcherIdx++ {
		m := matchers[matcherIdx]
		if m.Type != metric.Equal || m.MatchesEmptyString() {
			break
		}
		candidateFPs = s.fingerprintsForLabelPair(
			model.LabelPair{
				Name:  m.Name,
				Value: m.Value,
			},
			nil,
			candidateFPs,
		)
		if len(candidateFPs) == 0 {
			return nil, nil, nil
		}
	}

	// Other matchers.
	for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpOtherMatchThreshold); matcherIdx++ {
		m := matchers[matcherIdx]
		if m.MatchesEmptyString() {
			break
		}

		lvs, err := s.LabelValuesForLabelName(context.TODO(), m.Name)
		if err != nil {
			return nil, nil, err
		}
		lvs = m.Filter(lvs)
		if len(lvs) == 0 {
			return nil, nil, nil
		}
		fps := map[model.Fingerprint]struct{}{}
		for _, lv := range lvs {
			s.fingerprintsForLabelPair(
				model.LabelPair{
					Name:  m.Name,
					Value: lv,
				},
				fps,
				candidateFPs,
			)
		}
		candidateFPs = fps
		if len(candidateFPs) == 0 {
			return nil, nil, nil
		}
	}
	return candidateFPs, matchers[matcherIdx:], nil
}

func (s *MemorySeriesStorage) seriesForLabelMatchers(
	from, through model.Time,
	matchers ...*metric.LabelMatcher,
) ([]fingerprintSeriesPair, error) {
	candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...)
	if err != nil {
		return nil, err
	}

	result := []fingerprintSeriesPair{}
FPLoop:
	for fp := range candidateFPs {
		s.fpLocker.Lock(fp)
		series := s.seriesForRange(fp, from, through)
		s.fpLocker.Unlock(fp)

		if series == nil {
			continue FPLoop
		}

		for _, m := range matchersToCheck {
			if !m.Match(series.metric[m.Name]) {
				continue FPLoop
			}
		}
		result = append(result, fingerprintSeriesPair{fp, series})
	}
	return result, nil
}

func (s *MemorySeriesStorage) fpsForLabelMatchers(
	from, through model.Time,
	matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]struct{}, error) {
	candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...)
	if err != nil {
		return nil, err
	}

FPLoop:
	for fp := range candidateFPs {
		s.fpLocker.Lock(fp)
		met, _, ok := s.metricForRange(fp, from, through)
		s.fpLocker.Unlock(fp)

		if !ok {
			delete(candidateFPs, fp)
			continue FPLoop
		}

		for _, m := range matchersToCheck {
			if !m.Match(met[m.Name]) {
				delete(candidateFPs, fp)
				continue FPLoop
			}
		}
	}
	return candidateFPs, nil
}

func (s *MemorySeriesStorage) metricsForLabelMatchers(
	from, through model.Time,
	matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]metric.Metric, error) {
	candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...)
	if err != nil {
		return nil, err
	}

	result := map[model.Fingerprint]metric.Metric{}
FPLoop:
	for fp := range candidateFPs {
		s.fpLocker.Lock(fp)
		met, _, ok := s.metricForRange(fp, from, through)
		s.fpLocker.Unlock(fp)

		if !ok {
			continue FPLoop
		}

		for _, m := range matchersToCheck {
			if !m.Match(met[m.Name]) {
				continue FPLoop
			}
		}
		result[fp] = metric.Metric{Metric: met}
	}
	return result, nil
}
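
// To illustrate the narrowing strategy above (a hypothetical walk-through,
// not code from this package): for a selector such as
// {job="api", path=~"/v[12]/.*"}, candidateFPsForLabelMatchers first seeds
// the candidate set from the cheap Equal matcher and only consults the more
// expensive regex matcher while the candidate set is still large; whatever
// matchers remain are returned for direct per-series comparison.
//
//	eq, _ := metric.NewLabelMatcher(metric.Equal, "job", "api")
//	re, _ := metric.NewLabelMatcher(metric.RegexMatch, "path", "/v[12]/.*")
//	candidates, remaining, err := s.candidateFPsForLabelMatchers(eq, re)
//	// If len(candidates) dropped below fpOtherMatchThreshold after the Equal
//	// matcher, 'remaining' still contains the regex matcher, and the caller
//	// compares it directly against each candidate's metric.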

// metricForRange returns the metric for the given fingerprint if the
// corresponding time series has samples between 'from' and 'through', together
// with a pointer to the series if it is in memory already. For a series that
// does not have samples between 'from' and 'through', the returned bool is
// false. For an archived series that does contain samples between 'from' and
// 'through', it returns (metric, nil, true).
//
// The caller must have locked the fp.
func (s *MemorySeriesStorage) metricForRange(
	fp model.Fingerprint,
	from, through model.Time,
) (model.Metric, *memorySeries, bool) {
	series, ok := s.fpToSeries.get(fp)
	if ok {
		if series.lastTime.Before(from) || series.firstTime().After(through) {
			return nil, nil, false
		}
		return series.metric, series, true
	}
	// From here on, we are only concerned with archived metrics.
	// If the high watermark of archived series is before 'from', we are done.
	watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark)))
	if watermark < from {
		return nil, nil, false
	}
	if from.After(model.Earliest) || through.Before(model.Latest) {
		// The range lookup is relatively cheap, so let's do it first if
		// we have a chance the archived metric is not in the range.
		has, first, last := s.persistence.hasArchivedMetric(fp)
		if !has {
			s.nonExistentSeriesMatches.Inc()
			return nil, nil, false
		}
		if first.After(through) || last.Before(from) {
			return nil, nil, false
		}
	}

	metric, err := s.persistence.archivedMetric(fp)
	if err != nil {
		// archivedMetric has already flagged the storage as dirty in this case.
		return nil, nil, false
	}
	return metric, nil, true
}

// LabelValuesForLabelName implements Storage.
func (s *MemorySeriesStorage) LabelValuesForLabelName(_ context.Context, labelName model.LabelName) (model.LabelValues, error) {
	return s.persistence.labelValuesForLabelName(labelName)
}

// DropMetricsForLabelMatchers implements Storage.
func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(_ context.Context, matchers ...*metric.LabelMatcher) (int, error) {
	fps, err := s.fpsForLabelMatchers(model.Earliest, model.Latest, matchers...)
	if err != nil {
		return 0, err
	}
	for fp := range fps {
		s.purgeSeries(fp, nil, nil)
	}
	return len(fps), nil
}

var (
	// ErrOutOfOrderSample is returned if a sample has a timestamp before the latest
	// timestamp in the series it is appended to.
	ErrOutOfOrderSample = fmt.Errorf("sample timestamp out of order")
	// ErrDuplicateSampleForTimestamp is returned if a sample has the same
	// timestamp as the latest sample in the series it is appended to but a
	// different value. (Appending an identical sample is a no-op and does
	// not cause an error.)
	ErrDuplicateSampleForTimestamp = fmt.Errorf("sample with repeated timestamp but different value")
)
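
// A sketch of how an ingestion caller might distinguish the two append errors
// above (the surrounding loop is illustrative only):
//
//	for _, sample := range scrapedSamples {
//		switch err := s.Append(sample); err {
//		case nil:
//			// Ingested fine.
//		case ErrOutOfOrderSample, ErrDuplicateSampleForTimestamp:
//			// Caused by the caller; count and drop the sample, keep going.
//		default:
//			return err
//		}
//	}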

// Append implements Storage.
func (s *MemorySeriesStorage) Append(sample *model.Sample) error {
	for ln, lv := range sample.Metric {
		if len(lv) == 0 {
			delete(sample.Metric, ln)
		}
	}
	rawFP := sample.Metric.FastFingerprint()
	s.fpLocker.Lock(rawFP)
	fp := s.mapper.mapFP(rawFP, sample.Metric)
	defer func() {
		s.fpLocker.Unlock(fp)
	}() // Func wrapper because fp might change below.
	if fp != rawFP {
		// Switch locks.
		s.fpLocker.Unlock(rawFP)
		s.fpLocker.Lock(fp)
	}
	series, err := s.getOrCreateSeries(fp, sample.Metric)
	if err != nil {
		return err // getOrCreateSeries took care of quarantining already.
	}

	if sample.Timestamp == series.lastTime {
		// Don't report "no-op appends", i.e. where timestamp and sample
		// value are the same as for the last append, as they are a
		// common occurrence when using client-side timestamps
		// (e.g. Pushgateway or federation).
		if series.lastSampleValueSet &&
			sample.Value.Equal(series.lastSampleValue) {
			return nil
		}
		s.discardedSamples.WithLabelValues(duplicateSample).Inc()
		return ErrDuplicateSampleForTimestamp // Caused by the caller.
	}
	if sample.Timestamp < series.lastTime {
		s.discardedSamples.WithLabelValues(outOfOrderTimestamp).Inc()
		return ErrOutOfOrderSample // Caused by the caller.
	}
	headChunkWasClosed := series.headChunkClosed
	completedChunksCount, err := series.add(model.SamplePair{
		Value:     sample.Value,
		Timestamp: sample.Timestamp,
	})
	if err != nil {
		s.quarantineSeries(fp, sample.Metric, err)
		return err
	}
	if headChunkWasClosed {
		// Appending to a series with a closed head chunk creates an
		// additional open head chunk.
		s.headChunks.Inc()
	}
	s.ingestedSamples.Inc()
	s.incNumChunksToPersist(completedChunksCount)

	return nil
}

// NeedsThrottling implements Storage.
func (s *MemorySeriesStorage) NeedsThrottling() bool {
	if score, _ := s.getPersistenceUrgencyScore(); score >= 1 {
		select {
		case s.throttled <- struct{}{}:
		default: // Do nothing, signal already pending.
		}
		return true
	}
	return false
}

// logThrottling handles logging of throttled events and has to be started as a
// goroutine. It stops once s.loopStopping is closed.
//
// Logging strategy: Whenever NeedsThrottling() is called and returns true, a
// signal is sent to s.throttled. If that happens for the first time, an Error
// is logged that the storage is now throttled. As long as signals continue to
// be sent via s.throttled at least once per minute, nothing else is logged.
// Once no signal has arrived for a minute, an Info is logged that the storage
// is not throttled anymore. This resets things to the initial state, i.e. once
// a signal arrives again, the Error will be logged again.
func (s *MemorySeriesStorage) logThrottling() {
	timer := time.NewTimer(time.Minute)
	timer.Stop()

	// Signal exit of the goroutine. Currently only needed by test code.
	defer close(s.logThrottlingStopped)

	for {
		select {
		case <-s.throttled:
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
				score, _ := s.getPersistenceUrgencyScore()
				log.
					With("urgencyScore", score).
					With("chunksToPersist", s.getNumChunksToPersist()).
					With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)).
					Error("Storage needs throttling. Scrapes and rule evaluations will be skipped.")
			}
			timer.Reset(time.Minute)
		case <-timer.C:
			score, _ := s.getPersistenceUrgencyScore()
			log.
				With("urgencyScore", score).
				With("chunksToPersist", s.getNumChunksToPersist()).
				With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)).
				Info("Storage does not need throttling anymore.")
		case <-s.loopStopping:
			return
		}
	}
}
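
// A sketch of the intended call pattern for NeedsThrottling (the scrape-loop
// function is hypothetical; in Prometheus the check is made on the ingestion
// side before appending samples):
//
//	func scrapeOnce(s *MemorySeriesStorage, samples model.Samples) error {
//		if s.NeedsThrottling() {
//			return errors.New("sample ingestion throttled; skipping scrape")
//		}
//		for _, smp := range samples {
//			if err := s.Append(smp); err != nil {
//				return err
//			}
//		}
//		return nil
//	}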

func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) {
	series, ok := s.fpToSeries.get(fp)
	if !ok {
		var cds []*chunk.Desc
		var modTime time.Time
		unarchived, err := s.persistence.unarchiveMetric(fp)
		if err != nil {
			log.Errorf("Error unarchiving fingerprint %v (metric %v): %v", fp, m, err)
			return nil, err
		}
		if unarchived {
			s.seriesOps.WithLabelValues(unarchive).Inc()
			// We have to load chunk.Descs anyway to do anything with
			// the series, so let's do it right now so that we don't
			// end up with a series without any chunk.Descs for a
			// while (which is confusing as it makes the series
			// appear as archived or purged).
			cds, err = s.loadChunkDescs(fp, 0)
			if err == nil && len(cds) == 0 {
				err = fmt.Errorf("unarchived fingerprint %v (metric %v) has no chunks on disk", fp, m)
			}
			if err != nil {
				s.quarantineSeries(fp, m, err)
				return nil, err
			}
			modTime = s.persistence.seriesFileModTime(fp)
		} else {
			// This was a genuinely new series, so index the metric.
			s.persistence.indexMetric(fp, m)
			s.seriesOps.WithLabelValues(create).Inc()
		}
		series, err = newMemorySeries(m, cds, modTime)
		if err != nil {
			s.quarantineSeries(fp, m, err)
			return nil, err
		}
		s.fpToSeries.put(fp, series)
		s.memorySeries.Inc()
		if !series.headChunkClosed {
			s.headChunks.Inc()
		}
	}
	return series, nil
}

// seriesForRange is a helper method for seriesForLabelMatchers.
//
// The caller must have locked the fp.
func (s *MemorySeriesStorage) seriesForRange(
	fp model.Fingerprint,
	from model.Time, through model.Time,
) *memorySeries {
	metric, series, ok := s.metricForRange(fp, from, through)
	if !ok {
		return nil
	}
	if series == nil {
		series, _ = s.getOrCreateSeries(fp, metric)
		// getOrCreateSeries took care of quarantining already, so ignore the error.
	}
	return series
}

func (s *MemorySeriesStorage) preloadChunksForRange(
	pair fingerprintSeriesPair,
	from model.Time, through model.Time,
) SeriesIterator {
	fp, series := pair.fp, pair.series
	if series == nil {
		return nopIter
	}

	s.fpLocker.Lock(fp)
	defer s.fpLocker.Unlock(fp)

	iter, err := series.preloadChunksForRange(fp, from, through, s)
	if err != nil {
		s.quarantineSeries(fp, series.metric, err)
		return nopIter
	}
	return iter
}

func (s *MemorySeriesStorage) preloadChunksForInstant(
	pair fingerprintSeriesPair,
	from model.Time, through model.Time,
) SeriesIterator {
	fp, series := pair.fp, pair.series
	if series == nil {
		return nopIter
	}

	s.fpLocker.Lock(fp)
	defer s.fpLocker.Unlock(fp)

	iter, err := series.preloadChunksForInstant(fp, from, through, s)
	if err != nil {
		s.quarantineSeries(fp, series.metric, err)
		return nopIter
	}
	return iter
}

func (s *MemorySeriesStorage) handleEvictList() {
	// This ticker is supposed to tick at least once per GC cycle. Ideally,
	// we would handle the evict list after each finished GC cycle, but I
	// don't know of a way to "subscribe" to that kind of event.
	ticker := time.NewTicker(evictInterval)

	for {
		select {
		case req := <-s.evictRequests:
			if req.Evict {
				req.Desc.EvictListElement = s.evictList.PushBack(req.Desc)
			} else {
				if req.Desc.EvictListElement != nil {
					s.evictList.Remove(req.Desc.EvictListElement)
					req.Desc.EvictListElement = nil
				}
			}
		case <-ticker.C:
			s.maybeEvict()
		case <-s.evictStopping:
			// Drain evictRequests forever in a goroutine to not let
			// requesters hang.
			go func() {
				for {
					<-s.evictRequests
				}
			}()
			ticker.Stop()
			log.Info("Chunk eviction stopped.")
			close(s.evictStopped)
			return
		}
	}
}

// maybeEvict is a local helper method. Must only be called by handleEvictList.
func (s *MemorySeriesStorage) maybeEvict() {
	ms := runtime.MemStats{}
	runtime.ReadMemStats(&ms)
	numChunksToEvict := s.calculatePersistUrgency(&ms)

	if numChunksToEvict <= 0 {
		return
	}

	chunkDescsToEvict := make([]*chunk.Desc, numChunksToEvict)
	for i := range chunkDescsToEvict {
		e := s.evictList.Front()
		if e == nil {
			break
		}
		cd := e.Value.(*chunk.Desc)
		cd.EvictListElement = nil
		chunkDescsToEvict[i] = cd
		s.evictList.Remove(e)
	}
	// Do the actual eviction in a goroutine as we might otherwise deadlock,
	// in the following way: A chunk was Unpinned completely and therefore
	// scheduled for eviction. At the time we actually try to evict it,
	// another goroutine is pinning the chunk. The pinning goroutine has
	// currently locked the chunk and tries to send the evict request (to
	// remove the chunk from the evict list) to the evictRequests
	// channel. The send blocks because evictRequests is full. However, the
	// goroutine that is supposed to empty the channel is waiting for the
	// Chunk.Desc lock to try to evict the chunk.
	go func() {
		for _, cd := range chunkDescsToEvict {
			if cd == nil {
				break
			}
			cd.MaybeEvict()
			// We don't care if the eviction succeeds. If the chunk
			// was pinned in the meantime, it will be added to the
			// evict list once it gets Unpinned again.
		}
	}()
}

// calculatePersistUrgency calculates and sets s.persistUrgency. Based on the
// calculation, it returns the number of chunks to evict. The runtime.MemStats
// are passed in here for testability.
//
// The persist urgency is calculated by the following formula:
//
//	             n(toPersist)                  MAX( h(nextGC), h(current) )
//	p = MIN( 1, --------------------------- * ---------------------------- )
//	            n(toPersist) + n(evictable)             h(target)
//
// where:
//
//	n(toPersist): Number of chunks waiting for persistence.
//	n(evictable): Number of evictable chunks.
//	h(nextGC):    Heap size at which the next GC will kick in (ms.NextGC).
//	h(current):   Current heap size (ms.HeapAlloc).
//	h(target):    Configured target heap size.
//
// Note that the actual value stored in s.persistUrgency is 1000 times the
// value calculated as above to allow using an int32, which supports atomic
// operations.
//
// If no GC has run after the last call of this method, it will always return 0
// (no reason to try to evict any more chunks before we have seen the effect of
// the previous eviction). It will also not decrease the persist urgency in
// this case (but it will increase the persist urgency if a higher value was
// calculated).
//
// If a GC has run after the last call of this method, the following cases apply:
//
// - If MAX( h(nextGC), h(current) ) < h(target), simply return 0. Nothing to
// evict if the heap is still small enough.
//
// - Otherwise, if n(evictable) is 0, also return 0, but set the urgency score
// to 1 to signal that we want to evict chunks but have no evictable chunks
// available.
//
// - Otherwise, calculate the number of chunks to evict and return it:
//
//	                                MAX( h(nextGC), h(current) ) - h(target)
//	n(toEvict) = MIN( n(evictable), ---------------------------------------- )
//	                                                    c
//
// where c is the size of a chunk.
//
// - In the latter case, the persist urgency might be increased. The final
// value is the following:
//
//	        n(toEvict)
//	MAX( p, ------------ )
//	        n(evictable)
//
// Broadly speaking, the persist urgency is based on the ratio of the number of
// chunks we want to evict and the number of chunks that are actually
// evictable. However, in particular for the case where we don't need to evict
// chunks yet, it also takes into account how close the heap has already grown
// to the configured target size, and how big the pool of chunks to persist is
// compared to the number of chunks already evictable.
//
// This is a helper method only to be called by MemorySeriesStorage.maybeEvict.
func (s *MemorySeriesStorage) calculatePersistUrgency(ms *runtime.MemStats) int {
	var (
		oldUrgency         = atomic.LoadInt32(&s.persistUrgency)
		newUrgency         int32
		numChunksToPersist = s.getNumChunksToPersist()
	)
	defer func() {
		if newUrgency > 1000 {
			newUrgency = 1000
		}
		atomic.StoreInt32(&s.persistUrgency, newUrgency)
	}()

	// Take the NextGC as the relevant heap size because the heap will grow
	// to that size before GC kicks in. However, at times the current heap
	// is already larger than NextGC, in which case we take that worse case.
	heapSize := ms.NextGC
	if ms.HeapAlloc > ms.NextGC {
		heapSize = ms.HeapAlloc
	}

	if numChunksToPersist > 0 {
		newUrgency = int32(1000 * uint64(numChunksToPersist) / uint64(numChunksToPersist+s.evictList.Len()) * heapSize / s.targetHeapSize)
	}

	// Only continue if a GC has happened since we were here last time.
	if ms.NumGC == s.lastNumGC {
		if oldUrgency > newUrgency {
			// Never reduce urgency without a GC run.
			newUrgency = oldUrgency
		}
		return 0
	}
	s.lastNumGC = ms.NumGC

	if heapSize <= s.targetHeapSize {
		return 0 // Heap still small enough, don't evict.
	}
	if s.evictList.Len() == 0 {
		// We want to reduce heap size but there is nothing to evict.
		newUrgency = 1000
		return 0
	}
	numChunksToEvict := int((heapSize - s.targetHeapSize) / chunk.ChunkLen)
	if numChunksToEvict > s.evictList.Len() {
		numChunksToEvict = s.evictList.Len()
	}
	if u := int32(numChunksToEvict * 1000 / s.evictList.Len()); u > newUrgency {
		newUrgency = u
	}
	return numChunksToEvict
}
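
// A worked example of the formulas above (the operating numbers are made up;
// the chunk size c is chunk.ChunkLen, assumed here to be 1 KiB): take
// h(target) = 2 GiB, h(nextGC) = 2 GiB + 64 MiB > h(current),
// n(toPersist) = 30000, and n(evictable) = 90000. Then
//
//	p = MIN( 1, 30000/(30000+90000) * (2 GiB + 64 MiB)/(2 GiB) ) ≈ 0.26
//
// Since the heap exceeds the target by 64 MiB, up to 64 Ki chunks may be
// evicted:
//
//	n(toEvict) = MIN( 90000, 64 MiB / 1 KiB ) = 65536
//
// which lifts the final urgency to MAX( 0.26, 65536/90000 ) ≈ 0.73, i.e.
// s.persistUrgency ends up at 728 — not yet past the 0.8 rushed-mode
// threshold, but close.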

// waitForNextFP waits an estimated duration, after which we want to process
// another fingerprint so that we will process all fingerprints in a tenth of
// s.dropAfter, assuming that the system is doing nothing else, e.g. if we want
// to drop chunks after 40h, we want to cycle through all fingerprints within
// 4h. The estimation is based on the total number of fingerprints as passed
// in. However, the maximum sweep time is capped at fpMaxSweepTime. Also, the
// method will never wait for longer than fpMaxWaitDuration.
//
// The maxWaitDurationFactor can be used to reduce the waiting time if a faster
// processing is required (for example because unpersisted chunks pile up too
// much).
//
// Normally, the method returns true once the wait duration has passed. However,
// if s.loopStopping is closed, it will return false immediately.
func (s *MemorySeriesStorage) waitForNextFP(numberOfFPs int, maxWaitDurationFactor float64) bool {
	d := fpMaxWaitDuration
	if numberOfFPs != 0 {
		sweepTime := s.dropAfter / 10
		if sweepTime > fpMaxSweepTime {
			sweepTime = fpMaxSweepTime
		}
		calculatedWait := time.Duration(float64(sweepTime) / float64(numberOfFPs) * maxWaitDurationFactor)
		if calculatedWait < d {
			d = calculatedWait
		}
	}
	if d == 0 {
		return true
	}
	t := time.NewTimer(d)
	select {
	case <-t.C:
		return true
	case <-s.loopStopping:
		return false
	}
}
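
// For instance (a worked example with assumed values): with a retention of
// s.dropAfter = 40h, the target sweep time is 4h. With 100000 fingerprints in
// memory and maxWaitDurationFactor = 1, the wait per fingerprint is
// 4h/100000 = 144ms, well below the fpMaxWaitDuration cap of 10s. With only
// 100 fingerprints, 4h/100 = 2.4min would exceed the cap, so the method waits
// just 10s.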

// cycleThroughMemoryFingerprints returns a channel that emits fingerprints for
// series in memory in a throttled fashion. It continues to cycle through all
// fingerprints in memory until s.loopStopping is closed.
func (s *MemorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Fingerprint {
	memoryFingerprints := make(chan model.Fingerprint)
	go func() {
		defer close(memoryFingerprints)
		firstPass := true

		for {
			// Initial wait, also important if there are no FPs yet.
			if !s.waitForNextFP(s.fpToSeries.length(), 1) {
				return
			}
			begin := time.Now()
			fps := s.fpToSeries.sortedFPs()
			if firstPass && len(fps) > 0 {
				// Start first pass at a random location in the
				// key space to cover the whole key space even
				// in the case of frequent restarts.
				fps = fps[rand.Intn(len(fps)):]
			}
			count := 0
			for _, fp := range fps {
				select {
				case memoryFingerprints <- fp:
				case <-s.loopStopping:
					return
				}
				// Reduce the wait time according to the urgency score.
				score, rushed := s.getPersistenceUrgencyScore()
				if rushed {
					score = 1
				}
				s.waitForNextFP(s.fpToSeries.length(), 1-score)
				count++
			}
			if count > 0 {
				msg := "full"
				if firstPass {
					msg = "initial partial"
				}
				log.Infof(
					"Completed %s maintenance sweep through %d in-memory fingerprints in %v.",
					msg, count, time.Since(begin),
				)
			}
			firstPass = false
		}
	}()

	return memoryFingerprints
}

// cycleThroughArchivedFingerprints returns a channel that emits fingerprints
// for archived series in a throttled fashion. It continues to cycle through all
// archived fingerprints until s.loopStopping is closed.
func (s *MemorySeriesStorage) cycleThroughArchivedFingerprints() chan model.Fingerprint {
	archivedFingerprints := make(chan model.Fingerprint)
	go func() {
		defer close(archivedFingerprints)

		for {
			archivedFPs, err := s.persistence.fingerprintsModifiedBefore(
				model.Now().Add(-s.dropAfter),
			)
			if err != nil {
				log.Error("Failed to lookup archived fingerprint ranges: ", err)
				s.waitForNextFP(0, 1)
				continue
			}
			// Initial wait, also important if there are no FPs yet.
			if !s.waitForNextFP(len(archivedFPs), 1) {
				return
			}
			begin := time.Now()
			for _, fp := range archivedFPs {
				select {
				case archivedFingerprints <- fp:
				case <-s.loopStopping:
					return
				}
				// Never speed up maintenance of archived FPs.
				s.waitForNextFP(len(archivedFPs), 1)
			}
			if len(archivedFPs) > 0 {
				log.Infof(
					"Completed maintenance sweep through %d archived fingerprints in %v.",
					len(archivedFPs), time.Since(begin),
				)
			}
		}
	}()
	return archivedFingerprints
}

func (s *MemorySeriesStorage) loop() {
	checkpointTimer := time.NewTimer(s.checkpointInterval)
	checkpointMinTimer := time.NewTimer(0)

	var dirtySeriesCount int64

	defer func() {
		checkpointTimer.Stop()
		checkpointMinTimer.Stop()
		log.Info("Maintenance loop stopped.")
		close(s.loopStopped)
	}()

	memoryFingerprints := s.cycleThroughMemoryFingerprints()
	archivedFingerprints := s.cycleThroughArchivedFingerprints()

	checkpointCtx, checkpointCancel := context.WithCancel(context.Background())
	checkpointNow := make(chan struct{}, 1)

	doCheckpoint := func() time.Duration {
		start := time.Now()
		// We clear this before the checkpoint so that dirtySeriesCount
		// is an upper bound.
		atomic.StoreInt64(&dirtySeriesCount, 0)
		s.dirtySeries.Set(0)
		select {
		case <-checkpointNow:
			// Signal cleared.
		default:
			// No signal pending.
		}
		err := s.persistence.checkpointSeriesMapAndHeads(
			checkpointCtx, s.fpToSeries, s.fpLocker,
		)
		if err == context.Canceled {
			log.Info("Checkpoint canceled.")
		} else if err != nil {
			s.persistErrors.Inc()
			log.Errorln("Error while checkpointing:", err)
		}
		return time.Since(start)
	}

	// Checkpoints can happen concurrently with maintenance so even with heavy
	// checkpointing there will still be sufficient progress on maintenance.
	checkpointLoopStopped := make(chan struct{})
	go func() {
		for {
			select {
			case <-checkpointCtx.Done():
				checkpointLoopStopped <- struct{}{}
				return
			case <-checkpointMinTimer.C:
				var took time.Duration
				select {
				case <-checkpointCtx.Done():
					checkpointLoopStopped <- struct{}{}
					return
				case <-checkpointTimer.C:
					took = doCheckpoint()
				case <-checkpointNow:
					if !checkpointTimer.Stop() {
						<-checkpointTimer.C
					}
					took = doCheckpoint()
				}
				checkpointMinTimer.Reset(took)
				checkpointTimer.Reset(s.checkpointInterval)
			}
		}
	}()

loop:
	for {
		select {
		case <-s.loopStopping:
			checkpointCancel()
			break loop
		case fp := <-memoryFingerprints:
			if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
				dirty := atomic.AddInt64(&dirtySeriesCount, 1)
				s.dirtySeries.Set(float64(dirty))
				// Check if we have enough "dirty" series so that we need an early checkpoint.
				// However, if we are already behind persisting chunks, creating a checkpoint
				// would be counterproductive, as it would slow down chunk persisting even more,
				// while in a situation like that, where we are clearly lacking speed of disk
				// maintenance, the best we can do for crash recovery is to persist chunks as
				// quickly as possible. So only checkpoint if we are not in rushed mode.
				if _, rushed := s.getPersistenceUrgencyScore(); !rushed &&
					dirty >= int64(s.checkpointDirtySeriesLimit) {
					select {
					case checkpointNow <- struct{}{}:
						// Signal sent.
					default:
						// Signal already pending.
					}
				}
			}
		case fp := <-archivedFingerprints:
			s.maintainArchivedSeries(fp, model.Now().Add(-s.dropAfter))
		}
	}
	// Wait until both channels are closed.
	for range memoryFingerprints {
	}
	for range archivedFingerprints {
	}
	<-checkpointLoopStopped
}
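
// The effect of resetting checkpointMinTimer to the duration of the last
// checkpoint (in the goroutine above) is that checkpointing can consume at
// most half of the storage's time. A worked example with assumed values: if
// s.checkpointInterval is 5m but a checkpoint took 10m, the next checkpoint
// cannot start for another 10m, even if a checkpointNow signal arrives in the
// meantime because many dirty series have accumulated.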

// maintainMemorySeries maintains a series that is in memory (i.e. not
// archived). It returns true if the series has changed from clean to dirty
// (i.e. it is now inconsistent with the latest checkpoint so that, in case of
// a crash, a recovery operation requiring a disk seek would need to be
// applied).
//
// The method first closes the head chunk if it was not touched for the duration
// of headChunkTimeout.
//
// Then it determines the chunks that need to be purged and the chunks that need
// to be persisted. Depending on the result, it does the following:
//
// - If all chunks of a series need to be purged, the whole series is deleted
// for good and the method returns false. (Detecting non-existence of a series
// file does not require a disk seek.)
//
// - If any chunks need to be purged (but not all of them), it purges those
// chunks from memory and rewrites the series file on disk, leaving out the
// purged chunks and appending all chunks not yet persisted (with the exception
// of a still open head chunk).
//
// - If no chunks on disk need to be purged, but chunks need to be persisted,
// those chunks are simply appended to the existing series file (or the file is
// created if it does not exist yet).
//
// - If no chunks need to be purged and no chunks need to be persisted, nothing
// happens in this step.
//
// Next, the method checks if all chunks in the series are evicted. In that
// case, it archives the series and returns false (the archived state is
// persisted right away, so the series is not dirty).
//
// Finally, it evicts chunk.Descs if there are too many.
func (s *MemorySeriesStorage) maintainMemorySeries(
	fp model.Fingerprint, beforeTime model.Time,
) (becameDirty bool) {
	defer func(begin time.Time) {
		s.maintainSeriesDuration.WithLabelValues(maintainInMemory).Observe(
			time.Since(begin).Seconds(),
		)
	}(time.Now())

	s.fpLocker.Lock(fp)
	defer s.fpLocker.Unlock(fp)

	series, ok := s.fpToSeries.get(fp)
	if !ok {
		// Series is actually not in memory, perhaps archived or dropped in the meantime.
		return false
	}

	defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()

	closed, err := series.maybeCloseHeadChunk(s.headChunkTimeout)
	if err != nil {
		s.quarantineSeries(fp, series.metric, err)
		s.persistErrors.Inc()
	}
	if closed {
		s.incNumChunksToPersist(1)
		s.headChunks.Dec()
	}

	seriesWasDirty := series.dirty

	if s.writeMemorySeries(fp, series, beforeTime) {
		// Series is gone now, we are done.
		return false
	}

	iOldestNotEvicted := -1
	for i, cd := range series.chunkDescs {
		if !cd.IsEvicted() {
			iOldestNotEvicted = i
			break
		}
	}

	// Archive if all chunks are evicted. Also make sure the last sample has
	// an age of at least headChunkTimeout (which is very likely anyway).
	if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > s.headChunkTimeout {
		s.fpToSeries.del(fp)
		s.memorySeries.Dec()
		s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime)
		s.seriesOps.WithLabelValues(archive).Inc()
		oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark))
		if oldWatermark < int64(series.lastTime) {
			if !atomic.CompareAndSwapInt64(
				(*int64)(&s.archiveHighWatermark),
				oldWatermark, int64(series.lastTime),
			) {
				panic("s.archiveHighWatermark modified outside of maintainMemorySeries")
			}
		}
		return
	}
	// If we are here, the series is not archived, so check for chunk.Desc
	// eviction next.
	series.evictChunkDescs(iOldestNotEvicted)

	return series.dirty && !seriesWasDirty
}

// writeMemorySeries (re-)writes a memory series file. While doing so, it drops
// chunks older than beforeTime from both the series file (if it exists) as well
// as from memory. The provided chunksToPersist are appended to the newly
// written series file. If no chunks need to be purged, but chunksToPersist is
// not empty, those chunks are simply appended to the series file. If the series
// contains no chunks after dropping old chunks, it is purged entirely. In that
// case, the method returns true.
//
// If a persist error is encountered, the series is queued for quarantine. In
// that case, the method returns true, too, because the series should not be
// processed anymore (even if it will only be gone for real once quarantining
// has been completed).
//
// The caller must have locked the fp.
func (s *MemorySeriesStorage) writeMemorySeries(
	fp model.Fingerprint, series *memorySeries, beforeTime model.Time,
) bool {
	var (
		persistErr error
		cds        = series.chunksToPersist()
	)

	defer func() {
		if persistErr != nil {
			s.quarantineSeries(fp, series.metric, persistErr)
			s.persistErrors.Inc()
		}
		// The following is done even in case of an error to ensure
		// correct counter bookkeeping and to not pin chunks in memory
		// that belong to a series that is scheduled for quarantine
		// anyway.
		for _, cd := range cds {
			cd.Unpin(s.evictRequests)
		}
		s.incNumChunksToPersist(-len(cds))
		chunk.Ops.WithLabelValues(chunk.PersistAndUnpin).Add(float64(len(cds)))
		series.modTime = s.persistence.seriesFileModTime(fp)
	}()

	// Get the actual chunks from underneath the chunk.Descs.
	// No lock required as chunks still to persist cannot be evicted.
	chunks := make([]chunk.Chunk, len(cds))
	for i, cd := range cds {
		chunks[i] = cd.C
	}

	if !series.firstTime().Before(beforeTime) {
		// Oldest sample not old enough, just append chunks, if any.
		if len(cds) == 0 {
			return false
		}
		var offset int
		offset, persistErr = s.persistence.persistChunks(fp, chunks)
		if persistErr != nil {
			return true
		}
		if series.chunkDescsOffset == -1 {
			// This is the first chunk persisted for a newly created
			// series that had prior chunks on disk. Finally, we can
			// set the chunkDescsOffset.
			series.chunkDescsOffset = offset
		}
		return false
	}

	newFirstTime, offset, numDroppedFromPersistence, allDroppedFromPersistence, persistErr :=
		s.persistence.dropAndPersistChunks(fp, beforeTime, chunks)
	if persistErr != nil {
		return true
	}
	if persistErr = series.dropChunks(beforeTime); persistErr != nil {
		return true
	}
	if len(series.chunkDescs) == 0 && allDroppedFromPersistence {
		// All chunks dropped from both memory and persistence. Delete the series for good.
		s.fpToSeries.del(fp)
		s.memorySeries.Dec()
		s.seriesOps.WithLabelValues(memoryPurge).Inc()
		s.persistence.unindexMetric(fp, series.metric)
		return true
	}
	series.savedFirstTime = newFirstTime
	if series.chunkDescsOffset == -1 {
		series.chunkDescsOffset = offset
	} else {
		series.chunkDescsOffset -= numDroppedFromPersistence
		if series.chunkDescsOffset < 0 {
			persistErr = errors.New("dropped more chunks from persistence than from memory")
			series.chunkDescsOffset = 0
			return true
		}
	}
	return false
}
// maintainArchivedSeries drops chunks older than beforeTime from an archived
// series. If the series contains no chunks after that, it is purged entirely.
func (s *MemorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, beforeTime model.Time) {
	defer func(begin time.Time) {
		s.maintainSeriesDuration.WithLabelValues(maintainArchived).Observe(
			time.Since(begin).Seconds(),
		)
	}(time.Now())

	s.fpLocker.Lock(fp)
	defer s.fpLocker.Unlock(fp)

	has, firstTime, lastTime := s.persistence.hasArchivedMetric(fp)
	if !has || !firstTime.Before(beforeTime) {
		// Oldest sample not old enough, or metric purged or unarchived in the meantime.
		return
	}

	defer s.seriesOps.WithLabelValues(archiveMaintenance).Inc()

	newFirstTime, _, _, allDropped, err := s.persistence.dropAndPersistChunks(fp, beforeTime, nil)
	if err != nil {
		// TODO(beorn7): Should quarantine the series.
		s.persistErrors.Inc()
		log.Error("Error dropping persisted chunks: ", err)
	}
	if allDropped {
		if err := s.persistence.purgeArchivedMetric(fp); err != nil {
			s.persistErrors.Inc()
			// purgeArchivedMetric logs the error already.
		}
		s.seriesOps.WithLabelValues(archivePurge).Inc()
		return
	}
	if err := s.persistence.updateArchivedTimeRange(fp, newFirstTime, lastTime); err != nil {
		s.persistErrors.Inc()
		log.Errorf("Error updating archived time range for fingerprint %v: %s", fp, err)
	}
}

// See persistence.loadChunks for a detailed explanation.
func (s *MemorySeriesStorage) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk.Chunk, error) {
	return s.persistence.loadChunks(fp, indexes, indexOffset)
}

// See persistence.loadChunkDescs for a detailed explanation.
func (s *MemorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunk.Desc, error) {
	return s.persistence.loadChunkDescs(fp, offsetFromEnd)
}

// getNumChunksToPersist returns numChunksToPersist in a goroutine-safe way.
func (s *MemorySeriesStorage) getNumChunksToPersist() int {
	return int(atomic.LoadInt64(&s.numChunksToPersist))
}

// incNumChunksToPersist increments numChunksToPersist in a goroutine-safe way.
// Use a negative 'by' to decrement.
func (s *MemorySeriesStorage) incNumChunksToPersist(by int) {
	atomic.AddInt64(&s.numChunksToPersist, int64(by))
	if by > 0 {
		s.queuedChunksToPersist.Add(float64(by))
	}
}

// getPersistenceUrgencyScore returns an urgency score for the speed of
// persisting chunks. The score is between 0 and 1, where 0 means no urgency at
// all and 1 means highest urgency. It also returns whether the storage is in
// "rushed mode".
//
// The storage enters "rushed mode" if the score exceeds
// persintenceUrgencyScoreForEnteringRushedMode at the time this method is
// called. It will leave "rushed mode" if, at a later time this method is
// called, the score is below persintenceUrgencyScoreForLeavingRushedMode.
// "Rushed mode" plays a role for the adaptive series-sync-strategy. It also
// switches off early checkpointing (due to dirty series), and it makes series
// maintenance happen as quickly as possible.
//
// A score of 1 will trigger throttling of sample ingestion.
//
// It is safe to call this method concurrently.
func (s *MemorySeriesStorage) getPersistenceUrgencyScore() (float64, bool) {
	s.rushedMtx.Lock()
	defer s.rushedMtx.Unlock()

	score := float64(atomic.LoadInt32(&s.persistUrgency)) / 1000
	if score > 1 {
		score = 1
	}

	if s.rushed {
		// We are already in rushed mode. If the score is still above
		// persintenceUrgencyScoreForLeavingRushedMode, return the score
		// and leave things as they are.
		if score > persintenceUrgencyScoreForLeavingRushedMode {
			return score, true
		}
		// We are out of rushed mode!
		s.rushed = false
		log.
			With("urgencyScore", score).
			With("chunksToPersist", s.getNumChunksToPersist()).
			With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)).
			Info("Storage has left rushed mode.")
		return score, false
	}
	if score > persintenceUrgencyScoreForEnteringRushedMode {
		// Enter rushed mode.
		s.rushed = true
		log.
			With("urgencyScore", score).
			With("chunksToPersist", s.getNumChunksToPersist()).
			With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)).
			Warn("Storage has entered rushed mode.")
	}
	return score, s.rushed
}
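// The two thresholds used above form a classic hysteresis band: the score
// must exceed persintenceUrgencyScoreForEnteringRushedMode to enter rushed
// mode but must drop to or below persintenceUrgencyScoreForLeavingRushedMode
// to leave it, so a score oscillating around a single threshold cannot flap
// the mode. A minimal, self-contained sketch of the pattern (hypothetical
// names, illustration only):
//
//	type hysteresis struct {
//		active       bool
//		enter, leave float64 // enter > leave
//	}
//
//	func (h *hysteresis) update(score float64) bool {
//		switch {
//		case !h.active && score > h.enter:
//			h.active = true
//		case h.active && score <= h.leave:
//			h.active = false
//		}
//		return h.active
//	}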
// quarantineSeries registers the provided fingerprint for quarantining. It
// always returns immediately. Quarantine requests are processed
// asynchronously. If there are too many requests queued, they are simply
// dropped.
//
// Quarantining means that the series file is moved to the orphaned directory,
// and all its traces are removed from indices. Call this method if an
// unrecoverable error is detected while dealing with a series, and pass in the
// encountered error. It will be saved as a hint in the orphaned directory.
func (s *MemorySeriesStorage) quarantineSeries(fp model.Fingerprint, metric model.Metric, err error) {
	req := quarantineRequest{fp: fp, metric: metric, reason: err}
	select {
	case s.quarantineRequests <- req:
		// Request submitted.
	default:
		log.
			With("fingerprint", fp).
			With("metric", metric).
			With("reason", err).
			Warn("Quarantine queue full. Dropped quarantine request.")
		s.seriesOps.WithLabelValues(droppedQuarantine).Inc()
	}
}

func (s *MemorySeriesStorage) handleQuarantine() {
	for {
		select {
		case req := <-s.quarantineRequests:
			s.purgeSeries(req.fp, req.metric, req.reason)
			log.
				With("fingerprint", req.fp).
				With("metric", req.metric).
				With("reason", req.reason).
				Warn("Series quarantined.")
		case <-s.quarantineStopping:
			log.Info("Series quarantining stopped.")
			close(s.quarantineStopped)
			return
		}
	}
}
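// quarantineSeries and handleQuarantine above together form a bounded,
// non-blocking work queue: a select with a default case drops work when the
// buffered channel is full, and a dedicated stop channel shuts the consumer
// down. A minimal, self-contained sketch of the pattern (hypothetical names,
// illustration only):
//
//	func startQueue() (submit func(string), stop func()) {
//		requests := make(chan string, 1024) // bounded queue
//		stopping := make(chan struct{})
//		go func() {
//			for {
//				select {
//				case req := <-requests:
//					_ = req // process the request here
//				case <-stopping:
//					return
//				}
//			}
//		}()
//		submit = func(req string) {
//			select {
//			case requests <- req: // enqueued
//			default: // queue full, request dropped
//			}
//		}
//		return submit, func() { close(stopping) }
//	}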
// purgeSeries removes all traces of a series. If a non-nil quarantine reason is
// provided, the series file will not be deleted completely, but moved to the
// orphaned directory with the reason and the metric in a hint file. The
// provided metric might be nil if unknown.
func (s *MemorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, quarantineReason error) {
	s.fpLocker.Lock(fp)

	var (
		series *memorySeries
		ok     bool
	)

	if series, ok = s.fpToSeries.get(fp); ok {
		s.fpToSeries.del(fp)
		s.memorySeries.Dec()
		m = series.metric

		// Adjust s.numChunksToPersist and chunk.NumMemChunks down by
		// the number of chunks in this series that are not
		// persisted yet. Persisted chunks will be deducted from
		// chunk.NumMemChunks upon eviction.
		numChunksNotYetPersisted := len(series.chunkDescs) - series.persistWatermark
		atomic.AddInt64(&chunk.NumMemChunks, int64(-numChunksNotYetPersisted))
		if !series.headChunkClosed {
			// Head chunk wasn't counted as waiting for persistence yet.
			// (But it was counted as a chunk in memory.)
			numChunksNotYetPersisted--
		}
		s.incNumChunksToPersist(-numChunksNotYetPersisted)
	} else {
		s.persistence.purgeArchivedMetric(fp) // Ignoring error. There is nothing we can do.
	}
	if m != nil {
		// If we know a metric now, unindex it in any case.
		// purgeArchivedMetric might have done so already, but we cannot
		// be sure. Unindexing is idempotent, though.
		s.persistence.unindexMetric(fp, m)
	}
	// Attempt to delete/quarantine the series file in any case.
	if quarantineReason == nil {
		// No reason stated, simply delete the file.
		if _, err := s.persistence.deleteSeriesFile(fp); err != nil {
			log.
				With("fingerprint", fp).
				With("metric", m).
				With("error", err).
				Error("Error deleting series file.")
		}
		s.seriesOps.WithLabelValues(requestedPurge).Inc()
	} else {
		if err := s.persistence.quarantineSeriesFile(fp, quarantineReason, m); err == nil {
			s.seriesOps.WithLabelValues(completedQurantine).Inc()
		} else {
			s.seriesOps.WithLabelValues(failedQuarantine).Inc()
			log.
				With("fingerprint", fp).
				With("metric", m).
				With("reason", quarantineReason).
				With("error", err).
				Error("Error quarantining series file.")
		}
	}

	s.fpLocker.Unlock(fp)
}

// Describe implements prometheus.Collector.
func (s *MemorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
	s.persistence.Describe(ch)
	s.mapper.Describe(ch)

	ch <- s.persistErrors.Desc()
	ch <- s.queuedChunksToPersist.Desc()
	ch <- s.chunksToPersist.Desc()
	ch <- s.memorySeries.Desc()
	ch <- s.headChunks.Desc()
	ch <- s.dirtySeries.Desc()
	s.seriesOps.Describe(ch)
	ch <- s.ingestedSamples.Desc()
	s.discardedSamples.Describe(ch)
	ch <- s.nonExistentSeriesMatches.Desc()
	ch <- s.memChunks.Desc()
	s.maintainSeriesDuration.Describe(ch)
	ch <- s.persistenceUrgencyScore.Desc()
	ch <- s.rushedMode.Desc()
	ch <- s.targetHeapSizeBytes.Desc()
}

// Collect implements prometheus.Collector.
func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
	s.persistence.Collect(ch)
	s.mapper.Collect(ch)

	ch <- s.persistErrors
	ch <- s.queuedChunksToPersist
	ch <- s.chunksToPersist
	ch <- s.memorySeries
	ch <- s.headChunks
	ch <- s.dirtySeries
	s.seriesOps.Collect(ch)
	ch <- s.ingestedSamples
	s.discardedSamples.Collect(ch)
	ch <- s.nonExistentSeriesMatches
	ch <- s.memChunks
	s.maintainSeriesDuration.Collect(ch)
	ch <- s.persistenceUrgencyScore
	ch <- s.rushedMode
	ch <- s.targetHeapSizeBytes
}
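// Describe and Collect above make MemorySeriesStorage a prometheus.Collector,
// so its internal metrics are exposed by registering the storage itself with
// a client_golang registry. A minimal usage sketch (assuming s is an already
// constructed and started *MemorySeriesStorage):
//
//	prometheus.MustRegister(s)
//
// MustRegister panics on descriptor collisions, which is appropriate here:
// registering the same storage twice would be a programming error.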