1// Copyright 2014 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package storage 15 16import ( 17 "encoding/gob" 18 "errors" 19 "fmt" 20 "io/ioutil" 21 "os" 22 "path" 23 "sort" 24 "strings" 25 "sync" 26 "time" 27 28 "github.com/go-kit/log" 29 "github.com/go-kit/log/level" 30 31 //nolint:staticcheck // Ignore SA1019. Dependencies use the deprecated package, so we have to, too. 32 "github.com/golang/protobuf/proto" 33 "github.com/prometheus/client_golang/prometheus" 34 "github.com/prometheus/common/model" 35 36 dto "github.com/prometheus/client_model/go" 37) 38 39const ( 40 pushMetricName = "push_time_seconds" 41 pushMetricHelp = "Last Unix time when changing this group in the Pushgateway succeeded." 42 pushFailedMetricName = "push_failure_time_seconds" 43 pushFailedMetricHelp = "Last Unix time when changing this group in the Pushgateway failed." 44 writeQueueCapacity = 1000 45) 46 47var errTimestamp = errors.New("pushed metrics must not have timestamps") 48 49// DiskMetricStore is an implementation of MetricStore that persists metrics to 50// disk. 51type DiskMetricStore struct { 52 lock sync.RWMutex // Protects metricFamilies. 53 writeQueue chan WriteRequest 54 drain chan struct{} 55 done chan error 56 metricGroups GroupingKeyToMetricGroup 57 persistenceFile string 58 predefinedHelp map[string]string 59 logger log.Logger 60} 61 62type mfStat struct { 63 pos int // Where in the result slice is the MetricFamily? 64 copied bool // Has the MetricFamily already been copied? 65} 66 67// NewDiskMetricStore returns a DiskMetricStore ready to use. To cleanly shut it 68// down and free resources, the Shutdown() method has to be called. 69// 70// If persistenceFile is the empty string, no persisting to disk will 71// happen. Otherwise, a file of that name is used for persisting metrics to 72// disk. If the file already exists, metrics are read from it as part of the 73// start-up. Persisting is happening upon shutdown and after every write action, 74// but the latter will only happen persistenceDuration after the previous 75// persisting. 76// 77// If a non-nil Gatherer is provided, the help strings of metrics gathered by it 78// will be used as standard. Pushed metrics with deviating help strings will be 79// adjusted to avoid inconsistent expositions. 80func NewDiskMetricStore( 81 persistenceFile string, 82 persistenceInterval time.Duration, 83 gatherPredefinedHelpFrom prometheus.Gatherer, 84 logger log.Logger, 85) *DiskMetricStore { 86 // TODO: Do that outside of the constructor to allow the HTTP server to 87 // serve /-/healthy and /-/ready earlier. 88 dms := &DiskMetricStore{ 89 writeQueue: make(chan WriteRequest, writeQueueCapacity), 90 drain: make(chan struct{}), 91 done: make(chan error), 92 metricGroups: GroupingKeyToMetricGroup{}, 93 persistenceFile: persistenceFile, 94 logger: logger, 95 } 96 if err := dms.restore(); err != nil { 97 level.Error(logger).Log("msg", "could not load persisted metrics", "err", err) 98 } 99 if helpStrings, err := extractPredefinedHelpStrings(gatherPredefinedHelpFrom); err == nil { 100 dms.predefinedHelp = helpStrings 101 } else { 102 level.Error(logger).Log("msg", "could not gather metrics for predefined help strings", "err", err) 103 } 104 105 go dms.loop(persistenceInterval) 106 return dms 107} 108 109// SubmitWriteRequest implements the MetricStore interface. 110func (dms *DiskMetricStore) SubmitWriteRequest(req WriteRequest) { 111 dms.writeQueue <- req 112} 113 114// Shutdown implements the MetricStore interface. 115func (dms *DiskMetricStore) Shutdown() error { 116 close(dms.drain) 117 return <-dms.done 118} 119 120// Healthy implements the MetricStore interface. 121func (dms *DiskMetricStore) Healthy() error { 122 // By taking the lock we check that there is no deadlock. 123 dms.lock.Lock() 124 defer dms.lock.Unlock() 125 126 // A pushgateway that cannot be written to should not be 127 // considered as healthy. 128 if len(dms.writeQueue) == cap(dms.writeQueue) { 129 return fmt.Errorf("write queue is full") 130 } 131 132 return nil 133} 134 135// Ready implements the MetricStore interface. 136func (dms *DiskMetricStore) Ready() error { 137 return dms.Healthy() 138} 139 140// GetMetricFamilies implements the MetricStore interface. 141func (dms *DiskMetricStore) GetMetricFamilies() []*dto.MetricFamily { 142 dms.lock.RLock() 143 defer dms.lock.RUnlock() 144 145 result := []*dto.MetricFamily{} 146 mfStatByName := map[string]mfStat{} 147 148 for _, group := range dms.metricGroups { 149 for name, tmf := range group.Metrics { 150 mf := tmf.GetMetricFamily() 151 if mf == nil { 152 level.Warn(dms.logger).Log("msg", "storage corruption detected, consider wiping the persistence file") 153 continue 154 } 155 stat, exists := mfStatByName[name] 156 if exists { 157 existingMF := result[stat.pos] 158 if !stat.copied { 159 mfStatByName[name] = mfStat{ 160 pos: stat.pos, 161 copied: true, 162 } 163 existingMF = copyMetricFamily(existingMF) 164 result[stat.pos] = existingMF 165 } 166 if mf.GetHelp() != existingMF.GetHelp() { 167 level.Info(dms.logger).Log("msg", "metric families inconsistent help strings", "err", "Metric families have inconsistent help strings. The latter will have priority. This is bad. Fix your pushed metrics!", "new", mf, "old", existingMF) 168 } 169 // Type inconsistency cannot be fixed here. We will detect it during 170 // gathering anyway, so no reason to log anything here. 171 existingMF.Metric = append(existingMF.Metric, mf.Metric...) 172 } else { 173 copied := false 174 if help, ok := dms.predefinedHelp[name]; ok && mf.GetHelp() != help { 175 level.Info(dms.logger).Log("msg", "metric families overlap", "err", "Metric family has the same name as a metric family used by the Pushgateway itself but it has a different help string. Changing it to the standard help string. This is bad. Fix your pushed metrics!", "metric_family", mf, "standard_help", help) 176 mf = copyMetricFamily(mf) 177 copied = true 178 mf.Help = proto.String(help) 179 } 180 mfStatByName[name] = mfStat{ 181 pos: len(result), 182 copied: copied, 183 } 184 result = append(result, mf) 185 } 186 } 187 } 188 return result 189} 190 191// GetMetricFamiliesMap implements the MetricStore interface. 192func (dms *DiskMetricStore) GetMetricFamiliesMap() GroupingKeyToMetricGroup { 193 dms.lock.RLock() 194 defer dms.lock.RUnlock() 195 groupsCopy := make(GroupingKeyToMetricGroup, len(dms.metricGroups)) 196 for k, g := range dms.metricGroups { 197 metricsCopy := make(NameToTimestampedMetricFamilyMap, len(g.Metrics)) 198 groupsCopy[k] = MetricGroup{Labels: g.Labels, Metrics: metricsCopy} 199 for n, tmf := range g.Metrics { 200 metricsCopy[n] = tmf 201 } 202 } 203 return groupsCopy 204} 205 206func (dms *DiskMetricStore) loop(persistenceInterval time.Duration) { 207 lastPersist := time.Now() 208 persistScheduled := false 209 lastWrite := time.Time{} 210 persistDone := make(chan time.Time) 211 var persistTimer *time.Timer 212 213 checkPersist := func() { 214 if dms.persistenceFile != "" && !persistScheduled && lastWrite.After(lastPersist) { 215 persistTimer = time.AfterFunc( 216 persistenceInterval-lastWrite.Sub(lastPersist), 217 func() { 218 persistStarted := time.Now() 219 if err := dms.persist(); err != nil { 220 level.Error(dms.logger).Log("msg", "error persisting metrics", "err", err) 221 } else { 222 level.Info(dms.logger).Log("msg", "metrics persisted", "file", dms.persistenceFile) 223 } 224 persistDone <- persistStarted 225 }, 226 ) 227 persistScheduled = true 228 } 229 } 230 231 for { 232 select { 233 case wr := <-dms.writeQueue: 234 lastWrite = time.Now() 235 if dms.checkWriteRequest(wr) { 236 dms.processWriteRequest(wr) 237 } else { 238 dms.setPushFailedTimestamp(wr) 239 } 240 if wr.Done != nil { 241 close(wr.Done) 242 } 243 checkPersist() 244 case lastPersist = <-persistDone: 245 persistScheduled = false 246 checkPersist() // In case something has been written in the meantime. 247 case <-dms.drain: 248 // Prevent a scheduled persist from firing later. 249 if persistTimer != nil { 250 persistTimer.Stop() 251 } 252 // Now draining... 253 for { 254 select { 255 case wr := <-dms.writeQueue: 256 if dms.checkWriteRequest(wr) { 257 dms.processWriteRequest(wr) 258 } else { 259 dms.setPushFailedTimestamp(wr) 260 } 261 default: 262 dms.done <- dms.persist() 263 return 264 } 265 } 266 } 267 } 268} 269 270func (dms *DiskMetricStore) processWriteRequest(wr WriteRequest) { 271 dms.lock.Lock() 272 defer dms.lock.Unlock() 273 274 key := groupingKeyFor(wr.Labels) 275 276 if wr.MetricFamilies == nil { 277 // No MetricFamilies means delete request. Delete the whole 278 // metric group, and we are done here. 279 delete(dms.metricGroups, key) 280 return 281 } 282 // Otherwise, it's an update. 283 group, ok := dms.metricGroups[key] 284 if !ok { 285 group = MetricGroup{ 286 Labels: wr.Labels, 287 Metrics: NameToTimestampedMetricFamilyMap{}, 288 } 289 dms.metricGroups[key] = group 290 } else if wr.Replace { 291 // For replace, we have to delete all metric families in the 292 // group except pre-existing push timestamps. 293 for name := range group.Metrics { 294 if name != pushMetricName && name != pushFailedMetricName { 295 delete(group.Metrics, name) 296 } 297 } 298 } 299 wr.MetricFamilies[pushMetricName] = newPushTimestampGauge(wr.Labels, wr.Timestamp) 300 // Only add a zero push-failed metric if none is there yet, so that a 301 // previously added fail timestamp is retained. 302 if _, ok := group.Metrics[pushFailedMetricName]; !ok { 303 wr.MetricFamilies[pushFailedMetricName] = newPushFailedTimestampGauge(wr.Labels, time.Time{}) 304 } 305 for name, mf := range wr.MetricFamilies { 306 group.Metrics[name] = TimestampedMetricFamily{ 307 Timestamp: wr.Timestamp, 308 GobbableMetricFamily: (*GobbableMetricFamily)(mf), 309 } 310 } 311} 312 313func (dms *DiskMetricStore) setPushFailedTimestamp(wr WriteRequest) { 314 dms.lock.Lock() 315 defer dms.lock.Unlock() 316 317 key := groupingKeyFor(wr.Labels) 318 319 group, ok := dms.metricGroups[key] 320 if !ok { 321 group = MetricGroup{ 322 Labels: wr.Labels, 323 Metrics: NameToTimestampedMetricFamilyMap{}, 324 } 325 dms.metricGroups[key] = group 326 } 327 328 group.Metrics[pushFailedMetricName] = TimestampedMetricFamily{ 329 Timestamp: wr.Timestamp, 330 GobbableMetricFamily: (*GobbableMetricFamily)(newPushFailedTimestampGauge(wr.Labels, wr.Timestamp)), 331 } 332 // Only add a zero push metric if none is there yet, so that a 333 // previously added push timestamp is retained. 334 if _, ok := group.Metrics[pushMetricName]; !ok { 335 group.Metrics[pushMetricName] = TimestampedMetricFamily{ 336 Timestamp: wr.Timestamp, 337 GobbableMetricFamily: (*GobbableMetricFamily)(newPushTimestampGauge(wr.Labels, time.Time{})), 338 } 339 } 340} 341 342// checkWriteRequest return if applying the provided WriteRequest will result in 343// a consistent state of metrics. The dms is not modified by the check. However, 344// the WriteRequest _will_ be sanitized: the MetricFamilies are ensured to 345// contain the grouping Labels after the check. If false is returned, the 346// causing error is written to the Done channel of the WriteRequest. 347// 348// Special case: If the WriteRequest has no Done channel set, the (expensive) 349// consistency check is skipped. The WriteRequest is still sanitized, and the 350// presence of timestamps still results in returning false. 351func (dms *DiskMetricStore) checkWriteRequest(wr WriteRequest) bool { 352 if wr.MetricFamilies == nil { 353 // Delete request cannot create inconsistencies, and nothing has 354 // to be sanitized. 355 return true 356 } 357 358 var err error 359 defer func() { 360 if err != nil && wr.Done != nil { 361 wr.Done <- err 362 } 363 }() 364 365 if timestampsPresent(wr.MetricFamilies) { 366 err = errTimestamp 367 return false 368 } 369 for _, mf := range wr.MetricFamilies { 370 sanitizeLabels(mf, wr.Labels) 371 } 372 373 // Without Done channel, don't do the expensive consistency check. 374 if wr.Done == nil { 375 return true 376 } 377 378 // Construct a test dms, acting on a copy of the metrics, to test the 379 // WriteRequest with. 380 tdms := &DiskMetricStore{ 381 metricGroups: dms.GetMetricFamiliesMap(), 382 predefinedHelp: dms.predefinedHelp, 383 logger: log.NewNopLogger(), 384 } 385 tdms.processWriteRequest(wr) 386 387 // Construct a test Gatherer to check if consistent gathering is possible. 388 tg := prometheus.Gatherers{ 389 prometheus.DefaultGatherer, 390 prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { 391 return tdms.GetMetricFamilies(), nil 392 }), 393 } 394 if _, err = tg.Gather(); err != nil { 395 return false 396 } 397 return true 398} 399 400func (dms *DiskMetricStore) persist() error { 401 // Check (again) if persistence is configured because some code paths 402 // will call this method even if it is not. 403 if dms.persistenceFile == "" { 404 return nil 405 } 406 f, err := ioutil.TempFile( 407 path.Dir(dms.persistenceFile), 408 path.Base(dms.persistenceFile)+".in_progress.", 409 ) 410 if err != nil { 411 return err 412 } 413 inProgressFileName := f.Name() 414 e := gob.NewEncoder(f) 415 416 dms.lock.RLock() 417 err = e.Encode(dms.metricGroups) 418 dms.lock.RUnlock() 419 if err != nil { 420 f.Close() 421 os.Remove(inProgressFileName) 422 return err 423 } 424 if err := f.Close(); err != nil { 425 os.Remove(inProgressFileName) 426 return err 427 } 428 return os.Rename(inProgressFileName, dms.persistenceFile) 429} 430 431func (dms *DiskMetricStore) restore() error { 432 if dms.persistenceFile == "" { 433 return nil 434 } 435 f, err := os.Open(dms.persistenceFile) 436 if os.IsNotExist(err) { 437 return nil 438 } 439 if err != nil { 440 return err 441 } 442 defer f.Close() 443 d := gob.NewDecoder(f) 444 if err := d.Decode(&dms.metricGroups); err != nil { 445 return err 446 } 447 return nil 448} 449 450func copyMetricFamily(mf *dto.MetricFamily) *dto.MetricFamily { 451 return &dto.MetricFamily{ 452 Name: mf.Name, 453 Help: mf.Help, 454 Type: mf.Type, 455 Metric: append([]*dto.Metric{}, mf.Metric...), 456 } 457} 458 459// groupingKeyFor creates a grouping key from the provided map of grouping 460// labels. The grouping key is created by joining all label names and values 461// together with model.SeparatorByte as a separator. The label names are sorted 462// lexicographically before joining. In that way, the grouping key is both 463// reproducible and unique. 464func groupingKeyFor(labels map[string]string) string { 465 if len(labels) == 0 { // Super fast path. 466 return "" 467 } 468 469 labelNames := make([]string, 0, len(labels)) 470 for labelName := range labels { 471 labelNames = append(labelNames, labelName) 472 } 473 sort.Strings(labelNames) 474 475 sb := strings.Builder{} 476 for i, labelName := range labelNames { 477 sb.WriteString(labelName) 478 sb.WriteByte(model.SeparatorByte) 479 sb.WriteString(labels[labelName]) 480 if i+1 < len(labels) { // No separator at the end. 481 sb.WriteByte(model.SeparatorByte) 482 } 483 } 484 return sb.String() 485} 486 487// extractPredefinedHelpStrings extracts all the HELP strings from the provided 488// gatherer so that the DiskMetricStore can fix deviations in pushed metrics. 489func extractPredefinedHelpStrings(g prometheus.Gatherer) (map[string]string, error) { 490 if g == nil { 491 return nil, nil 492 } 493 mfs, err := g.Gather() 494 if err != nil { 495 return nil, err 496 } 497 result := map[string]string{} 498 for _, mf := range mfs { 499 result[mf.GetName()] = mf.GetHelp() 500 } 501 return result, nil 502} 503 504func newPushTimestampGauge(groupingLabels map[string]string, t time.Time) *dto.MetricFamily { 505 return newTimestampGauge(pushMetricName, pushMetricHelp, groupingLabels, t) 506} 507 508func newPushFailedTimestampGauge(groupingLabels map[string]string, t time.Time) *dto.MetricFamily { 509 return newTimestampGauge(pushFailedMetricName, pushFailedMetricHelp, groupingLabels, t) 510} 511 512func newTimestampGauge(name, help string, groupingLabels map[string]string, t time.Time) *dto.MetricFamily { 513 var ts float64 514 if !t.IsZero() { 515 ts = float64(t.UnixNano()) / 1e9 516 } 517 mf := &dto.MetricFamily{ 518 Name: proto.String(name), 519 Help: proto.String(help), 520 Type: dto.MetricType_GAUGE.Enum(), 521 Metric: []*dto.Metric{ 522 { 523 Gauge: &dto.Gauge{ 524 Value: proto.Float64(ts), 525 }, 526 }, 527 }, 528 } 529 sanitizeLabels(mf, groupingLabels) 530 return mf 531} 532 533// sanitizeLabels ensures that all the labels in groupingLabels and the 534// `instance` label are present in the MetricFamily. The label values from 535// groupingLabels are set in each Metric, no matter what. After that, if the 536// 'instance' label is not present at all in a Metric, it will be created (with 537// an empty string as value). 538// 539// Finally, sanitizeLabels sorts the label pairs of all metrics. 540func sanitizeLabels(mf *dto.MetricFamily, groupingLabels map[string]string) { 541 gLabelsNotYetDone := make(map[string]string, len(groupingLabels)) 542 543metric: 544 for _, m := range mf.GetMetric() { 545 for ln, lv := range groupingLabels { 546 gLabelsNotYetDone[ln] = lv 547 } 548 hasInstanceLabel := false 549 for _, lp := range m.GetLabel() { 550 ln := lp.GetName() 551 if lv, ok := gLabelsNotYetDone[ln]; ok { 552 lp.Value = proto.String(lv) 553 delete(gLabelsNotYetDone, ln) 554 } 555 if ln == string(model.InstanceLabel) { 556 hasInstanceLabel = true 557 } 558 if len(gLabelsNotYetDone) == 0 && hasInstanceLabel { 559 sort.Sort(labelPairs(m.Label)) 560 continue metric 561 } 562 } 563 for ln, lv := range gLabelsNotYetDone { 564 m.Label = append(m.Label, &dto.LabelPair{ 565 Name: proto.String(ln), 566 Value: proto.String(lv), 567 }) 568 if ln == string(model.InstanceLabel) { 569 hasInstanceLabel = true 570 } 571 delete(gLabelsNotYetDone, ln) // To prepare map for next metric. 572 } 573 if !hasInstanceLabel { 574 m.Label = append(m.Label, &dto.LabelPair{ 575 Name: proto.String(string(model.InstanceLabel)), 576 Value: proto.String(""), 577 }) 578 } 579 sort.Sort(labelPairs(m.Label)) 580 } 581} 582 583// Checks if any timestamps have been specified. 584func timestampsPresent(metricFamilies map[string]*dto.MetricFamily) bool { 585 for _, mf := range metricFamilies { 586 for _, m := range mf.GetMetric() { 587 if m.TimestampMs != nil { 588 return true 589 } 590 } 591 } 592 return false 593} 594 595// labelPairs implements sort.Interface. It provides a sortable version of a 596// slice of dto.LabelPair pointers. 597type labelPairs []*dto.LabelPair 598 599func (s labelPairs) Len() int { 600 return len(s) 601} 602 603func (s labelPairs) Swap(i, j int) { 604 s[i], s[j] = s[j], s[i] 605} 606 607func (s labelPairs) Less(i, j int) bool { 608 return s[i].GetName() < s[j].GetName() 609} 610