// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
	"bufio"
	"context"
	"encoding/binary"
	"fmt"
	"io"
	"io/ioutil"
	"math"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/log"
	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/storage/local/chunk"
	"github.com/prometheus/prometheus/storage/local/codable"
	"github.com/prometheus/prometheus/storage/local/index"
	"github.com/prometheus/prometheus/util/flock"
)

const (
	// Version of the storage as it can be found in the version file.
	// Increment to protect against incompatible changes. newPersistence
	// refuses to open a storage directory whose VERSION file differs.
	Version         = 1
	versionFileName = "VERSION"

	seriesFileSuffix     = ".db"
	seriesTempFileSuffix = ".db.tmp"
	seriesDirNameLen     = 2 // How many bytes of the fingerprint in dir name.
	hintFileSuffix       = ".hint"

	mappingsFileName      = "mappings.db"
	mappingsTempFileName  = "mappings.db.tmp"
	mappingsFormatVersion = 1
	mappingsMagicString   = "PrometheusMappings"

	// dirtyFileName is both the lock file and the marker for an unclean
	// shutdown: if it already exists at start-up, the storage is dirty.
	dirtyFileName = "DIRTY"

	fileBufSize = 1 << 16 // 64kiB.

	// On-disk chunk header layout: one encoding byte at offset 0, followed
	// by the first and the last sample time, each as a little-endian
	// uint64 at offsets 1 and 9 respectively (17 bytes in total).
	chunkHeaderLen             = 17
	chunkHeaderTypeOffset      = 0
	chunkHeaderFirstTimeOffset = 1
	chunkHeaderLastTimeOffset  = 9
	chunkLenWithHeader         = chunk.ChunkLen + chunkHeaderLen
	chunkMaxBatchSize          = 62 // Max no. of chunks to load or write in
	// one batch. Note that 62 is the largest number of chunks that fit
	// into 64kiB on disk because chunkHeaderLen is added to each 1k chunk.

	indexingMaxBatchSize  = 1024 * 1024
	indexingBatchTimeout  = 500 * time.Millisecond // Commit batch when idle for that long.
	indexingQueueCapacity = 1024 * 256
)

var fpLen = len(model.Fingerprint(0).String()) // Length of a fingerprint as string.

// Per-series flags as stored in byte (4.1) of a heads checkpoint. Only the
// v1 heads format uses them; v2 writes the byte but leaves it empty.
// The values are derived from iota, so never reorder existing flags.
const (
	flagHeadChunkPersisted byte = 1 << iota
	// Add more flags here like:
	// flagFoo
	// flagBar
)

// indexingOpType distinguishes the kinds of operations queued for the label
// indexes.
type indexingOpType byte

const (
	add indexingOpType = iota
	remove
)

// indexingOp is a single queued change to the label indexes: adding or
// removing the given metric under its fingerprint.
type indexingOp struct {
	fingerprint model.Fingerprint
	metric      model.Metric
	opType      indexingOpType
}

// persistence is used by a Storage implementation to store samples
// persistently across restarts. The methods are only goroutine-safe if
// explicitly marked as such below. The chunk-related methods persistChunks,
// dropAndPersistChunks, loadChunks, and loadChunkDescs can be called
// concurrently with each other if each call refers to a different fingerprint.
type persistence struct {
	// basePath is the storage directory. The VERSION file, the DIRTY file,
	// and the LevelDB indexes are located relative to it.
	basePath string

	// Indexes for archived series.
	archivedFingerprintToMetrics   *index.FingerprintMetricIndex
	archivedFingerprintToTimeRange *index.FingerprintTimeRangeIndex
	// Label indexes. newPersistence wipes them when the storage is dirty
	// so that they can be rebuilt.
	labelPairToFingerprints *index.LabelPairFingerprintIndex
	labelNameToLabelValues  *index.LabelNameLabelValuesIndex

	// Queue of pending label-index operations and the channels used to
	// coordinate its processing (see processIndexingQueue, not shown here).
	indexingQueue   chan indexingOp
	indexingStopped chan struct{}
	indexingFlush   chan chan int

	// Self-monitoring metrics, exposed via Describe and Collect.
	indexingQueueLength     prometheus.Gauge
	indexingQueueCapacity   prometheus.Metric
	indexingBatchSizes      prometheus.Summary
	indexingBatchDuration   prometheus.Summary
	checkpointDuration      prometheus.Summary
	checkpointLastDuration  prometheus.Gauge
	checkpointLastSize      prometheus.Gauge
	checkpointChunksWritten prometheus.Summary
	dirtyCounter            prometheus.Counter
	startedDirty            prometheus.Gauge
	checkpointing           prometheus.Gauge
	seriesChunksPersisted   prometheus.Histogram

	dirtyMtx       sync.Mutex     // Protects dirty and becameDirty.
	dirty          bool           // true if persistence was started in dirty state or became dirty at runtime (see setDirty).
	becameDirty    bool           // true if an inconsistency came up during runtime.
	pedanticChecks bool           // true if crash recovery should check each series.
	dirtyFileName  string         // The file used for locking and to mark dirty state.
	fLock          flock.Releaser // The file lock to protect against concurrent usage.

	shouldSync syncStrategy

	minShrinkRatio float64 // How much a series file has to shrink to justify dropping chunks.

	// bufPool hands out []byte buffers with a capacity of at least
	// 3*chunkLenWithHeader (see the contract documented in newPersistence).
	bufPool sync.Pool
}

// newPersistence returns a newly allocated persistence backed by local disk
// storage, ready to use.
//
// It first validates the VERSION file in basePath. If the file is missing,
// the directory is created (if needed) and must be empty apart from
// "lost+found" and dot-files; then a new VERSION file is written. It then
// locks the DIRTY file (a pre-existing DIRTY file marks the storage as
// dirty) and opens the indexes. If the storage is dirty, the label indexes
// are deleted so that they can be rebuilt.
143func newPersistence( 144 basePath string, 145 dirty, pedanticChecks bool, 146 shouldSync syncStrategy, 147 minShrinkRatio float64, 148) (*persistence, error) { 149 dirtyPath := filepath.Join(basePath, dirtyFileName) 150 versionPath := filepath.Join(basePath, versionFileName) 151 152 if versionData, err := ioutil.ReadFile(versionPath); err == nil { 153 if persistedVersion, err := strconv.Atoi(strings.TrimSpace(string(versionData))); err != nil { 154 return nil, fmt.Errorf("cannot parse content of %s: %s", versionPath, versionData) 155 } else if persistedVersion != Version { 156 return nil, fmt.Errorf("found storage version %d on disk, need version %d - please wipe storage or run a version of Prometheus compatible with storage version %d", persistedVersion, Version, persistedVersion) 157 } 158 } else if os.IsNotExist(err) { 159 // No version file found. Let's create the directory (in case 160 // it's not there yet) and then check if it is actually 161 // empty. If not, we have found an old storage directory without 162 // version file, so we have to bail out. 
163 if err := os.MkdirAll(basePath, 0700); err != nil { 164 if abspath, e := filepath.Abs(basePath); e == nil { 165 return nil, fmt.Errorf("cannot create persistent directory %s: %s", abspath, err) 166 } 167 return nil, fmt.Errorf("cannot create persistent directory %s: %s", basePath, err) 168 } 169 fis, err := ioutil.ReadDir(basePath) 170 if err != nil { 171 return nil, err 172 } 173 filesPresent := len(fis) 174 for i := range fis { 175 switch { 176 case fis[i].Name() == "lost+found" && fis[i].IsDir(): 177 filesPresent-- 178 case strings.HasPrefix(fis[i].Name(), "."): 179 filesPresent-- 180 } 181 } 182 if filesPresent > 0 { 183 return nil, fmt.Errorf("found existing files in storage path that do not look like storage files compatible with this version of Prometheus; please delete the files in the storage path or choose a different storage path") 184 } 185 // Finally we can write our own version into a new version file. 186 file, err := os.Create(versionPath) 187 if err != nil { 188 return nil, err 189 } 190 defer file.Close() 191 if _, err := fmt.Fprintf(file, "%d\n", Version); err != nil { 192 return nil, err 193 } 194 } else { 195 return nil, err 196 } 197 198 fLock, dirtyfileExisted, err := flock.New(dirtyPath) 199 if err != nil { 200 log.Errorf("Could not lock %s, Prometheus already running?", dirtyPath) 201 return nil, err 202 } 203 if dirtyfileExisted { 204 dirty = true 205 } 206 207 archivedFingerprintToMetrics, err := index.NewFingerprintMetricIndex(basePath) 208 if err != nil { 209 // At this point, we could simply blow away the archived 210 // fingerprint-to-metric index. However, then we would lose 211 // _all_ archived metrics. So better give the user an 212 // opportunity to repair the LevelDB with a 3rd party tool. 213 log.Errorf("Could not open the fingerprint-to-metric index for archived series. Please try a 3rd party tool to repair LevelDB in directory %q. 
If unsuccessful or undesired, delete the whole directory and restart Prometheus for crash recovery. You will lose all archived time series.", filepath.Join(basePath, index.FingerprintToMetricDir)) 214 return nil, err 215 } 216 archivedFingerprintToTimeRange, err := index.NewFingerprintTimeRangeIndex(basePath) 217 if err != nil { 218 // We can recover the archived fingerprint-to-timerange index, 219 // so blow it away and set ourselves dirty. Then re-open the now 220 // empty index. 221 if err := index.DeleteFingerprintTimeRangeIndex(basePath); err != nil { 222 return nil, err 223 } 224 dirty = true 225 if archivedFingerprintToTimeRange, err = index.NewFingerprintTimeRangeIndex(basePath); err != nil { 226 return nil, err 227 } 228 } 229 230 p := &persistence{ 231 basePath: basePath, 232 233 archivedFingerprintToMetrics: archivedFingerprintToMetrics, 234 archivedFingerprintToTimeRange: archivedFingerprintToTimeRange, 235 236 indexingQueue: make(chan indexingOp, indexingQueueCapacity), 237 indexingStopped: make(chan struct{}), 238 indexingFlush: make(chan chan int), 239 240 indexingQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{ 241 Namespace: namespace, 242 Subsystem: subsystem, 243 Name: "indexing_queue_length", 244 Help: "The number of metrics waiting to be indexed.", 245 }), 246 indexingQueueCapacity: prometheus.MustNewConstMetric( 247 prometheus.NewDesc( 248 prometheus.BuildFQName(namespace, subsystem, "indexing_queue_capacity"), 249 "The capacity of the indexing queue.", 250 nil, nil, 251 ), 252 prometheus.GaugeValue, 253 float64(indexingQueueCapacity), 254 ), 255 indexingBatchSizes: prometheus.NewSummary( 256 prometheus.SummaryOpts{ 257 Namespace: namespace, 258 Subsystem: subsystem, 259 Name: "indexing_batch_sizes", 260 Help: "Quantiles for indexing batch sizes (number of metrics per batch).", 261 }, 262 ), 263 indexingBatchDuration: prometheus.NewSummary( 264 prometheus.SummaryOpts{ 265 Namespace: namespace, 266 Subsystem: subsystem, 267 Name: 
"indexing_batch_duration_seconds", 268 Help: "Quantiles for batch indexing duration in seconds.", 269 }, 270 ), 271 checkpointLastDuration: prometheus.NewGauge(prometheus.GaugeOpts{ 272 Namespace: namespace, 273 Subsystem: subsystem, 274 Name: "checkpoint_last_duration_seconds", 275 Help: "The duration in seconds it took to last checkpoint open chunks and chunks yet to be persisted.", 276 }), 277 checkpointDuration: prometheus.NewSummary(prometheus.SummaryOpts{ 278 Namespace: namespace, 279 Subsystem: subsystem, 280 Objectives: map[float64]float64{}, 281 Name: "checkpoint_duration_seconds", 282 Help: "The duration in seconds taken for checkpointing open chunks and chunks yet to be persisted", 283 }), 284 checkpointLastSize: prometheus.NewGauge(prometheus.GaugeOpts{ 285 Namespace: namespace, 286 Subsystem: subsystem, 287 Name: "checkpoint_last_size_bytes", 288 Help: "The size of the last checkpoint of open chunks and chunks yet to be persisted", 289 }), 290 checkpointChunksWritten: prometheus.NewSummary(prometheus.SummaryOpts{ 291 Namespace: namespace, 292 Subsystem: subsystem, 293 Objectives: map[float64]float64{}, 294 Name: "checkpoint_series_chunks_written", 295 Help: "The number of chunk written per series while checkpointing open chunks and chunks yet to be persisted.", 296 }), 297 dirtyCounter: prometheus.NewCounter(prometheus.CounterOpts{ 298 Namespace: namespace, 299 Subsystem: subsystem, 300 Name: "inconsistencies_total", 301 Help: "A counter incremented each time an inconsistency in the local storage is detected. 
If this is greater zero, restart the server as soon as possible.", 302 }), 303 startedDirty: prometheus.NewGauge(prometheus.GaugeOpts{ 304 Namespace: namespace, 305 Subsystem: subsystem, 306 Name: "started_dirty", 307 Help: "Whether the local storage was found to be dirty (and crash recovery occurred) during Prometheus startup.", 308 }), 309 checkpointing: prometheus.NewGauge(prometheus.GaugeOpts{ 310 Namespace: namespace, 311 Subsystem: subsystem, 312 Name: "checkpointing", 313 Help: "1 if the storage is checkpointing, 0 otherwise.", 314 }), 315 seriesChunksPersisted: prometheus.NewHistogram(prometheus.HistogramOpts{ 316 Namespace: namespace, 317 Subsystem: subsystem, 318 Name: "series_chunks_persisted", 319 Help: "The number of chunks persisted per series.", 320 // Even with 4 bytes per sample, you're not going to get more than 85 321 // chunks in 6 hours for a time series with 1s resolution. 322 Buckets: []float64{1, 2, 4, 8, 16, 32, 64, 128}, 323 }), 324 dirty: dirty, 325 pedanticChecks: pedanticChecks, 326 dirtyFileName: dirtyPath, 327 fLock: fLock, 328 shouldSync: shouldSync, 329 minShrinkRatio: minShrinkRatio, 330 // Create buffers of length 3*chunkLenWithHeader by default because that is still reasonably small 331 // and at the same time enough for many uses. The contract is to never return buffer smaller than 332 // that to the pool so that callers can rely on a minimum buffer size. 333 bufPool: sync.Pool{New: func() interface{} { return make([]byte, 0, 3*chunkLenWithHeader) }}, 334 } 335 336 if p.dirty { 337 // Blow away the label indexes. We'll rebuild them later. 
338 if err := index.DeleteLabelPairFingerprintIndex(basePath); err != nil { 339 return nil, err 340 } 341 if err := index.DeleteLabelNameLabelValuesIndex(basePath); err != nil { 342 return nil, err 343 } 344 } 345 labelPairToFingerprints, err := index.NewLabelPairFingerprintIndex(basePath) 346 if err != nil { 347 return nil, err 348 } 349 labelNameToLabelValues, err := index.NewLabelNameLabelValuesIndex(basePath) 350 if err != nil { 351 return nil, err 352 } 353 p.labelPairToFingerprints = labelPairToFingerprints 354 p.labelNameToLabelValues = labelNameToLabelValues 355 356 return p, nil 357} 358 359func (p *persistence) run() { 360 p.processIndexingQueue() 361} 362 363// Describe implements prometheus.Collector. 364func (p *persistence) Describe(ch chan<- *prometheus.Desc) { 365 ch <- p.indexingQueueLength.Desc() 366 ch <- p.indexingQueueCapacity.Desc() 367 p.indexingBatchSizes.Describe(ch) 368 p.indexingBatchDuration.Describe(ch) 369 ch <- p.checkpointDuration.Desc() 370 ch <- p.checkpointLastDuration.Desc() 371 ch <- p.checkpointLastSize.Desc() 372 ch <- p.checkpointChunksWritten.Desc() 373 ch <- p.checkpointing.Desc() 374 ch <- p.dirtyCounter.Desc() 375 ch <- p.startedDirty.Desc() 376 ch <- p.seriesChunksPersisted.Desc() 377} 378 379// Collect implements prometheus.Collector. 380func (p *persistence) Collect(ch chan<- prometheus.Metric) { 381 p.indexingQueueLength.Set(float64(len(p.indexingQueue))) 382 383 ch <- p.indexingQueueLength 384 ch <- p.indexingQueueCapacity 385 p.indexingBatchSizes.Collect(ch) 386 p.indexingBatchDuration.Collect(ch) 387 ch <- p.checkpointDuration 388 ch <- p.checkpointLastDuration 389 ch <- p.checkpointLastSize 390 ch <- p.checkpointChunksWritten 391 ch <- p.checkpointing 392 ch <- p.dirtyCounter 393 ch <- p.startedDirty 394 ch <- p.seriesChunksPersisted 395} 396 397// isDirty returns the dirty flag in a goroutine-safe way. 
398func (p *persistence) isDirty() bool { 399 p.dirtyMtx.Lock() 400 defer p.dirtyMtx.Unlock() 401 return p.dirty 402} 403 404// setDirty flags the storage as dirty in a goroutine-safe way. The provided 405// error will be logged as a reason the first time the storage is flagged as dirty. 406func (p *persistence) setDirty(err error) { 407 p.dirtyCounter.Inc() 408 p.dirtyMtx.Lock() 409 defer p.dirtyMtx.Unlock() 410 if p.becameDirty { 411 return 412 } 413 p.dirty = true 414 p.becameDirty = true 415 log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") 416} 417 418// fingerprintsForLabelPair returns the fingerprints for the given label 419// pair. This method is goroutine-safe but take into account that metrics queued 420// for indexing with IndexMetric might not have made it into the index 421// yet. (Same applies correspondingly to UnindexMetric.) 422func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) model.Fingerprints { 423 fps, _, err := p.labelPairToFingerprints.Lookup(lp) 424 if err != nil { 425 p.setDirty(fmt.Errorf("error in method fingerprintsForLabelPair(%v): %s", lp, err)) 426 return nil 427 } 428 return fps 429} 430 431// labelValuesForLabelName returns the label values for the given label 432// name. This method is goroutine-safe but take into account that metrics queued 433// for indexing with IndexMetric might not have made it into the index 434// yet. (Same applies correspondingly to UnindexMetric.) 435func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelValues, error) { 436 lvs, _, err := p.labelNameToLabelValues.Lookup(ln) 437 if err != nil { 438 p.setDirty(fmt.Errorf("error in method labelValuesForLabelName(%v): %s", ln, err)) 439 return nil, err 440 } 441 return lvs, nil 442} 443 444// persistChunks persists a number of consecutive chunks of a series. 
It is the 445// caller's responsibility to not modify the chunks concurrently and to not 446// persist or drop anything for the same fingerprint concurrently. It returns 447// the (zero-based) index of the first persisted chunk within the series 448// file. In case of an error, the returned index is -1 (to avoid the 449// misconception that the chunk was written at position 0). 450// 451// Returning an error signals problems with the series file. In this case, the 452// caller should quarantine the series. 453func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk.Chunk) (index int, err error) { 454 f, err := p.openChunkFileForWriting(fp) 455 if err != nil { 456 return -1, err 457 } 458 defer p.closeChunkFile(f) 459 460 if err := p.writeChunks(f, chunks); err != nil { 461 return -1, err 462 } 463 464 // Determine index within the file. 465 offset, err := f.Seek(0, io.SeekCurrent) 466 if err != nil { 467 return -1, err 468 } 469 index, err = chunkIndexForOffset(offset) 470 if err != nil { 471 return -1, err 472 } 473 474 return index - len(chunks), err 475} 476 477// loadChunks loads a group of chunks of a timeseries by their index. The chunk 478// with the earliest time will have index 0, the following ones will have 479// incrementally larger indexes. The indexOffset denotes the offset to be added to 480// each index in indexes. It is the caller's responsibility to not persist or 481// drop anything for the same fingerprint concurrently. 482func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk.Chunk, error) { 483 f, err := p.openChunkFileForReading(fp) 484 if err != nil { 485 return nil, err 486 } 487 defer f.Close() 488 489 chunks := make([]chunk.Chunk, 0, len(indexes)) 490 buf := p.bufPool.Get().([]byte) 491 defer func() { 492 // buf may change below. An unwrapped 'defer p.bufPool.Put(buf)' 493 // would only put back the original buf. 
494 p.bufPool.Put(buf) 495 }() 496 497 for i := 0; i < len(indexes); i++ { 498 // This loads chunks in batches. A batch is a streak of 499 // consecutive chunks, read from disk in one go. 500 batchSize := 1 501 if _, err := f.Seek(offsetForChunkIndex(indexes[i]+indexOffset), io.SeekStart); err != nil { 502 return nil, err 503 } 504 505 for ; batchSize < chunkMaxBatchSize && 506 i+1 < len(indexes) && 507 indexes[i]+1 == indexes[i+1]; i, batchSize = i+1, batchSize+1 { 508 } 509 readSize := batchSize * chunkLenWithHeader 510 if cap(buf) < readSize { 511 buf = make([]byte, readSize) 512 } 513 buf = buf[:readSize] 514 515 if _, err := io.ReadFull(f, buf); err != nil { 516 return nil, err 517 } 518 for c := 0; c < batchSize; c++ { 519 chunk, err := chunk.NewForEncoding(chunk.Encoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset])) 520 if err != nil { 521 return nil, err 522 } 523 if err := chunk.UnmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]); err != nil { 524 return nil, err 525 } 526 chunks = append(chunks, chunk) 527 } 528 } 529 chunk.Ops.WithLabelValues(chunk.Load).Add(float64(len(chunks))) 530 atomic.AddInt64(&chunk.NumMemChunks, int64(len(chunks))) 531 return chunks, nil 532} 533 534// loadChunkDescs loads the chunk.Descs for a series from disk. offsetFromEnd is 535// the number of chunk.Descs to skip from the end of the series file. It is the 536// caller's responsibility to not persist or drop anything for the same 537// fingerprint concurrently. 538func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunk.Desc, error) { 539 f, err := p.openChunkFileForReading(fp) 540 if os.IsNotExist(err) { 541 return nil, nil 542 } 543 if err != nil { 544 return nil, err 545 } 546 defer f.Close() 547 548 fi, err := f.Stat() 549 if err != nil { 550 return nil, err 551 } 552 if fi.Size()%int64(chunkLenWithHeader) != 0 { 553 // The returned error will bubble up and lead to quarantining of the whole series. 
554 return nil, fmt.Errorf( 555 "size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d", 556 fp, fi.Size(), chunkLenWithHeader, 557 ) 558 } 559 560 numChunks := int(fi.Size())/chunkLenWithHeader - offsetFromEnd 561 cds := make([]*chunk.Desc, numChunks) 562 chunkTimesBuf := make([]byte, 16) 563 for i := 0; i < numChunks; i++ { 564 _, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, io.SeekStart) 565 if err != nil { 566 return nil, err 567 } 568 569 _, err = io.ReadAtLeast(f, chunkTimesBuf, 16) 570 if err != nil { 571 return nil, err 572 } 573 cds[i] = &chunk.Desc{ 574 ChunkFirstTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf)), 575 ChunkLastTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf[8:])), 576 } 577 } 578 chunk.DescOps.WithLabelValues(chunk.Load).Add(float64(len(cds))) 579 chunk.NumMemDescs.Add(float64(len(cds))) 580 return cds, nil 581} 582 583// checkpointSeriesMapAndHeads persists the fingerprint to memory-series mapping 584// and all non persisted chunks. Do not call concurrently with 585// loadSeriesMapAndHeads. This method will only write heads format v2, but 586// loadSeriesMapAndHeads can also understand v1. 587// 588// Description of the file format (for both, v1 and v2): 589// 590// (1) Magic string (const headsMagicString). 591// 592// (2) Varint-encoded format version (const headsFormatVersion). 593// 594// (3) Number of series in checkpoint as big-endian uint64. 595// 596// (4) Repeated once per series: 597// 598// (4.1) A flag byte, see flag constants above. (Present but unused in v2.) 599// 600// (4.2) The fingerprint as big-endian uint64. 601// 602// (4.3) The metric as defined by codable.Metric. 603// 604// (4.4) The varint-encoded persistWatermark. (Missing in v1.) 605// 606// (4.5) The modification time of the series file as nanoseconds elapsed since 607// January 1, 1970 UTC. -1 if the modification time is unknown or no series file 608// exists yet. (Missing in v1.) 
609// 610// (4.6) The varint-encoded chunkDescsOffset. 611// 612// (4.6) The varint-encoded savedFirstTime. 613// 614// (4.7) The varint-encoded number of chunk descriptors. 615// 616// (4.8) Repeated once per chunk descriptor, oldest to most recent, either 617// variant 4.8.1 (if index < persistWatermark) or variant 4.8.2 (if index >= 618// persistWatermark). In v1, everything is variant 4.8.1 except for a 619// non-persisted head-chunk (determined by the flags). 620// 621// (4.8.1.1) The varint-encoded first time. 622// (4.8.1.2) The varint-encoded last time. 623// 624// (4.8.2.1) A byte defining the chunk type. 625// (4.8.2.2) The chunk itself, marshaled with the Marshal() method. 626// 627// NOTE: Above, varint encoding is used consistently although uvarint would have 628// made more sense in many cases. This was simply a glitch while designing the 629// format. 630func (p *persistence) checkpointSeriesMapAndHeads( 631 ctx context.Context, fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker, 632) (err error) { 633 log.Info("Checkpointing in-memory metrics and chunks...") 634 p.checkpointing.Set(1) 635 defer p.checkpointing.Set(0) 636 begin := time.Now() 637 f, err := os.OpenFile(p.headsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640) 638 if err != nil { 639 return err 640 } 641 642 defer func() { 643 defer os.Remove(p.headsTempFileName()) // Just in case it was left behind. 644 645 if err != nil { 646 // If we already had an error, do not bother to sync, 647 // just close, ignoring any further error. 
648 f.Close() 649 return 650 } 651 syncErr := f.Sync() 652 closeErr := f.Close() 653 err = syncErr 654 if err != nil { 655 return 656 } 657 err = closeErr 658 if err != nil { 659 return 660 } 661 err = os.Rename(p.headsTempFileName(), p.headsFileName()) 662 duration := time.Since(begin) 663 p.checkpointDuration.Observe(duration.Seconds()) 664 p.checkpointLastDuration.Set(duration.Seconds()) 665 log.Infof("Done checkpointing in-memory metrics and chunks in %v.", duration) 666 }() 667 668 w := bufio.NewWriterSize(f, fileBufSize) 669 670 if _, err = w.WriteString(headsMagicString); err != nil { 671 return err 672 } 673 var numberOfSeriesOffset int 674 if numberOfSeriesOffset, err = codable.EncodeVarint(w, headsFormatVersion); err != nil { 675 return err 676 } 677 numberOfSeriesOffset += len(headsMagicString) 678 numberOfSeriesInHeader := uint64(fingerprintToSeries.length()) 679 // We have to write the number of series as uint64 because we might need 680 // to overwrite it later, and a varint might change byte width then. 681 if err = codable.EncodeUint64(w, numberOfSeriesInHeader); err != nil { 682 return err 683 } 684 685 iter := fingerprintToSeries.iter() 686 defer func() { 687 // Consume the iterator in any case to not leak goroutines. 688 for range iter { 689 } 690 }() 691 692 var realNumberOfSeries uint64 693 for m := range iter { 694 select { 695 case <-ctx.Done(): 696 return ctx.Err() 697 default: 698 } 699 func() { // Wrapped in function to use defer for unlocking the fp. 700 fpLocker.Lock(m.fp) 701 defer fpLocker.Unlock(m.fp) 702 703 chunksToPersist := len(m.series.chunkDescs) - m.series.persistWatermark 704 if len(m.series.chunkDescs) == 0 { 705 // This series was completely purged or archived 706 // in the meantime. Ignore. 707 return 708 } 709 realNumberOfSeries++ 710 711 // Sanity checks. 
712 if m.series.chunkDescsOffset < 0 && m.series.persistWatermark > 0 { 713 panic("encountered unknown chunk desc offset in combination with positive persist watermark") 714 } 715 716 // These are the values to save in the normal case. 717 var ( 718 // persistWatermark is zero as we only checkpoint non-persisted chunks. 719 persistWatermark int64 720 // chunkDescsOffset is shifted by the original persistWatermark for the same reason. 721 chunkDescsOffset = int64(m.series.chunkDescsOffset + m.series.persistWatermark) 722 numChunkDescs = int64(chunksToPersist) 723 ) 724 // However, in the special case of a series being fully 725 // persisted but still in memory (i.e. not archived), we 726 // need to save a "placeholder", for which we use just 727 // the chunk desc of the last chunk. Values have to be 728 // adjusted accordingly. (The reason for doing it in 729 // this weird way is to keep the checkpoint format 730 // compatible with older versions.) 731 if chunksToPersist == 0 { 732 persistWatermark = 1 733 chunkDescsOffset-- // Save one chunk desc after all. 734 numChunkDescs = 1 735 } 736 737 // seriesFlags left empty in v2. 
738 if err = w.WriteByte(0); err != nil { 739 return 740 } 741 if err = codable.EncodeUint64(w, uint64(m.fp)); err != nil { 742 return 743 } 744 var buf []byte 745 buf, err = codable.Metric(m.series.metric).MarshalBinary() 746 if err != nil { 747 return 748 } 749 if _, err = w.Write(buf); err != nil { 750 return 751 } 752 if _, err = codable.EncodeVarint(w, persistWatermark); err != nil { 753 return 754 } 755 if m.series.modTime.IsZero() { 756 if _, err = codable.EncodeVarint(w, -1); err != nil { 757 return 758 } 759 } else { 760 if _, err = codable.EncodeVarint(w, m.series.modTime.UnixNano()); err != nil { 761 return 762 } 763 } 764 if _, err = codable.EncodeVarint(w, chunkDescsOffset); err != nil { 765 return 766 } 767 if _, err = codable.EncodeVarint(w, int64(m.series.savedFirstTime)); err != nil { 768 return 769 } 770 if _, err = codable.EncodeVarint(w, numChunkDescs); err != nil { 771 return 772 } 773 if chunksToPersist == 0 { 774 // Save the one placeholder chunk desc for a fully persisted series. 775 chunkDesc := m.series.chunkDescs[len(m.series.chunkDescs)-1] 776 if _, err = codable.EncodeVarint(w, int64(chunkDesc.FirstTime())); err != nil { 777 return 778 } 779 lt, err := chunkDesc.LastTime() 780 if err != nil { 781 return 782 } 783 if _, err = codable.EncodeVarint(w, int64(lt)); err != nil { 784 return 785 } 786 } else { 787 // Save (only) the non-persisted chunks. 788 for _, chunkDesc := range m.series.chunkDescs[m.series.persistWatermark:] { 789 if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil { 790 return 791 } 792 if err = chunkDesc.C.Marshal(w); err != nil { 793 return 794 } 795 p.checkpointChunksWritten.Observe(float64(chunksToPersist)) 796 } 797 } 798 // Series is checkpointed now, so declare it clean. 
In case the entire 799 // checkpoint fails later on, this is fine, as the storage's series 800 // maintenance will mark these series newly dirty again, continuously 801 // increasing the total number of dirty series as seen by the storage. 802 // This has the effect of triggering a new checkpoint attempt even 803 // earlier than if we hadn't incorrectly set "dirty" to "false" here 804 // already. 805 m.series.dirty = false 806 }() 807 if err != nil { 808 return err 809 } 810 } 811 if err = w.Flush(); err != nil { 812 return err 813 } 814 if realNumberOfSeries != numberOfSeriesInHeader { 815 // The number of series has changed in the meantime. 816 // Rewrite it in the header. 817 if _, err = f.Seek(int64(numberOfSeriesOffset), io.SeekStart); err != nil { 818 return err 819 } 820 if err = codable.EncodeUint64(f, realNumberOfSeries); err != nil { 821 return err 822 } 823 } 824 info, err := f.Stat() 825 if err != nil { 826 return err 827 } 828 p.checkpointLastSize.Set(float64(info.Size())) 829 return err 830} 831 832// loadSeriesMapAndHeads loads the fingerprint to memory-series mapping and all 833// the chunks contained in the checkpoint (and thus not yet persisted to series 834// files). The method is capable of loading the checkpoint format v1 and v2. If 835// recoverable corruption is detected, or if the dirty flag was set from the 836// beginning, crash recovery is run, which might take a while. If an 837// unrecoverable error is encountered, it is returned. Call this method during 838// start-up while nothing else is running in storage land. This method is 839// utterly goroutine-unsafe. 
840func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist int64, err error) { 841 fingerprintToSeries := make(map[model.Fingerprint]*memorySeries) 842 sm = &seriesMap{m: fingerprintToSeries} 843 844 defer func() { 845 if p.dirty { 846 log.Warn("Persistence layer appears dirty.") 847 p.startedDirty.Set(1) 848 err = p.recoverFromCrash(fingerprintToSeries) 849 if err != nil { 850 sm = nil 851 } 852 } else { 853 p.startedDirty.Set(0) 854 } 855 }() 856 857 hs := newHeadsScanner(p.headsFileName()) 858 defer hs.close() 859 for hs.scan() { 860 fingerprintToSeries[hs.fp] = hs.series 861 } 862 if os.IsNotExist(hs.err) { 863 return sm, 0, nil 864 } 865 if hs.err != nil { 866 p.dirty = true 867 log. 868 With("file", p.headsFileName()). 869 With("error", hs.err). 870 Error("Error reading heads file.") 871 return sm, 0, hs.err 872 } 873 return sm, hs.chunksToPersistTotal, nil 874} 875 876// dropAndPersistChunks deletes all chunks from a series file whose last sample 877// time is before beforeTime, and then appends the provided chunks, leaving out 878// those whose last sample time is before beforeTime. It returns the timestamp 879// of the first sample in the oldest chunk _not_ dropped, the chunk offset 880// within the series file of the first chunk persisted (out of the provided 881// chunks, or - if no chunks were provided - the chunk offset where chunks would 882// have been persisted, i.e. the end of the file), the number of deleted chunks, 883// and true if all chunks of the series have been deleted (in which case the 884// returned timestamp will be 0 and must be ignored). It is the caller's 885// responsibility to make sure nothing is persisted or loaded for the same 886// fingerprint concurrently. 887// 888// Returning an error signals problems with the series file. In this case, the 889// caller should quarantine the series. 
890func (p *persistence) dropAndPersistChunks( 891 fp model.Fingerprint, beforeTime model.Time, chunks []chunk.Chunk, 892) ( 893 firstTimeNotDropped model.Time, 894 offset int, 895 numDropped int, 896 allDropped bool, 897 err error, 898) { 899 // Style note: With the many return values, it was decided to use naked 900 // returns in this method. They make the method more readable, but 901 // please handle with care! 902 if len(chunks) > 0 { 903 // We have chunks to persist. First check if those are already 904 // too old. If that's the case, the chunks in the series file 905 // are all too old, too. 906 i := 0 907 for ; i < len(chunks); i++ { 908 var lt model.Time 909 lt, err = chunks[i].NewIterator().LastTimestamp() 910 if err != nil { 911 return 912 } 913 if !lt.Before(beforeTime) { 914 break 915 } 916 } 917 if i < len(chunks) { 918 firstTimeNotDropped = chunks[i].FirstTime() 919 } 920 if i > 0 || firstTimeNotDropped.Before(beforeTime) { 921 // Series file has to go. 922 if numDropped, err = p.deleteSeriesFile(fp); err != nil { 923 return 924 } 925 numDropped += i 926 if i == len(chunks) { 927 allDropped = true 928 return 929 } 930 // Now simply persist what has to be persisted to a new file. 931 _, err = p.persistChunks(fp, chunks[i:]) 932 return 933 } 934 } 935 936 // If we are here, we have to check the series file itself. 937 f, err := p.openChunkFileForReading(fp) 938 if os.IsNotExist(err) { 939 // No series file. Only need to create new file with chunks to 940 // persist, if there are any. 941 if len(chunks) == 0 { 942 allDropped = true 943 err = nil // Do not report not-exist err. 
944 return 945 } 946 offset, err = p.persistChunks(fp, chunks) 947 return 948 } 949 if err != nil { 950 return 951 } 952 defer f.Close() 953 954 fi, err := f.Stat() 955 if err != nil { 956 return 957 } 958 chunksInFile := int(fi.Size()) / chunkLenWithHeader 959 totalChunks := chunksInFile + len(chunks) 960 961 // Calculate chunk index from minShrinkRatio, to skip unnecessary chunk header reading. 962 chunkIndexToStartSeek := 0 963 if p.minShrinkRatio < 1 { 964 chunkIndexToStartSeek = int(math.Floor(float64(totalChunks) * p.minShrinkRatio)) 965 } 966 if chunkIndexToStartSeek >= chunksInFile { 967 chunkIndexToStartSeek = chunksInFile - 1 968 } 969 numDropped = chunkIndexToStartSeek 970 971 headerBuf := make([]byte, chunkHeaderLen) 972 // Find the first chunk in the file that should be kept. 973 for ; ; numDropped++ { 974 _, err = f.Seek(offsetForChunkIndex(numDropped), io.SeekStart) 975 if err != nil { 976 return 977 } 978 _, err = io.ReadFull(f, headerBuf) 979 if err == io.EOF { 980 // Close the file before trying to delete it. This is necessary on Windows 981 // (this will cause the defer f.Close to fail, but the error is silently ignored) 982 f.Close() 983 // We ran into the end of the file without finding any chunks that should 984 // be kept. Remove the whole file. 985 if numDropped, err = p.deleteSeriesFile(fp); err != nil { 986 return 987 } 988 if len(chunks) == 0 { 989 allDropped = true 990 return 991 } 992 offset, err = p.persistChunks(fp, chunks) 993 return 994 } 995 if err != nil { 996 return 997 } 998 lastTime := model.Time( 999 binary.LittleEndian.Uint64(headerBuf[chunkHeaderLastTimeOffset:]), 1000 ) 1001 if !lastTime.Before(beforeTime) { 1002 break 1003 } 1004 } 1005 1006 // If numDropped isn't incremented, the minShrinkRatio condition isn't satisfied. 1007 if numDropped == chunkIndexToStartSeek { 1008 // Nothing to drop. Just adjust the return values and append the chunks (if any). 
1009 numDropped = 0 1010 _, err = f.Seek(offsetForChunkIndex(0), io.SeekStart) 1011 if err != nil { 1012 return 1013 } 1014 _, err = io.ReadFull(f, headerBuf) 1015 if err != nil { 1016 return 1017 } 1018 firstTimeNotDropped = model.Time( 1019 binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]), 1020 ) 1021 if len(chunks) > 0 { 1022 offset, err = p.persistChunks(fp, chunks) 1023 } else { 1024 offset = chunksInFile 1025 } 1026 return 1027 } 1028 // If we are here, we have to drop some chunks for real. So we need to 1029 // record firstTimeNotDropped from the last read header, seek backwards 1030 // to the beginning of its header, and start copying everything from 1031 // there into a new file. Then append the chunks to the new file. 1032 firstTimeNotDropped = model.Time( 1033 binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]), 1034 ) 1035 chunk.Ops.WithLabelValues(chunk.Drop).Add(float64(numDropped)) 1036 _, err = f.Seek(-chunkHeaderLen, io.SeekCurrent) 1037 if err != nil { 1038 return 1039 } 1040 1041 temp, err := os.OpenFile(p.tempFileNameForFingerprint(fp), os.O_WRONLY|os.O_CREATE, 0640) 1042 if err != nil { 1043 return 1044 } 1045 defer func() { 1046 // Close the file before trying to rename to it. This is necessary on Windows 1047 // (this will cause the defer f.Close to fail, but the error is silently ignored) 1048 f.Close() 1049 p.closeChunkFile(temp) 1050 if err == nil { 1051 err = os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp)) 1052 } 1053 }() 1054 1055 written, err := io.Copy(temp, f) 1056 if err != nil { 1057 return 1058 } 1059 offset = int(written / chunkLenWithHeader) 1060 1061 if len(chunks) > 0 { 1062 if err = p.writeChunks(temp, chunks); err != nil { 1063 return 1064 } 1065 } 1066 return 1067} 1068 1069// deleteSeriesFile deletes a series file belonging to the provided 1070// fingerprint. It returns the number of chunks that were contained in the 1071// deleted file. 
1072func (p *persistence) deleteSeriesFile(fp model.Fingerprint) (int, error) { 1073 fname := p.fileNameForFingerprint(fp) 1074 fi, err := os.Stat(fname) 1075 if os.IsNotExist(err) { 1076 // Great. The file is already gone. 1077 return 0, nil 1078 } 1079 if err != nil { 1080 return -1, err 1081 } 1082 numChunks := int(fi.Size() / chunkLenWithHeader) 1083 if err := os.Remove(fname); err != nil { 1084 return -1, err 1085 } 1086 chunk.Ops.WithLabelValues(chunk.Drop).Add(float64(numChunks)) 1087 return numChunks, nil 1088} 1089 1090// quarantineSeriesFile moves a series file to the orphaned directory. It also 1091// writes a hint file with the provided quarantine reason and, if series is 1092// non-nil, the string representation of the metric. 1093func (p *persistence) quarantineSeriesFile(fp model.Fingerprint, quarantineReason error, metric model.Metric) error { 1094 var ( 1095 oldName = p.fileNameForFingerprint(fp) 1096 orphanedDir = filepath.Join(p.basePath, "orphaned", filepath.Base(filepath.Dir(oldName))) 1097 newName = filepath.Join(orphanedDir, filepath.Base(oldName)) 1098 hintName = newName[:len(newName)-len(seriesFileSuffix)] + hintFileSuffix 1099 ) 1100 1101 renameErr := os.MkdirAll(orphanedDir, 0700) 1102 if renameErr != nil { 1103 return renameErr 1104 } 1105 renameErr = os.Rename(oldName, newName) 1106 if os.IsNotExist(renameErr) { 1107 // Source file dosn't exist. That's normal. 1108 renameErr = nil 1109 } 1110 // Write hint file even if the rename ended in an error. At least try... 1111 // And ignore errors writing the hint file. It's best effort. 
1112 if f, err := os.Create(hintName); err == nil { 1113 if metric != nil { 1114 f.WriteString(metric.String() + "\n") 1115 } else { 1116 f.WriteString("[UNKNOWN METRIC]\n") 1117 } 1118 if quarantineReason != nil { 1119 f.WriteString(quarantineReason.Error() + "\n") 1120 } else { 1121 f.WriteString("[UNKNOWN REASON]\n") 1122 } 1123 f.Close() 1124 } 1125 return renameErr 1126} 1127 1128// seriesFileModTime returns the modification time of the series file belonging 1129// to the provided fingerprint. In case of an error, the zero value of time.Time 1130// is returned. 1131func (p *persistence) seriesFileModTime(fp model.Fingerprint) time.Time { 1132 var modTime time.Time 1133 if fi, err := os.Stat(p.fileNameForFingerprint(fp)); err == nil { 1134 return fi.ModTime() 1135 } 1136 return modTime 1137} 1138 1139// indexMetric queues the given metric for addition to the indexes needed by 1140// fingerprintsForLabelPair, labelValuesForLabelName, and 1141// fingerprintsModifiedBefore. If the queue is full, this method blocks until 1142// the metric can be queued. This method is goroutine-safe. 1143func (p *persistence) indexMetric(fp model.Fingerprint, m model.Metric) { 1144 p.indexingQueue <- indexingOp{fp, m, add} 1145} 1146 1147// unindexMetric queues references to the given metric for removal from the 1148// indexes used for fingerprintsForLabelPair, labelValuesForLabelName, and 1149// fingerprintsModifiedBefore. The index of fingerprints to archived metrics is 1150// not affected by this removal. (In fact, never call this method for an 1151// archived metric. To purge an archived metric, call purgeArchivedMetric.) 1152// If the queue is full, this method blocks until the metric can be queued. This 1153// method is goroutine-safe. 1154func (p *persistence) unindexMetric(fp model.Fingerprint, m model.Metric) { 1155 p.indexingQueue <- indexingOp{fp, m, remove} 1156} 1157 1158// waitForIndexing waits until all items in the indexing queue are processed. 
If 1159// queue processing is currently on hold (to gather more ops for batching), this 1160// method will trigger an immediate start of processing. This method is 1161// goroutine-safe. 1162func (p *persistence) waitForIndexing() { 1163 wait := make(chan int) 1164 for { 1165 p.indexingFlush <- wait 1166 if <-wait == 0 { 1167 break 1168 } 1169 } 1170} 1171 1172// archiveMetric persists the mapping of the given fingerprint to the given 1173// metric, together with the first and last timestamp of the series belonging to 1174// the metric. The caller must have locked the fingerprint. 1175func (p *persistence) archiveMetric( 1176 fp model.Fingerprint, m model.Metric, first, last model.Time, 1177) { 1178 if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil { 1179 p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToMetrics: %s", fp, err)) 1180 return 1181 } 1182 if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil { 1183 p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToTimeRange: %s", fp, err)) 1184 } 1185} 1186 1187// hasArchivedMetric returns whether the archived metric for the given 1188// fingerprint exists and if yes, what the first and last timestamp in the 1189// corresponding series is. This method is goroutine-safe. 1190func (p *persistence) hasArchivedMetric(fp model.Fingerprint) ( 1191 hasMetric bool, firstTime, lastTime model.Time, 1192) { 1193 firstTime, lastTime, hasMetric, err := p.archivedFingerprintToTimeRange.Lookup(fp) 1194 if err != nil { 1195 p.setDirty(fmt.Errorf("error in method hasArchivedMetric(%v): %s", fp, err)) 1196 hasMetric = false 1197 } 1198 return hasMetric, firstTime, lastTime 1199} 1200 1201// updateArchivedTimeRange updates an archived time range. 
The caller must make 1202// sure that the fingerprint is currently archived (the time range will 1203// otherwise be added without the corresponding metric in the archive). 1204func (p *persistence) updateArchivedTimeRange( 1205 fp model.Fingerprint, first, last model.Time, 1206) error { 1207 return p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}) 1208} 1209 1210// fingerprintsModifiedBefore returns the fingerprints of archived timeseries 1211// that have live samples before the provided timestamp. This method is 1212// goroutine-safe. 1213func (p *persistence) fingerprintsModifiedBefore(beforeTime model.Time) ([]model.Fingerprint, error) { 1214 var fp codable.Fingerprint 1215 var tr codable.TimeRange 1216 fps := []model.Fingerprint{} 1217 err := p.archivedFingerprintToTimeRange.ForEach(func(kv index.KeyValueAccessor) error { 1218 if err := kv.Value(&tr); err != nil { 1219 return err 1220 } 1221 if tr.First.Before(beforeTime) { 1222 if err := kv.Key(&fp); err != nil { 1223 return err 1224 } 1225 fps = append(fps, model.Fingerprint(fp)) 1226 } 1227 return nil 1228 }) 1229 return fps, err 1230} 1231 1232// archivedMetric retrieves the archived metric with the given fingerprint. This 1233// method is goroutine-safe. 1234func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) { 1235 metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp) 1236 if err != nil { 1237 p.setDirty(fmt.Errorf("error in method archivedMetric(%v): %s", fp, err)) 1238 return nil, err 1239 } 1240 return metric, nil 1241} 1242 1243// purgeArchivedMetric deletes an archived fingerprint and its corresponding 1244// metric entirely. It also queues the metric for un-indexing (no need to call 1245// unindexMetric for the deleted metric.) It does not touch the series file, 1246// though. The caller must have locked the fingerprint. 
1247func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) { 1248 defer func() { 1249 if err != nil { 1250 p.setDirty(fmt.Errorf("error in method purgeArchivedMetric(%v): %s", fp, err)) 1251 } 1252 }() 1253 1254 metric, err := p.archivedMetric(fp) 1255 if err != nil || metric == nil { 1256 return err 1257 } 1258 deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp)) 1259 if err != nil { 1260 return err 1261 } 1262 if !deleted { 1263 log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToMetrics index. This should never happen.", fp) 1264 } 1265 deleted, err = p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp)) 1266 if err != nil { 1267 return err 1268 } 1269 if !deleted { 1270 log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToTimeRange index. This should never happen.", fp) 1271 } 1272 p.unindexMetric(fp, metric) 1273 return nil 1274} 1275 1276// unarchiveMetric deletes an archived fingerprint and its metric, but (in 1277// contrast to purgeArchivedMetric) does not un-index the metric. If a metric 1278// was actually deleted, the method returns true and the first time and last 1279// time of the deleted metric. The caller must have locked the fingerprint. 1280func (p *persistence) unarchiveMetric(fp model.Fingerprint) (deletedAnything bool, err error) { 1281 // An error returned here will bubble up and lead to quarantining of the 1282 // series, so no setDirty required. 1283 deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp)) 1284 if err != nil || !deleted { 1285 return false, err 1286 } 1287 deleted, err = p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp)) 1288 if err != nil { 1289 return false, err 1290 } 1291 if !deleted { 1292 log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToTimeRange index. 
This should never happen.", fp) 1293 } 1294 return true, nil 1295} 1296 1297// close flushes the indexing queue and other buffered data and releases any 1298// held resources. It also removes the dirty marker file if successful and if 1299// the persistence is currently not marked as dirty. 1300func (p *persistence) close() error { 1301 close(p.indexingQueue) 1302 <-p.indexingStopped 1303 1304 var lastError, dirtyFileRemoveError error 1305 if err := p.archivedFingerprintToMetrics.Close(); err != nil { 1306 lastError = err 1307 log.Error("Error closing archivedFingerprintToMetric index DB: ", err) 1308 } 1309 if err := p.archivedFingerprintToTimeRange.Close(); err != nil { 1310 lastError = err 1311 log.Error("Error closing archivedFingerprintToTimeRange index DB: ", err) 1312 } 1313 if err := p.labelPairToFingerprints.Close(); err != nil { 1314 lastError = err 1315 log.Error("Error closing labelPairToFingerprints index DB: ", err) 1316 } 1317 if err := p.labelNameToLabelValues.Close(); err != nil { 1318 lastError = err 1319 log.Error("Error closing labelNameToLabelValues index DB: ", err) 1320 } 1321 if lastError == nil && !p.isDirty() { 1322 dirtyFileRemoveError = os.Remove(p.dirtyFileName) 1323 } 1324 if err := p.fLock.Release(); err != nil { 1325 lastError = err 1326 log.Error("Error releasing file lock: ", err) 1327 } 1328 if dirtyFileRemoveError != nil { 1329 // On Windows, removing the dirty file before unlocking is not 1330 // possible. So remove it here if it failed above. 
1331 lastError = os.Remove(p.dirtyFileName) 1332 } 1333 return lastError 1334} 1335 1336func (p *persistence) dirNameForFingerprint(fp model.Fingerprint) string { 1337 fpStr := fp.String() 1338 return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen]) 1339} 1340 1341func (p *persistence) fileNameForFingerprint(fp model.Fingerprint) string { 1342 fpStr := fp.String() 1343 return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix) 1344} 1345 1346func (p *persistence) tempFileNameForFingerprint(fp model.Fingerprint) string { 1347 fpStr := fp.String() 1348 return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix) 1349} 1350 1351func (p *persistence) openChunkFileForWriting(fp model.Fingerprint) (*os.File, error) { 1352 if err := os.MkdirAll(p.dirNameForFingerprint(fp), 0700); err != nil { 1353 return nil, err 1354 } 1355 return os.OpenFile(p.fileNameForFingerprint(fp), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640) 1356 // NOTE: Although the file was opened for append, 1357 // f.Seek(0, io.SeekCurrent) 1358 // would now return '0, nil', so we cannot check for a consistent file length right now. 1359 // However, the chunkIndexForOffset function is doing that check, so a wrong file length 1360 // would still be detected. 1361} 1362 1363// closeChunkFile first syncs the provided file if mandated so by the sync 1364// strategy. Then it closes the file. Errors are logged. 
1365func (p *persistence) closeChunkFile(f *os.File) { 1366 if p.shouldSync() { 1367 if err := f.Sync(); err != nil { 1368 log.Error("Error syncing file:", err) 1369 } 1370 } 1371 if err := f.Close(); err != nil { 1372 log.Error("Error closing chunk file:", err) 1373 } 1374} 1375 1376func (p *persistence) openChunkFileForReading(fp model.Fingerprint) (*os.File, error) { 1377 return os.Open(p.fileNameForFingerprint(fp)) 1378} 1379 1380func (p *persistence) headsFileName() string { 1381 return filepath.Join(p.basePath, headsFileName) 1382} 1383 1384func (p *persistence) headsTempFileName() string { 1385 return filepath.Join(p.basePath, headsTempFileName) 1386} 1387 1388func (p *persistence) mappingsFileName() string { 1389 return filepath.Join(p.basePath, mappingsFileName) 1390} 1391 1392func (p *persistence) mappingsTempFileName() string { 1393 return filepath.Join(p.basePath, mappingsTempFileName) 1394} 1395 1396func (p *persistence) processIndexingQueue() { 1397 batchSize := 0 1398 nameToValues := index.LabelNameLabelValuesMapping{} 1399 pairToFPs := index.LabelPairFingerprintsMapping{} 1400 batchTimeout := time.NewTimer(indexingBatchTimeout) 1401 defer batchTimeout.Stop() 1402 1403 commitBatch := func() { 1404 p.indexingBatchSizes.Observe(float64(batchSize)) 1405 defer func(begin time.Time) { 1406 p.indexingBatchDuration.Observe(time.Since(begin).Seconds()) 1407 }(time.Now()) 1408 1409 if err := p.labelPairToFingerprints.IndexBatch(pairToFPs); err != nil { 1410 log.Error("Error indexing label pair to fingerprints batch: ", err) 1411 p.setDirty(err) 1412 } 1413 if err := p.labelNameToLabelValues.IndexBatch(nameToValues); err != nil { 1414 log.Error("Error indexing label name to label values batch: ", err) 1415 p.setDirty(err) 1416 } 1417 batchSize = 0 1418 nameToValues = index.LabelNameLabelValuesMapping{} 1419 pairToFPs = index.LabelPairFingerprintsMapping{} 1420 batchTimeout.Reset(indexingBatchTimeout) 1421 } 1422 1423 var flush chan chan int 1424loop: 1425 for 
{ 1426 // Only process flush requests if the queue is currently empty. 1427 if len(p.indexingQueue) == 0 { 1428 flush = p.indexingFlush 1429 } else { 1430 flush = nil 1431 } 1432 select { 1433 case <-batchTimeout.C: 1434 // Only commit if we have something to commit _and_ 1435 // nothing is waiting in the queue to be picked up. That 1436 // prevents a death spiral if the LookupSet calls below 1437 // are slow for some reason. 1438 if batchSize > 0 && len(p.indexingQueue) == 0 { 1439 commitBatch() 1440 } else { 1441 batchTimeout.Reset(indexingBatchTimeout) 1442 } 1443 case r := <-flush: 1444 if batchSize > 0 { 1445 commitBatch() 1446 } 1447 r <- len(p.indexingQueue) 1448 case op, ok := <-p.indexingQueue: 1449 if !ok { 1450 if batchSize > 0 { 1451 commitBatch() 1452 } 1453 break loop 1454 } 1455 1456 batchSize++ 1457 for ln, lv := range op.metric { 1458 lp := model.LabelPair{Name: ln, Value: lv} 1459 baseFPs, ok := pairToFPs[lp] 1460 if !ok { 1461 var err error 1462 baseFPs, _, err = p.labelPairToFingerprints.LookupSet(lp) 1463 if err != nil { 1464 log.Errorf("Error looking up label pair %v: %s", lp, err) 1465 continue 1466 } 1467 pairToFPs[lp] = baseFPs 1468 } 1469 baseValues, ok := nameToValues[ln] 1470 if !ok { 1471 var err error 1472 baseValues, _, err = p.labelNameToLabelValues.LookupSet(ln) 1473 if err != nil { 1474 log.Errorf("Error looking up label name %v: %s", ln, err) 1475 continue 1476 } 1477 nameToValues[ln] = baseValues 1478 } 1479 switch op.opType { 1480 case add: 1481 baseFPs[op.fingerprint] = struct{}{} 1482 baseValues[lv] = struct{}{} 1483 case remove: 1484 delete(baseFPs, op.fingerprint) 1485 if len(baseFPs) == 0 { 1486 delete(baseValues, lv) 1487 } 1488 default: 1489 panic("unknown op type") 1490 } 1491 } 1492 1493 if batchSize >= indexingMaxBatchSize { 1494 commitBatch() 1495 } 1496 } 1497 } 1498 close(p.indexingStopped) 1499} 1500 1501// checkpointFPMappings persists the fingerprint mappings. 
The caller has to 1502// ensure that the provided mappings are not changed concurrently. This method 1503// is only called upon shutdown or during crash recovery, when no samples are 1504// ingested. 1505// 1506// Description of the file format, v1: 1507// 1508// (1) Magic string (const mappingsMagicString). 1509// 1510// (2) Uvarint-encoded format version (const mappingsFormatVersion). 1511// 1512// (3) Uvarint-encoded number of mappings in fpMappings. 1513// 1514// (4) Repeated once per mapping: 1515// 1516// (4.1) The raw fingerprint as big-endian uint64. 1517// 1518// (4.2) The uvarint-encoded number of sub-mappings for the raw fingerprint. 1519// 1520// (4.3) Repeated once per sub-mapping: 1521// 1522// (4.3.1) The uvarint-encoded length of the unique metric string. 1523// (4.3.2) The unique metric string. 1524// (4.3.3) The mapped fingerprint as big-endian uint64. 1525func (p *persistence) checkpointFPMappings(fpm fpMappings) (err error) { 1526 log.Info("Checkpointing fingerprint mappings...") 1527 begin := time.Now() 1528 f, err := os.OpenFile(p.mappingsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640) 1529 if err != nil { 1530 return 1531 } 1532 1533 defer func() { 1534 syncErr := f.Sync() 1535 closeErr := f.Close() 1536 if err != nil { 1537 return 1538 } 1539 err = syncErr 1540 if err != nil { 1541 return 1542 } 1543 err = closeErr 1544 if err != nil { 1545 return 1546 } 1547 err = os.Rename(p.mappingsTempFileName(), p.mappingsFileName()) 1548 duration := time.Since(begin) 1549 log.Infof("Done checkpointing fingerprint mappings in %v.", duration) 1550 }() 1551 1552 w := bufio.NewWriterSize(f, fileBufSize) 1553 1554 if _, err = w.WriteString(mappingsMagicString); err != nil { 1555 return 1556 } 1557 if _, err = codable.EncodeUvarint(w, mappingsFormatVersion); err != nil { 1558 return 1559 } 1560 if _, err = codable.EncodeUvarint(w, uint64(len(fpm))); err != nil { 1561 return 1562 } 1563 1564 for fp, mappings := range fpm { 1565 if err = 
codable.EncodeUint64(w, uint64(fp)); err != nil { 1566 return 1567 } 1568 if _, err = codable.EncodeUvarint(w, uint64(len(mappings))); err != nil { 1569 return 1570 } 1571 for ms, mappedFP := range mappings { 1572 if _, err = codable.EncodeUvarint(w, uint64(len(ms))); err != nil { 1573 return 1574 } 1575 if _, err = w.WriteString(ms); err != nil { 1576 return 1577 } 1578 if err = codable.EncodeUint64(w, uint64(mappedFP)); err != nil { 1579 return 1580 } 1581 } 1582 } 1583 err = w.Flush() 1584 return 1585} 1586 1587// loadFPMappings loads the fingerprint mappings. It also returns the highest 1588// mapped fingerprint and any error encountered. If p.mappingsFileName is not 1589// found, the method returns (fpMappings{}, 0, nil). Do not call concurrently 1590// with checkpointFPMappings. 1591func (p *persistence) loadFPMappings() (fpMappings, model.Fingerprint, error) { 1592 fpm := fpMappings{} 1593 var highestMappedFP model.Fingerprint 1594 1595 f, err := os.Open(p.mappingsFileName()) 1596 if os.IsNotExist(err) { 1597 return fpm, 0, nil 1598 } 1599 if err != nil { 1600 return nil, 0, err 1601 } 1602 defer f.Close() 1603 r := bufio.NewReaderSize(f, fileBufSize) 1604 1605 buf := make([]byte, len(mappingsMagicString)) 1606 if _, err := io.ReadFull(r, buf); err != nil { 1607 return nil, 0, err 1608 } 1609 magic := string(buf) 1610 if magic != mappingsMagicString { 1611 return nil, 0, fmt.Errorf( 1612 "unexpected magic string, want %q, got %q", 1613 mappingsMagicString, magic, 1614 ) 1615 } 1616 version, err := binary.ReadUvarint(r) 1617 if version != mappingsFormatVersion || err != nil { 1618 return nil, 0, fmt.Errorf("unknown fingerprint mappings format version, want %d", mappingsFormatVersion) 1619 } 1620 numRawFPs, err := binary.ReadUvarint(r) 1621 if err != nil { 1622 return nil, 0, err 1623 } 1624 for ; numRawFPs > 0; numRawFPs-- { 1625 rawFP, err := codable.DecodeUint64(r) 1626 if err != nil { 1627 return nil, 0, err 1628 } 1629 numMappings, err := 
binary.ReadUvarint(r) 1630 if err != nil { 1631 return nil, 0, err 1632 } 1633 mappings := make(map[string]model.Fingerprint, numMappings) 1634 for ; numMappings > 0; numMappings-- { 1635 lenMS, err := binary.ReadUvarint(r) 1636 if err != nil { 1637 return nil, 0, err 1638 } 1639 buf := make([]byte, lenMS) 1640 if _, err := io.ReadFull(r, buf); err != nil { 1641 return nil, 0, err 1642 } 1643 fp, err := codable.DecodeUint64(r) 1644 if err != nil { 1645 return nil, 0, err 1646 } 1647 mappedFP := model.Fingerprint(fp) 1648 if mappedFP > highestMappedFP { 1649 highestMappedFP = mappedFP 1650 } 1651 mappings[string(buf)] = mappedFP 1652 } 1653 fpm[model.Fingerprint(rawFP)] = mappings 1654 } 1655 return fpm, highestMappedFP, nil 1656} 1657 1658func (p *persistence) writeChunks(w io.Writer, chunks []chunk.Chunk) error { 1659 b := p.bufPool.Get().([]byte) 1660 defer func() { 1661 // buf may change below. An unwrapped 'defer p.bufPool.Put(buf)' 1662 // would only put back the original buf. 1663 p.bufPool.Put(b) 1664 }() 1665 numChunks := len(chunks) 1666 1667 for batchSize := chunkMaxBatchSize; len(chunks) > 0; chunks = chunks[batchSize:] { 1668 if batchSize > len(chunks) { 1669 batchSize = len(chunks) 1670 } 1671 writeSize := batchSize * chunkLenWithHeader 1672 if cap(b) < writeSize { 1673 b = make([]byte, writeSize) 1674 } 1675 b = b[:writeSize] 1676 1677 for i, chunk := range chunks[:batchSize] { 1678 if err := writeChunkHeader(b[i*chunkLenWithHeader:], chunk); err != nil { 1679 return err 1680 } 1681 if err := chunk.MarshalToBuf(b[i*chunkLenWithHeader+chunkHeaderLen:]); err != nil { 1682 return err 1683 } 1684 } 1685 if _, err := w.Write(b); err != nil { 1686 return err 1687 } 1688 } 1689 p.seriesChunksPersisted.Observe(float64(numChunks)) 1690 return nil 1691} 1692 1693func offsetForChunkIndex(i int) int64 { 1694 return int64(i * chunkLenWithHeader) 1695} 1696 1697func chunkIndexForOffset(offset int64) (int, error) { 1698 if int(offset)%chunkLenWithHeader != 0 { 1699 
return -1, fmt.Errorf( 1700 "offset %d is not a multiple of on-disk chunk length %d", 1701 offset, chunkLenWithHeader, 1702 ) 1703 } 1704 return int(offset) / chunkLenWithHeader, nil 1705} 1706 1707func writeChunkHeader(header []byte, c chunk.Chunk) error { 1708 header[chunkHeaderTypeOffset] = byte(c.Encoding()) 1709 binary.LittleEndian.PutUint64( 1710 header[chunkHeaderFirstTimeOffset:], 1711 uint64(c.FirstTime()), 1712 ) 1713 lt, err := c.NewIterator().LastTimestamp() 1714 if err != nil { 1715 return err 1716 } 1717 binary.LittleEndian.PutUint64( 1718 header[chunkHeaderLastTimeOffset:], 1719 uint64(lt), 1720 ) 1721 return nil 1722} 1723