// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"fmt"
	"io"
	"math"
	"path/filepath"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/atomic"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/pkg/exemplar"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
	"github.com/prometheus/prometheus/tsdb/index"
	"github.com/prometheus/prometheus/tsdb/record"
	"github.com/prometheus/prometheus/tsdb/tombstones"
	"github.com/prometheus/prometheus/tsdb/tsdbutil"
	"github.com/prometheus/prometheus/tsdb/wal"
)

var (
	// ErrInvalidSample is returned if an appended sample is not valid and can't
	// be ingested.
	ErrInvalidSample = errors.New("invalid sample")
	// ErrInvalidExemplar is returned if an appended exemplar is not valid and can't
	// be ingested.
	ErrInvalidExemplar = errors.New("invalid exemplar")
	// ErrAppenderClosed is returned if an appender has already been successfully
	// rolled back or committed.
	ErrAppenderClosed = errors.New("appender closed")
)

// Head handles reads and writes of time series data within a time window.
type Head struct {
	chunkRange               atomic.Int64
	numSeries                atomic.Uint64
	minTime, maxTime         atomic.Int64 // Current min and max of the samples included in the head.
	minValidTime             atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
	lastWALTruncationTime    atomic.Int64
	lastMemoryTruncationTime atomic.Int64
	lastSeriesID             atomic.Uint64

	metrics         *headMetrics
	opts            *HeadOptions
	wal             *wal.WAL
	exemplarMetrics *ExemplarMetrics
	exemplars       ExemplarStorage
	logger          log.Logger
	appendPool      sync.Pool
	exemplarsPool   sync.Pool
	seriesPool      sync.Pool
	bytesPool       sync.Pool
	memChunkPool    sync.Pool

	// All series addressable by their ID or hash.
	series *stripeSeries

	deletedMtx sync.Mutex
	deleted    map[uint64]int // Deleted series, and what WAL segment they must be kept until.

	postings *index.MemPostings // Postings lists for terms.

	tombstones *tombstones.MemTombstones

	iso *isolation

	cardinalityMutex      sync.Mutex
	cardinalityCache      *index.PostingsStats // Posting stats cache which will expire after 30sec.
	lastPostingsStatsCall time.Duration        // Last posting stats call (PostingsCardinalityStats()) time for caching.

	// chunkDiskMapper is used to write and read Head chunks to/from disk.
	chunkDiskMapper *chunks.ChunkDiskMapper

	chunkSnapshotMtx sync.Mutex

	closedMtx sync.Mutex
	closed    bool

	stats *HeadStats
	reg   prometheus.Registerer

	memTruncationInProcess atomic.Bool
}

type ExemplarStorage interface {
	storage.ExemplarQueryable
	AddExemplar(labels.Labels, exemplar.Exemplar) error
	ValidateExemplar(labels.Labels, exemplar.Exemplar) error
	IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error
}

// HeadOptions are parameters for the Head block.
type HeadOptions struct {
	// Runtime reloadable option. At the top of the struct for 32 bit OS:
	// https://pkg.go.dev/sync/atomic#pkg-note-BUG
	MaxExemplars atomic.Int64

	ChunkRange int64
	// ChunkDirRoot is the parent directory of the chunks directory.
	ChunkDirRoot         string
	ChunkPool            chunkenc.Pool
	ChunkWriteBufferSize int
	// StripeSize sets the number of entries in the hash map, it must be a power of 2.
	// A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
	// A smaller StripeSize reduces the memory allocated, but can decrease performance with a large number of series.
	StripeSize                     int
	SeriesCallback                 SeriesLifecycleCallback
	EnableExemplarStorage          bool
	EnableMemorySnapshotOnShutdown bool
}

func DefaultHeadOptions() *HeadOptions {
	return &HeadOptions{
		ChunkRange:           DefaultBlockDuration,
		ChunkDirRoot:         "",
		ChunkPool:            chunkenc.NewPool(),
		ChunkWriteBufferSize: chunks.DefaultWriteBufferSize,
		StripeSize:           DefaultStripeSize,
		SeriesCallback:       &noopSeriesLifecycleCallback{},
	}
}

// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series.
// It is always a no-op in Prometheus and mainly meant for external users who import TSDB.
// All the callbacks should be safe to be called concurrently.
// It is up to the user to implement soft or hard consistency by making the callbacks
// atomic or non-atomic. Atomic callbacks can cause performance degradation.
type SeriesLifecycleCallback interface {
	// PreCreation is called before creating a series to indicate if the series can be created.
	// A non-nil error means the series should not be created.
	PreCreation(labels.Labels) error
	// PostCreation is called after the series is created.
	PostCreation(labels.Labels)
	// PostDeletion is called after deletion of series.
	PostDeletion(...labels.Labels)
}
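
// A minimal construction sketch (illustrative only, not part of the package
// API): the registerer, logger, and WAL may all be nil, and Init must be
// called before appending. The directory below is hypothetical.
//
//	opts := DefaultHeadOptions()
//	opts.ChunkDirRoot = dir
//	h, err := NewHead(prometheus.NewRegistry(), log.NewNopLogger(), nil, opts, NewHeadStats())
//	if err != nil {
//		return err
//	}
//	defer h.Close()
//	if err := h.Init(math.MinInt64); err != nil {
//		return err
//	}
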
// NewHead opens the head block in dir.
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOptions, stats *HeadStats) (*Head, error) {
	var err error
	if l == nil {
		l = log.NewNopLogger()
	}
	if opts.ChunkRange < 1 {
		return nil, errors.Errorf("invalid chunk range %d", opts.ChunkRange)
	}
	if opts.SeriesCallback == nil {
		opts.SeriesCallback = &noopSeriesLifecycleCallback{}
	}

	if stats == nil {
		stats = NewHeadStats()
	}

	if !opts.EnableExemplarStorage {
		opts.MaxExemplars.Store(0)
	}

	h := &Head{
		wal:    wal,
		logger: l,
		opts:   opts,
		memChunkPool: sync.Pool{
			New: func() interface{} {
				return &memChunk{}
			},
		},
		stats: stats,
		reg:   r,
	}
	if err := h.resetInMemoryState(); err != nil {
		return nil, err
	}
	h.metrics = newHeadMetrics(h, r)

	if opts.ChunkPool == nil {
		opts.ChunkPool = chunkenc.NewPool()
	}

	h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(
		mmappedChunksDir(opts.ChunkDirRoot),
		opts.ChunkPool,
		opts.ChunkWriteBufferSize,
	)
	if err != nil {
		return nil, err
	}

	return h, nil
}

func (h *Head) resetInMemoryState() error {
	var err error
	var em *ExemplarMetrics
	if h.exemplars != nil {
		ce, ok := h.exemplars.(*CircularExemplarStorage)
		if ok {
			em = ce.metrics
		}
	}
	if em == nil {
		em = NewExemplarMetrics(h.reg)
	}
	es, err := NewCircularExemplarStorage(h.opts.MaxExemplars.Load(), em)
	if err != nil {
		return err
	}

	h.exemplarMetrics = em
	h.exemplars = es
	h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback)
	h.postings = index.NewUnorderedMemPostings()
	h.tombstones = tombstones.NewMemTombstones()
	h.iso = newIsolation()
	h.deleted = map[uint64]int{}
	h.chunkRange.Store(h.opts.ChunkRange)
	h.minTime.Store(math.MaxInt64)
	h.maxTime.Store(math.MinInt64)
	h.lastWALTruncationTime.Store(math.MinInt64)
	h.lastMemoryTruncationTime.Store(math.MinInt64)
	return nil
}

type headMetrics struct {
	activeAppenders          prometheus.Gauge
	series                   prometheus.GaugeFunc
	seriesCreated            prometheus.Counter
	seriesRemoved            prometheus.Counter
	seriesNotFound           prometheus.Counter
	chunks                   prometheus.Gauge
	chunksCreated            prometheus.Counter
	chunksRemoved            prometheus.Counter
	gcDuration               prometheus.Summary
	samplesAppended          prometheus.Counter
	outOfBoundSamples        prometheus.Counter
	outOfOrderSamples        prometheus.Counter
	walTruncateDuration      prometheus.Summary
	walCorruptionsTotal      prometheus.Counter
	walTotalReplayDuration   prometheus.Gauge
	headTruncateFail         prometheus.Counter
	headTruncateTotal        prometheus.Counter
	checkpointDeleteFail     prometheus.Counter
	checkpointDeleteTotal    prometheus.Counter
	checkpointCreationFail   prometheus.Counter
	checkpointCreationTotal  prometheus.Counter
	mmapChunkCorruptionTotal prometheus.Counter
	snapshotReplayErrorTotal prometheus.Counter // Will be either 0 or 1.
}

func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
	m := &headMetrics{
		activeAppenders: prometheus.NewGauge(prometheus.GaugeOpts{
			Name: "prometheus_tsdb_head_active_appenders",
			Help: "Number of currently active appender transactions",
		}),
		series: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
			Name: "prometheus_tsdb_head_series",
			Help: "Total number of series in the head block.",
		}, func() float64 {
			return float64(h.NumSeries())
		}),
		seriesCreated: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_head_series_created_total",
			Help: "Total number of series created in the head",
		}),
		seriesRemoved: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_head_series_removed_total",
			Help: "Total number of series removed in the head",
		}),
		seriesNotFound: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_head_series_not_found_total",
			Help: "Total number of requests for series that were not found.",
		}),
		chunks: prometheus.NewGauge(prometheus.GaugeOpts{
			Name: "prometheus_tsdb_head_chunks",
			Help: "Total number of chunks in the head block.",
		}),
		chunksCreated: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_head_chunks_created_total",
			Help: "Total number of chunks created in the head",
		}),
		chunksRemoved: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_head_chunks_removed_total",
			Help: "Total number of chunks removed in the head",
		}),
		gcDuration: prometheus.NewSummary(prometheus.SummaryOpts{
			Name: "prometheus_tsdb_head_gc_duration_seconds",
			Help: "Runtime of garbage collection in the head block.",
		}),
		walTruncateDuration: prometheus.NewSummary(prometheus.SummaryOpts{
			Name: "prometheus_tsdb_wal_truncate_duration_seconds",
			Help: "Duration of WAL truncation.",
		}),
		walCorruptionsTotal: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_wal_corruptions_total",
			Help: "Total number of WAL corruptions.",
		}),
		walTotalReplayDuration: prometheus.NewGauge(prometheus.GaugeOpts{
			Name: "prometheus_tsdb_data_replay_duration_seconds",
			Help: "Time taken to replay the data on disk.",
		}),
		samplesAppended: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_head_samples_appended_total",
			Help: "Total number of appended samples.",
		}),
		outOfBoundSamples: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_out_of_bound_samples_total",
			Help: "Total number of out of bound samples ingestion failed attempts.",
		}),
		outOfOrderSamples: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_out_of_order_samples_total",
			Help: "Total number of out of order samples ingestion failed attempts.",
		}),
		headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_head_truncations_failed_total",
			Help: "Total number of head truncations that failed.",
		}),
		headTruncateTotal: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_head_truncations_total",
			Help: "Total number of head truncations attempted.",
		}),
		checkpointDeleteFail: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_checkpoint_deletions_failed_total",
			Help: "Total number of checkpoint deletions that failed.",
		}),
		checkpointDeleteTotal: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_checkpoint_deletions_total",
			Help: "Total number of checkpoint deletions attempted.",
		}),
		checkpointCreationFail: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_checkpoint_creations_failed_total",
			Help: "Total number of checkpoint creations that failed.",
		}),
		checkpointCreationTotal: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_checkpoint_creations_total",
			Help: "Total number of checkpoint creations attempted.",
		}),
		mmapChunkCorruptionTotal: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_mmap_chunk_corruptions_total",
			Help: "Total number of memory-mapped chunk corruptions.",
		}),
		snapshotReplayErrorTotal: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "prometheus_tsdb_snapshot_replay_error_total",
			Help: "Total number of snapshot replays that failed.",
		}),
	}

	if r != nil {
		r.MustRegister(
			m.activeAppenders,
			m.series,
			m.chunks,
			m.chunksCreated,
			m.chunksRemoved,
			m.seriesCreated,
			m.seriesRemoved,
			m.seriesNotFound,
			m.gcDuration,
			m.walTruncateDuration,
			m.walCorruptionsTotal,
			m.walTotalReplayDuration,
			m.samplesAppended,
			m.outOfBoundSamples,
			m.outOfOrderSamples,
			m.headTruncateFail,
			m.headTruncateTotal,
			m.checkpointDeleteFail,
			m.checkpointDeleteTotal,
			m.checkpointCreationFail,
			m.checkpointCreationTotal,
			m.mmapChunkCorruptionTotal,
			m.snapshotReplayErrorTotal,
			// Metrics bound to functions and not needed in tests
			// can be created and registered on the spot.
			prometheus.NewGaugeFunc(prometheus.GaugeOpts{
				Name: "prometheus_tsdb_head_max_time",
				Help: "Maximum timestamp of the head block. The unit is decided by the library consumer.",
			}, func() float64 {
				return float64(h.MaxTime())
			}),
			prometheus.NewGaugeFunc(prometheus.GaugeOpts{
				Name: "prometheus_tsdb_head_min_time",
				Help: "Minimum time bound of the head block. The unit is decided by the library consumer.",
			}, func() float64 {
				return float64(h.MinTime())
			}),
			prometheus.NewGaugeFunc(prometheus.GaugeOpts{
				Name: "prometheus_tsdb_isolation_low_watermark",
				Help: "The lowest TSDB append ID that is still referenced.",
			}, func() float64 {
				return float64(h.iso.lowWatermark())
			}),
			prometheus.NewGaugeFunc(prometheus.GaugeOpts{
				Name: "prometheus_tsdb_isolation_high_watermark",
				Help: "The highest TSDB append ID that has been given out.",
			}, func() float64 {
				return float64(h.iso.lastAppendID())
			}),
		)
	}
	return m
}

func mmappedChunksDir(dir string) string { return filepath.Join(dir, "chunks_head") }

// HeadStats are the statistics for the head component of the DB.
type HeadStats struct {
	WALReplayStatus *WALReplayStatus
}

// NewHeadStats returns a new HeadStats object.
func NewHeadStats() *HeadStats {
	return &HeadStats{
		WALReplayStatus: &WALReplayStatus{},
	}
}

// WALReplayStatus contains status information about the WAL replay.
type WALReplayStatus struct {
	sync.RWMutex
	Min     int
	Max     int
	Current int
}
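
// Illustrative only: a caller (e.g. a status page handler) would read a
// consistent copy of the replay progress via GetWALReplayStatus rather than
// accessing the fields above directly:
//
//	st := head.stats.WALReplayStatus.GetWALReplayStatus()
//	fmt.Printf("replaying segment %d of [%d, %d]\n", st.Current, st.Min, st.Max)
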
// GetWALReplayStatus returns the WAL replay status information.
func (s *WALReplayStatus) GetWALReplayStatus() WALReplayStatus {
	s.RLock()
	defer s.RUnlock()

	return WALReplayStatus{
		Min:     s.Min,
		Max:     s.Max,
		Current: s.Current,
	}
}

const cardinalityCacheExpirationTime = time.Duration(30) * time.Second

// Init loads data from the write ahead log and prepares the head for writes.
// It should be called before using an appender so that it
// limits the ingested samples to the head min valid time.
func (h *Head) Init(minValidTime int64) error {
	h.minValidTime.Store(minValidTime)
	defer h.postings.EnsureOrder()
	defer h.gc() // After loading the wal remove the obsolete data from the head.
	defer func() {
		// Loading of m-mapped chunks and snapshot can cause the mint of the Head
		// to go below minValidTime.
		if h.MinTime() < h.minValidTime.Load() {
			h.minTime.Store(h.minValidTime.Load())
		}
	}()

	level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any")
	start := time.Now()

	snapIdx, snapOffset := -1, 0
	refSeries := make(map[uint64]*memSeries)

	if h.opts.EnableMemorySnapshotOnShutdown {
		level.Info(h.logger).Log("msg", "Chunk snapshot is enabled, replaying from the snapshot")
		var err error
		snapIdx, snapOffset, refSeries, err = h.loadChunkSnapshot()
		if err != nil {
			snapIdx, snapOffset = -1, 0
			h.metrics.snapshotReplayErrorTotal.Inc()
			level.Error(h.logger).Log("msg", "Failed to load chunk snapshot", "err", err)
			// We clear the partially loaded data to replay fresh from the WAL.
			if err := h.resetInMemoryState(); err != nil {
				return err
			}
		}
		level.Info(h.logger).Log("msg", "Chunk snapshot loading time", "duration", time.Since(start).String())
	}

	mmapChunkReplayStart := time.Now()
	mmappedChunks, err := h.loadMmappedChunks(refSeries)
	if err != nil {
		level.Error(h.logger).Log("msg", "Loading on-disk chunks failed", "err", err)
		if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok {
			h.metrics.mmapChunkCorruptionTotal.Inc()
		}
		// If this fails, data will be recovered from WAL.
		// Hence we won't lose any data (given WAL is not corrupt).
		mmappedChunks = h.removeCorruptedMmappedChunks(err, refSeries)
	}

	level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(mmapChunkReplayStart).String())
	if h.wal == nil {
		level.Info(h.logger).Log("msg", "WAL not found")
		return nil
	}

	level.Info(h.logger).Log("msg", "Replaying WAL, this may take a while")

	checkpointReplayStart := time.Now()
	// Backfill the checkpoint first if it exists.
	dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir())
	if err != nil && err != record.ErrNotFound {
		return errors.Wrap(err, "find last checkpoint")
	}

	// Find the last segment.
	_, endAt, e := wal.Segments(h.wal.Dir())
	if e != nil {
		return errors.Wrap(e, "finding WAL segments")
	}

	h.startWALReplayStatus(startFrom, endAt)

	multiRef := map[uint64]uint64{}
	if err == nil && startFrom >= snapIdx {
		sr, err := wal.NewSegmentsReader(dir)
		if err != nil {
			return errors.Wrap(err, "open checkpoint")
		}
		defer func() {
			if err := sr.Close(); err != nil {
				level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
			}
		}()
		// A corrupted checkpoint is a hard error for now and requires user
		// intervention. There's likely little data that can be recovered anyway.
		if err := h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks); err != nil {
			return errors.Wrap(err, "backfill checkpoint")
		}
		h.updateWALReplayStatusRead(startFrom)
		startFrom++
		level.Info(h.logger).Log("msg", "WAL checkpoint loaded")
	}
	checkpointReplayDuration := time.Since(checkpointReplayStart)

	walReplayStart := time.Now()

	if snapIdx > startFrom {
		startFrom = snapIdx
	}
	// Backfill segments from the most recent checkpoint onwards.
	for i := startFrom; i <= endAt; i++ {
		s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i))
		if err != nil {
			return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
		}

		offset := 0
		if i == snapIdx {
			offset = snapOffset
		}
		sr, err := wal.NewSegmentBufReaderWithOffset(offset, s)
		if errors.Cause(err) == io.EOF {
			// File does not exist.
			continue
		}
		if err != nil {
			return errors.Wrapf(err, "segment reader (offset=%d)", offset)
		}
		err = h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks)
		if err := sr.Close(); err != nil {
			level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
		}
		if err != nil {
			return err
		}
		level.Info(h.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", endAt)
		h.updateWALReplayStatusRead(i)
	}

	walReplayDuration := time.Since(start)
	h.metrics.walTotalReplayDuration.Set(walReplayDuration.Seconds())
	level.Info(h.logger).Log(
		"msg", "WAL replay completed",
		"checkpoint_replay_duration", checkpointReplayDuration.String(),
		"wal_replay_duration", time.Since(walReplayStart).String(),
		"total_replay_duration", walReplayDuration.String(),
	)

	return nil
}

func (h *Head) loadMmappedChunks(refSeries map[uint64]*memSeries) (map[uint64][]*mmappedChunk, error) {
	mmappedChunks := map[uint64][]*mmappedChunk{}
	if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error {
		if maxt < h.minValidTime.Load() {
			return nil
		}
		ms, ok := refSeries[seriesRef]
		if !ok {
			slice := mmappedChunks[seriesRef]
			if len(slice) > 0 && slice[len(slice)-1].maxTime >= mint {
				return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef)
			}

			slice = append(slice, &mmappedChunk{
				ref:        chunkRef,
				minTime:    mint,
				maxTime:    maxt,
				numSamples: numSamples,
			})
			mmappedChunks[seriesRef] = slice
			return nil
		}

		if len(ms.mmappedChunks) > 0 && ms.mmappedChunks[len(ms.mmappedChunks)-1].maxTime >= mint {
			return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef)
		}

		h.metrics.chunks.Inc()
		h.metrics.chunksCreated.Inc()
		ms.mmappedChunks = append(ms.mmappedChunks, &mmappedChunk{
			ref:        chunkRef,
			minTime:    mint,
			maxTime:    maxt,
			numSamples: numSamples,
		})
		h.updateMinMaxTime(mint, maxt)
		if ms.headChunk != nil && maxt >= ms.headChunk.minTime {
			// The head chunk was completed and was m-mapped after taking the snapshot.
			// Hence remove this chunk.
			ms.nextAt = 0
			ms.headChunk = nil
			ms.app = nil
		}
		return nil
	}); err != nil {
		return nil, errors.Wrap(err, "iterate on on-disk chunks")
	}
	return mmappedChunks, nil
}

// removeCorruptedMmappedChunks attempts to delete the corrupted mmapped chunks and if it fails, it clears all the previously
// loaded mmapped chunks.
func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[uint64]*memSeries) map[uint64][]*mmappedChunk {
	level.Info(h.logger).Log("msg", "Deleting mmapped chunk files")

	if err := h.chunkDiskMapper.DeleteCorrupted(err); err != nil {
		level.Info(h.logger).Log("msg", "Deletion of mmap chunk files failed, discarding chunk files completely", "err", err)
		return map[uint64][]*mmappedChunk{}
	}

	level.Info(h.logger).Log("msg", "Deletion of mmap chunk files successful, reattempting m-mapping the on-disk chunks")
	mmappedChunks, err := h.loadMmappedChunks(refSeries)
	if err != nil {
		level.Error(h.logger).Log("msg", "Loading on-disk chunks failed, discarding chunk files completely", "err", err)
		mmappedChunks = map[uint64][]*mmappedChunk{}
	}

	return mmappedChunks
}

func (h *Head) ApplyConfig(cfg *config.Config) error {
	if !h.opts.EnableExemplarStorage {
		return nil
	}

	// Head uses opts.MaxExemplars in combination with opts.EnableExemplarStorage
	// to decide if it should pass exemplars along to its exemplar storage, so we
	// need to update opts.MaxExemplars here.
	prevSize := h.opts.MaxExemplars.Load()
	h.opts.MaxExemplars.Store(cfg.StorageConfig.ExemplarsConfig.MaxExemplars)

	if prevSize == h.opts.MaxExemplars.Load() {
		return nil
	}

	migrated := h.exemplars.(*CircularExemplarStorage).Resize(h.opts.MaxExemplars.Load())
	level.Info(h.logger).Log("msg", "Exemplar storage resized", "from", prevSize, "to", h.opts.MaxExemplars, "migrated", migrated)
	return nil
}

// PostingsCardinalityStats returns the top 10 highest cardinality stats by label and value names.
func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.PostingsStats {
	h.cardinalityMutex.Lock()
	defer h.cardinalityMutex.Unlock()
	currentTime := time.Duration(time.Now().Unix()) * time.Second
	seconds := currentTime - h.lastPostingsStatsCall
	if seconds > cardinalityCacheExpirationTime {
		h.cardinalityCache = nil
	}
	if h.cardinalityCache != nil {
		return h.cardinalityCache
	}
	h.cardinalityCache = h.postings.Stats(statsByLabelName)
	h.lastPostingsStatsCall = time.Duration(time.Now().Unix()) * time.Second

	return h.cardinalityCache
}

func (h *Head) updateMinMaxTime(mint, maxt int64) {
	for {
		lt := h.MinTime()
		if mint >= lt {
			break
		}
		if h.minTime.CAS(lt, mint) {
			break
		}
	}
	for {
		ht := h.MaxTime()
		if maxt <= ht {
			break
		}
		if h.maxTime.CAS(ht, maxt) {
			break
		}
	}
}

// SetMinValidTime sets the minimum timestamp the head can ingest.
func (h *Head) SetMinValidTime(minValidTime int64) {
	h.minValidTime.Store(minValidTime)
}
// Truncate removes old data before mint from the head and WAL.
func (h *Head) Truncate(mint int64) (err error) {
	initialize := h.MinTime() == math.MaxInt64
	if err := h.truncateMemory(mint); err != nil {
		return err
	}
	if initialize {
		return nil
	}
	return h.truncateWAL(mint)
}

// OverlapsClosedInterval returns true if the head overlaps [mint, maxt].
func (h *Head) OverlapsClosedInterval(mint, maxt int64) bool {
	return h.MinTime() <= maxt && mint <= h.MaxTime()
}

// truncateMemory removes old data before mint from the head.
func (h *Head) truncateMemory(mint int64) (err error) {
	h.chunkSnapshotMtx.Lock()
	defer h.chunkSnapshotMtx.Unlock()

	defer func() {
		if err != nil {
			h.metrics.headTruncateFail.Inc()
		}
	}()

	initialize := h.MinTime() == math.MaxInt64

	if h.MinTime() >= mint && !initialize {
		return nil
	}

	// The order of these two Store() should not be changed,
	// i.e. truncation time is set before in-process boolean.
	h.lastMemoryTruncationTime.Store(mint)
	h.memTruncationInProcess.Store(true)
	defer h.memTruncationInProcess.Store(false)

	// We wait for pending queries that overlap with this truncation to end.
	if !initialize {
		h.WaitForPendingReadersInTimeRange(h.MinTime(), mint)
	}

	h.minTime.Store(mint)
	h.minValidTime.Store(mint)

	// Ensure that max time is at least as high as min time.
	for h.MaxTime() < mint {
		h.maxTime.CAS(h.MaxTime(), mint)
	}

	// This was an initial call to Truncate after loading blocks on startup.
	// We haven't read back the WAL yet, so do not attempt to truncate it.
	if initialize {
		return nil
	}

	h.metrics.headTruncateTotal.Inc()
	start := time.Now()

	actualMint := h.gc()
	level.Info(h.logger).Log("msg", "Head GC completed", "duration", time.Since(start))
	h.metrics.gcDuration.Observe(time.Since(start).Seconds())
	if actualMint > h.minTime.Load() {
		// The actual mint of the Head is higher than the one asked to truncate.
		appendableMinValidTime := h.appendableMinValidTime()
		if actualMint < appendableMinValidTime {
			h.minTime.Store(actualMint)
			h.minValidTime.Store(actualMint)
		} else {
			// The actual min time is in the appendable window.
			// So we set the mint to the appendableMinValidTime.
			h.minTime.Store(appendableMinValidTime)
			h.minValidTime.Store(appendableMinValidTime)
		}
	}

	// Truncate the chunk m-mapper.
	if err := h.chunkDiskMapper.Truncate(mint); err != nil {
		return errors.Wrap(err, "truncate chunks.HeadReadWriter")
	}
	return nil
}

// WaitForPendingReadersInTimeRange waits for queries overlapping with the given range to finish querying.
// The query timeout limits the max wait time of this function implicitly.
// The mint is inclusive, while maxt is the truncation time and hence exclusive.
func (h *Head) WaitForPendingReadersInTimeRange(mint, maxt int64) {
	maxt-- // Making it inclusive before checking overlaps.
	overlaps := func() bool {
		o := false
		h.iso.TraverseOpenReads(func(s *isolationState) bool {
			if s.mint <= maxt && mint <= s.maxt {
				// Overlaps with the truncation range.
				o = true
				return false
			}
			return true
		})
		return o
	}
	for overlaps() {
		time.Sleep(500 * time.Millisecond)
	}
}
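
// For example (illustrative only): WaitForPendingReadersInTimeRange(mint, 2000)
// waits only for readers overlapping [mint, 1999]; a reader whose range starts
// exactly at 2000 is unaffected, because the truncated interval is half-open.
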
// IsQuerierCollidingWithTruncation returns whether the current querier needs to be closed and whether a new querier
// has to be created. In the latter case, the method also returns the new mint to be used for creating the
// new range head and the new querier. This method helps prevent races with the truncation of in-memory data.
//
// NOTE: The querier should already be taken before calling this.
func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) (shouldClose bool, getNew bool, newMint int64) {
	if !h.memTruncationInProcess.Load() {
		return false, false, 0
	}
	// Head truncation is in process. It also means that the block that was
	// created for this truncation range is also available.
	// Check if we took a querier that overlaps with this truncation.
	memTruncTime := h.lastMemoryTruncationTime.Load()
	if querierMaxt < memTruncTime {
		// Head compaction has happened and this time range is being truncated.
		// This query doesn't overlap with the Head any longer.
		// We should close this querier to avoid races and the data would be
		// available with the blocks below.
		// Cases:
		// 1.        |------truncation------|
		//   |---query---|
		// 2.        |------truncation------|
		//               |---query---|
		return true, false, 0
	}
	if querierMint < memTruncTime {
		// The truncation time is not the same as the head mint that we saw above but the
		// query still overlaps with the Head.
		// The truncation started after we got the querier. So it is not safe
		// to use this querier and/or might block truncation. We should get
		// a new querier for the new Head range while the remaining data will be
		// available in the blocks below.
		// Case:
		// |------truncation------|
		//                   |----query----|
		// Turns into
		// |------truncation------|
		//                        |---qu---|
		return true, true, memTruncTime
	}

	// Other case is this, which is a no-op
	// |------truncation------|
	//                              |---query---|
	return false, false, 0
}

// truncateWAL removes old data before mint from the WAL.
func (h *Head) truncateWAL(mint int64) error {
	h.chunkSnapshotMtx.Lock()
	defer h.chunkSnapshotMtx.Unlock()

	if h.wal == nil || mint <= h.lastWALTruncationTime.Load() {
		return nil
	}
	start := time.Now()
	h.lastWALTruncationTime.Store(mint)

	first, last, err := wal.Segments(h.wal.Dir())
	if err != nil {
		return errors.Wrap(err, "get segment range")
	}
	// Start a new segment, so a low ingestion volume TSDB doesn't have more WAL
	// than needed.
	if err := h.wal.NextSegment(); err != nil {
		return errors.Wrap(err, "next segment")
	}
	last-- // Never consider the last segment for checkpoint.
	if last < 0 {
		return nil // no segments yet.
	}
	// The lower two thirds of segments should contain mostly obsolete samples.
	// If we have fewer than two segments, it's not worth checkpointing yet.
	// With the default 2h blocks, this will keep up to around 3h worth
	// of WAL segments.
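	// For example (illustrative only): with first=100 and last=130, the line
	// below checkpoints segments [100, 120] and leaves 121-130 untouched.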
	last = first + (last-first)*2/3
	if last <= first {
		return nil
	}

	keep := func(id uint64) bool {
		if h.series.getByID(id) != nil {
			return true
		}
		h.deletedMtx.Lock()
		_, ok := h.deleted[id]
		h.deletedMtx.Unlock()
		return ok
	}
	h.metrics.checkpointCreationTotal.Inc()
	if _, err = wal.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil {
		h.metrics.checkpointCreationFail.Inc()
		if _, ok := errors.Cause(err).(*wal.CorruptionErr); ok {
			h.metrics.walCorruptionsTotal.Inc()
		}
		return errors.Wrap(err, "create checkpoint")
	}
	if err := h.wal.Truncate(last + 1); err != nil {
		// If truncating fails, we'll just try again at the next checkpoint.
		// Leftover segments will just be ignored in the future if there's a checkpoint
		// that supersedes them.
		level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
	}

	// The checkpoint is written and segments before it are truncated, so we no
	// longer need to track deleted series that are before it.
	h.deletedMtx.Lock()
	for ref, segment := range h.deleted {
		if segment < first {
			delete(h.deleted, ref)
		}
	}
	h.deletedMtx.Unlock()

	h.metrics.checkpointDeleteTotal.Inc()
	if err := wal.DeleteCheckpoints(h.wal.Dir(), last); err != nil {
		// Leftover old checkpoints do not cause problems down the line beyond
		// occupying disk space.
		// They will just be ignored since a higher checkpoint exists.
		level.Error(h.logger).Log("msg", "delete old checkpoints", "err", err)
		h.metrics.checkpointDeleteFail.Inc()
	}
	h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())

	level.Info(h.logger).Log("msg", "WAL checkpoint complete",
		"first", first, "last", last, "duration", time.Since(start))

	return nil
}

type Stats struct {
	NumSeries         uint64
	MinTime, MaxTime  int64
	IndexPostingStats *index.PostingsStats
}

// Stats returns important current HEAD statistics. Note that it is expensive to
// calculate these.
func (h *Head) Stats(statsByLabelName string) *Stats {
	return &Stats{
		NumSeries:         h.NumSeries(),
		MaxTime:           h.MaxTime(),
		MinTime:           h.MinTime(),
		IndexPostingStats: h.PostingsCardinalityStats(statsByLabelName),
	}
}

type RangeHead struct {
	head       *Head
	mint, maxt int64
}

// NewRangeHead returns a *RangeHead.
func NewRangeHead(head *Head, mint, maxt int64) *RangeHead {
	return &RangeHead{
		head: head,
		mint: mint,
		maxt: maxt,
	}
}

func (h *RangeHead) Index() (IndexReader, error) {
	return h.head.indexRange(h.mint, h.maxt), nil
}

func (h *RangeHead) Chunks() (ChunkReader, error) {
	return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State(h.mint, h.maxt))
}

func (h *RangeHead) Tombstones() (tombstones.Reader, error) {
	return h.head.tombstones, nil
}

func (h *RangeHead) MinTime() int64 {
	return h.mint
}

// MaxTime returns the max time of actual data fetchable from the head.
// This controls the chunks time range which is closed [b.MinTime, b.MaxTime].
func (h *RangeHead) MaxTime() int64 {
	return h.maxt
}
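
// Worked example for BlockMaxTime below (illustrative only): a RangeHead with
// maxt=1999 yields BlockMaxTime()=2000, i.e. a block covering the half-open
// interval [mint, 2000) that still contains a sample at timestamp 1999.
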
// BlockMaxTime returns the max time of the potential block created from this head.
// It's different from MaxTime, as we need to add +1 millisecond to the block maxt because block
// intervals are half-open: [b.MinTime, b.MaxTime). The block's maxt is therefore always one
// greater than the largest sample timestamp it includes.
func (h *RangeHead) BlockMaxTime() int64 {
	return h.MaxTime() + 1
}

func (h *RangeHead) NumSeries() uint64 {
	return h.head.NumSeries()
}

func (h *RangeHead) Meta() BlockMeta {
	return BlockMeta{
		MinTime: h.MinTime(),
		MaxTime: h.MaxTime(),
		ULID:    h.head.Meta().ULID,
		Stats: BlockStats{
			NumSeries: h.NumSeries(),
		},
	}
}

// String returns a human-readable representation of the range head. It's important to
// keep this function in order to avoid the struct dump when the head is stringified in
// errors or logs.
func (h *RangeHead) String() string {
	return fmt.Sprintf("range head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime())
}

// Delete all samples in the range of [mint, maxt] for series that satisfy the given
// label matchers.
func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
	// Do not delete anything beyond the currently valid range.
	mint, maxt = clampInterval(mint, maxt, h.MinTime(), h.MaxTime())

	ir := h.indexRange(mint, maxt)

	p, err := PostingsForMatchers(ir, ms...)
	if err != nil {
		return errors.Wrap(err, "select series")
	}

	var stones []tombstones.Stone
	for p.Next() {
		series := h.series.getByID(p.At())

		series.RLock()
		t0, t1 := series.minTime(), series.maxTime()
		series.RUnlock()
		if t0 == math.MinInt64 || t1 == math.MinInt64 {
			continue
		}
		// Delete only until the current values and not beyond.
		t0, t1 = clampInterval(mint, maxt, t0, t1)
		stones = append(stones, tombstones.Stone{Ref: p.At(), Intervals: tombstones.Intervals{{Mint: t0, Maxt: t1}}})
	}
	if p.Err() != nil {
		return p.Err()
	}
	if h.wal != nil {
		var enc record.Encoder
		if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
			return err
		}
	}
	for _, s := range stones {
		h.tombstones.AddInterval(s.Ref, s.Intervals[0])
	}

	return nil
}

// gc removes data before the minimum timestamp from the head.
// It returns the actual min times of the chunks present in the Head.
func (h *Head) gc() int64 {
	// Only data strictly lower than this timestamp must be deleted.
	mint := h.MinTime()

	// Drop old chunks and remember series IDs and hashes if they can be
	// deleted entirely.
	deleted, chunksRemoved, actualMint := h.series.gc(mint)
	seriesRemoved := len(deleted)

	h.metrics.seriesRemoved.Add(float64(seriesRemoved))
	h.metrics.chunksRemoved.Add(float64(chunksRemoved))
	h.metrics.chunks.Sub(float64(chunksRemoved))
	h.numSeries.Sub(uint64(seriesRemoved))

	// Remove deleted series IDs from the postings lists.
	h.postings.Delete(deleted)

	// Remove tombstones referring to the deleted series.
	h.tombstones.DeleteTombstones(deleted)
	h.tombstones.TruncateBefore(mint)

	if h.wal != nil {
		_, last, _ := wal.Segments(h.wal.Dir())
		h.deletedMtx.Lock()
		// Keep series records until we're past segment 'last'
		// because the WAL will still have sample records with
		// this ref ID. If we didn't keep these series records, then
		// on startup, the WAL replay (or any other code that reads
		// the WAL) wouldn't be able to use those samples, since we
		// would have no labels for that ref ID.
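		// For example (illustrative only): a series deleted while segment 12
		// is the newest WAL segment stays in h.deleted until truncateWAL has
		// checkpointed past segment 12.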
		for ref := range deleted {
			h.deleted[ref] = last
		}
		h.deletedMtx.Unlock()
	}

	return actualMint
}

// Tombstones returns a new reader over the head's tombstones.
func (h *Head) Tombstones() (tombstones.Reader, error) {
	return h.tombstones, nil
}

// NumSeries returns the number of active series in the head.
func (h *Head) NumSeries() uint64 {
	return h.numSeries.Load()
}

// Meta returns meta information about the head.
// The head is dynamic, so it will return dynamic results.
func (h *Head) Meta() BlockMeta {
	var id [16]byte
	copy(id[:], "______head______")
	return BlockMeta{
		MinTime: h.MinTime(),
		MaxTime: h.MaxTime(),
		ULID:    ulid.ULID(id),
		Stats: BlockStats{
			NumSeries: h.NumSeries(),
		},
	}
}

// MinTime returns the lowest time bound on visible data in the head.
func (h *Head) MinTime() int64 {
	return h.minTime.Load()
}

// MaxTime returns the highest timestamp seen in data of the head.
func (h *Head) MaxTime() int64 {
	return h.maxTime.Load()
}

// compactable returns whether the head has a compactable range.
// The head has a compactable range when the head time range exceeds 1.5 times the chunk range.
// The 0.5 acts as a buffer of the appendable window.
func (h *Head) compactable() bool {
	return h.MaxTime()-h.MinTime() > h.chunkRange.Load()/2*3
}

// Close flushes the WAL and closes the head.
// It also takes a snapshot of in-memory chunks if enabled.
func (h *Head) Close() error {
	h.closedMtx.Lock()
	defer h.closedMtx.Unlock()
	h.closed = true
	errs := tsdb_errors.NewMulti(h.chunkDiskMapper.Close())
	if h.wal != nil {
		errs.Add(h.wal.Close())
	}
	if errs.Err() == nil && h.opts.EnableMemorySnapshotOnShutdown {
		errs.Add(h.performChunkSnapshot())
	}
	return errs.Err()
}

// String returns a human-readable representation of the TSDB head. It's important to
// keep this function in order to avoid the struct dump when the head is stringified in
// errors or logs.
func (h *Head) String() string {
	return "head"
}

func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) {
	// Just using `getOrCreateWithID` below would be semantically sufficient, but we'd create
	// a new series on every sample inserted via Add(), which causes allocations
	// and makes our series IDs rather random and harder to compress in postings.
	s := h.series.getByHash(hash, lset)
	if s != nil {
		return s, false, nil
	}

	// Optimistically assume that we are the first one to create the series.
	id := h.lastSeriesID.Inc()

	return h.getOrCreateWithID(id, hash, lset)
}

func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
	s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
		return newMemSeries(lset, id, h.chunkRange.Load(), &h.memChunkPool)
	})
	if err != nil {
		return nil, false, err
	}
	if !created {
		return s, false, nil
	}

	h.metrics.seriesCreated.Inc()
	h.numSeries.Inc()

	h.postings.Add(id, lset)
	return s, true, nil
}
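
// A minimal lookup-or-create sketch (illustrative only): callers hash the
// label set once and pass the hash alongside it.
//
//	lset := labels.FromStrings("__name__", "http_requests_total")
//	s, created, err := h.getOrCreate(lset.Hash(), lset)
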
// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
// on top of a regular hashmap and holds a slice of series to resolve hash collisions.
// Its methods require the hash to be submitted with it to avoid re-computations throughout
// the code.
type seriesHashmap map[uint64][]*memSeries

func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
	for _, s := range m[hash] {
		if labels.Equal(s.lset, lset) {
			return s
		}
	}
	return nil
}

func (m seriesHashmap) set(hash uint64, s *memSeries) {
	l := m[hash]
	for i, prev := range l {
		if labels.Equal(prev.lset, s.lset) {
			l[i] = s
			return
		}
	}
	m[hash] = append(l, s)
}

func (m seriesHashmap) del(hash uint64, lset labels.Labels) {
	var rem []*memSeries
	for _, s := range m[hash] {
		if !labels.Equal(s.lset, lset) {
			rem = append(rem, s)
		}
	}
	if len(rem) == 0 {
		delete(m, hash)
	} else {
		m[hash] = rem
	}
}

const (
	// DefaultStripeSize is the default number of entries to allocate in the stripeSeries hash map.
	DefaultStripeSize = 1 << 14
)

// stripeSeries locks modulo ranges of IDs and hashes to reduce lock contention.
// The locks are padded to not be on the same cache line. Filling the padded space
// with the maps was profiled to be slower, likely due to the additional pointer
// dereferences.
type stripeSeries struct {
	size                    int
	series                  []map[uint64]*memSeries
	hashes                  []seriesHashmap
	locks                   []stripeLock
	seriesLifecycleCallback SeriesLifecycleCallback
}

type stripeLock struct {
	sync.RWMutex
	// Padding to avoid multiple locks being on the same cache line.
	_ [40]byte
}

func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *stripeSeries {
	s := &stripeSeries{
		size:                    stripeSize,
		series:                  make([]map[uint64]*memSeries, stripeSize),
		hashes:                  make([]seriesHashmap, stripeSize),
		locks:                   make([]stripeLock, stripeSize),
		seriesLifecycleCallback: seriesCallback,
	}

	for i := range s.series {
		s.series[i] = map[uint64]*memSeries{}
	}
	for i := range s.hashes {
		s.hashes[i] = seriesHashmap{}
	}
	return s
}

// gc garbage collects old chunks that are strictly before mint and removes
// series entirely that have no chunks left.
func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int, int64) {
	var (
		deleted            = map[uint64]struct{}{}
		deletedForCallback = []labels.Labels{}
		rmChunks           = 0
		actualMint         int64 = math.MaxInt64
	)
	// Run through all series and truncate old chunks. Mark those with no
	// chunks left as deleted and store their ID.
	for i := 0; i < s.size; i++ {
		s.locks[i].Lock()

		for hash, all := range s.hashes[i] {
			for _, series := range all {
				series.Lock()
				rmChunks += series.truncateChunksBefore(mint)

				if len(series.mmappedChunks) > 0 || series.headChunk != nil || series.pendingCommit {
					seriesMint := series.minTime()
					if seriesMint < actualMint {
						actualMint = seriesMint
					}
					series.Unlock()
					continue
				}

				// The series is gone entirely. We need to keep the series lock
				// and make sure we have acquired the stripe locks for hash and ID of the
				// series alike.
				// If we don't hold them all, there's a very small chance that a series receives
				// samples again while we are half-way into deleting it.
				j := int(series.ref) & (s.size - 1)

				if i != j {
					s.locks[j].Lock()
				}

				deleted[series.ref] = struct{}{}
				s.hashes[i].del(hash, series.lset)
				delete(s.series[j], series.ref)
				deletedForCallback = append(deletedForCallback, series.lset)

				if i != j {
					s.locks[j].Unlock()
				}

				series.Unlock()
			}
		}

		s.locks[i].Unlock()

		s.seriesLifecycleCallback.PostDeletion(deletedForCallback...)
		deletedForCallback = deletedForCallback[:0]
	}

	if actualMint == math.MaxInt64 {
		actualMint = mint
	}

	return deleted, rmChunks, actualMint
}

func (s *stripeSeries) getByID(id uint64) *memSeries {
	i := id & uint64(s.size-1)

	s.locks[i].RLock()
	series := s.series[i][id]
	s.locks[i].RUnlock()

	return series
}

func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
	i := hash & uint64(s.size-1)

	s.locks[i].RLock()
	series := s.hashes[i].get(hash, lset)
	s.locks[i].RUnlock()

	return series
}

func (s *stripeSeries) getOrSet(hash uint64, lset labels.Labels, createSeries func() *memSeries) (*memSeries, bool, error) {
	// PreCreation is called here to avoid calling it inside the lock.
	// It is not necessary to call it just before creating a series,
	// rather it gives a 'hint' whether to create a series or not.
	preCreationErr := s.seriesLifecycleCallback.PreCreation(lset)

	// Create the series, unless the PreCreation() callback has failed.
	// If it failed, we would not allow the series to be created anyway.
	var series *memSeries
	if preCreationErr == nil {
		series = createSeries()
	}

	i := hash & uint64(s.size-1)
	s.locks[i].Lock()

	if prev := s.hashes[i].get(hash, lset); prev != nil {
		s.locks[i].Unlock()
		return prev, false, nil
	}
	if preCreationErr == nil {
		s.hashes[i].set(hash, series)
	}
	s.locks[i].Unlock()

	if preCreationErr != nil {
		// The callback prevented creation of the series.
		return nil, false, preCreationErr
	}
	// Setting the series in s.hashes marks the creation of the series,
	// as any further calls to this method would return that series.
	s.seriesLifecycleCallback.PostCreation(series.lset)

	i = series.ref & uint64(s.size-1)

	s.locks[i].Lock()
	s.series[i][series.ref] = series
	s.locks[i].Unlock()

	return series, true, nil
}

type sample struct {
	t int64
	v float64
}

func newSample(t int64, v float64) tsdbutil.Sample { return sample{t, v} }
func (s sample) T() int64                          { return s.t }
func (s sample) V() float64                        { return s.v }

// memSeries is the in-memory representation of a series. None of its methods
// are goroutine safe and it is the caller's responsibility to lock it.
type memSeries struct {
	sync.RWMutex

	ref           uint64
	lset          labels.Labels
	mmappedChunks []*mmappedChunk
	mmMaxTime     int64 // Max time of any mmapped chunk, only used during WAL replay.
	headChunk     *memChunk
	chunkRange    int64
	firstChunkID  int

	nextAt int64 // Timestamp at which to cut the next chunk.
	sampleBuf     [4]sample
	pendingCommit bool // Whether there are samples waiting to be committed to this series.

	app chunkenc.Appender // Current appender for the chunk.

	memChunkPool *sync.Pool

	txs *txRing
}

func newMemSeries(lset labels.Labels, id uint64, chunkRange int64, memChunkPool *sync.Pool) *memSeries {
	s := &memSeries{
		lset:         lset,
		ref:          id,
		chunkRange:   chunkRange,
		nextAt:       math.MinInt64,
		txs:          newTxRing(4),
		memChunkPool: memChunkPool,
	}
	return s
}

func (s *memSeries) minTime() int64 {
	if len(s.mmappedChunks) > 0 {
		return s.mmappedChunks[0].minTime
	}
	if s.headChunk != nil {
		return s.headChunk.minTime
	}
	return math.MinInt64
}

func (s *memSeries) maxTime() int64 {
	c := s.head()
	if c != nil {
		return c.maxTime
	}
	if len(s.mmappedChunks) > 0 {
		return s.mmappedChunks[len(s.mmappedChunks)-1].maxTime
	}
	return math.MinInt64
}

// truncateChunksBefore removes all chunks from the series that
// have no timestamp at or after mint.
// Chunk IDs remain unchanged.
func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
	if s.headChunk != nil && s.headChunk.maxTime < mint {
		// If the head chunk is truncated, we can truncate all mmapped chunks.
		removed = 1 + len(s.mmappedChunks)
		s.firstChunkID += removed
		s.headChunk = nil
		s.mmappedChunks = nil
		return removed
	}
	if len(s.mmappedChunks) > 0 {
		for i, c := range s.mmappedChunks {
			if c.maxTime >= mint {
				break
			}
			removed = i + 1
		}
		s.mmappedChunks = append(s.mmappedChunks[:0], s.mmappedChunks[removed:]...)
		s.firstChunkID += removed
	}
	return removed
}

// cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after
// acquiring the lock.
func (s *memSeries) cleanupAppendIDsBelow(bound uint64) {
	s.txs.cleanupAppendIDsBelow(bound)
}

func (s *memSeries) head() *memChunk {
	return s.headChunk
}

type memChunk struct {
	chunk            chunkenc.Chunk
	minTime, maxTime int64
}

// OverlapsClosedInterval returns true if the chunk overlaps [mint, maxt].
func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
	return overlapsClosedInterval(mc.minTime, mc.maxTime, mint, maxt)
}

func overlapsClosedInterval(mint1, maxt1, mint2, maxt2 int64) bool {
	return mint1 <= maxt2 && mint2 <= maxt1
}

type mmappedChunk struct {
	ref              uint64
	numSamples       uint16
	minTime, maxTime int64
}
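
// For example (illustrative only): a chunk spanning [1000, 2000] overlaps the
// closed interval [2000, 3000], since they share timestamp 2000, but it does
// not overlap [2001, 3000].
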
// OverlapsClosedInterval returns true if the chunk overlaps [mint, maxt].
func (mc *mmappedChunk) OverlapsClosedInterval(mint, maxt int64) bool {
	return overlapsClosedInterval(mc.minTime, mc.maxTime, mint, maxt)
}

type noopSeriesLifecycleCallback struct{}

func (noopSeriesLifecycleCallback) PreCreation(labels.Labels) error { return nil }
func (noopSeriesLifecycleCallback) PostCreation(labels.Labels)      {}
func (noopSeriesLifecycleCallback) PostDeletion(...labels.Labels)   {}

func (h *Head) Size() int64 {
	var walSize int64
	if h.wal != nil {
		walSize, _ = h.wal.Size()
	}
	cdmSize, _ := h.chunkDiskMapper.Size()
	return walSize + cdmSize
}

func (h *RangeHead) Size() int64 {
	return h.head.Size()
}

func (h *Head) startWALReplayStatus(startFrom, last int) {
	h.stats.WALReplayStatus.Lock()
	defer h.stats.WALReplayStatus.Unlock()

	h.stats.WALReplayStatus.Min = startFrom
	h.stats.WALReplayStatus.Max = last
	h.stats.WALReplayStatus.Current = startFrom
}

func (h *Head) updateWALReplayStatusRead(current int) {
	h.stats.WALReplayStatus.Lock()
	defer h.stats.WALReplayStatus.Unlock()

	h.stats.WALReplayStatus.Current = current
}