// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
	"encoding/gob"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"

	//nolint:staticcheck // Ignore SA1019. Dependencies use the deprecated package, so we have to, too.
	"github.com/golang/protobuf/proto"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"

	dto "github.com/prometheus/client_model/go"
)

const (
	pushMetricName       = "push_time_seconds"
	pushMetricHelp       = "Last Unix time when changing this group in the Pushgateway succeeded."
	pushFailedMetricName = "push_failure_time_seconds"
	pushFailedMetricHelp = "Last Unix time when changing this group in the Pushgateway failed."
	writeQueueCapacity   = 1000
)

var errTimestamp = errors.New("pushed metrics must not have timestamps")

// DiskMetricStore is an implementation of MetricStore that persists metrics to
// disk.
type DiskMetricStore struct {
	lock            sync.RWMutex // Protects metricGroups.
	writeQueue      chan WriteRequest
	drain           chan struct{}
	done            chan error
	metricGroups    GroupingKeyToMetricGroup
	persistenceFile string
	predefinedHelp  map[string]string
	logger          log.Logger
}

type mfStat struct {
	pos    int  // Where in the result slice is the MetricFamily?
	copied bool // Has the MetricFamily already been copied?
}

// NewDiskMetricStore returns a DiskMetricStore ready to use. To cleanly shut
// it down and free its resources, call the Shutdown() method.
//
// If persistenceFile is the empty string, no persisting to disk will
// happen. Otherwise, a file of that name is used for persisting metrics to
// disk. If the file already exists, metrics are read from it as part of the
// start-up. Persisting happens upon shutdown and after every write action,
// but the latter only if at least persistenceInterval has passed since the
// previous persist.
//
// If a non-nil Gatherer is provided, the help strings of metrics gathered by
// it are used as the standard. Pushed metrics with deviating help strings are
// adjusted to avoid inconsistent expositions.
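//
// A minimal usage sketch (file name and interval are illustrative):
//
//	dms := NewDiskMetricStore(
//		"/tmp/pushgateway.persistence", 5*time.Minute,
//		prometheus.DefaultGatherer, log.NewNopLogger(),
//	)
//	defer dms.Shutdown()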
func NewDiskMetricStore(
	persistenceFile string,
	persistenceInterval time.Duration,
	gatherPredefinedHelpFrom prometheus.Gatherer,
	logger log.Logger,
) *DiskMetricStore {
	// TODO: Do the restoring (and the gathering of predefined help strings)
	// outside of the constructor to allow the HTTP server to serve /-/healthy
	// and /-/ready earlier.
	dms := &DiskMetricStore{
		writeQueue:      make(chan WriteRequest, writeQueueCapacity),
		drain:           make(chan struct{}),
		done:            make(chan error),
		metricGroups:    GroupingKeyToMetricGroup{},
		persistenceFile: persistenceFile,
		logger:          logger,
	}
	if err := dms.restore(); err != nil {
		level.Error(logger).Log("msg", "could not load persisted metrics", "err", err)
	}
	if helpStrings, err := extractPredefinedHelpStrings(gatherPredefinedHelpFrom); err == nil {
		dms.predefinedHelp = helpStrings
	} else {
		level.Error(logger).Log("msg", "could not gather metrics for predefined help strings", "err", err)
	}

	go dms.loop(persistenceInterval)
	return dms
}

// SubmitWriteRequest implements the MetricStore interface.
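//
// The request is processed asynchronously by the store's single writer
// goroutine. A sketch of waiting for the outcome via the Done channel (mfs is
// a hypothetical map[string]*dto.MetricFamily without timestamps):
//
//	done := make(chan error, 1)
//	dms.SubmitWriteRequest(WriteRequest{
//		Labels:         map[string]string{"job": "some_job"},
//		MetricFamilies: mfs,
//		Timestamp:      time.Now(),
//		Done:           done,
//	})
//	if err := <-done; err != nil {
//		// The push was rejected, e.g. for timestamps or inconsistency.
//	}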
func (dms *DiskMetricStore) SubmitWriteRequest(req WriteRequest) {
	dms.writeQueue <- req
}

// Shutdown implements the MetricStore interface.
func (dms *DiskMetricStore) Shutdown() error {
	close(dms.drain)
	return <-dms.done
}

// Healthy implements the MetricStore interface.
func (dms *DiskMetricStore) Healthy() error {
	// By taking the lock, we check that there is no deadlock.
	dms.lock.Lock()
	defer dms.lock.Unlock()

	// A Pushgateway that cannot be written to should not be
	// considered healthy.
	if len(dms.writeQueue) == cap(dms.writeQueue) {
		return fmt.Errorf("write queue is full")
	}

	return nil
}

// Ready implements the MetricStore interface.
func (dms *DiskMetricStore) Ready() error {
	return dms.Healthy()
}

// GetMetricFamilies implements the MetricStore interface.
func (dms *DiskMetricStore) GetMetricFamilies() []*dto.MetricFamily {
	dms.lock.RLock()
	defer dms.lock.RUnlock()

	result := []*dto.MetricFamily{}
	mfStatByName := map[string]mfStat{}

	for _, group := range dms.metricGroups {
		for name, tmf := range group.Metrics {
			mf := tmf.GetMetricFamily()
			if mf == nil {
				level.Warn(dms.logger).Log("msg", "storage corruption detected, consider wiping the persistence file")
				continue
			}
			stat, exists := mfStatByName[name]
			if exists {
				existingMF := result[stat.pos]
				if !stat.copied {
					mfStatByName[name] = mfStat{
						pos:    stat.pos,
						copied: true,
					}
					existingMF = copyMetricFamily(existingMF)
					result[stat.pos] = existingMF
				}
				if mf.GetHelp() != existingMF.GetHelp() {
					level.Info(dms.logger).Log("msg", "metric families inconsistent help strings", "err", "Metric families have inconsistent help strings. The latter will have priority. This is bad. Fix your pushed metrics!", "new", mf, "old", existingMF)
				}
				// Type inconsistency cannot be fixed here. We will detect it during
				// gathering anyway, so no reason to log anything here.
				existingMF.Metric = append(existingMF.Metric, mf.Metric...)
			} else {
				copied := false
				if help, ok := dms.predefinedHelp[name]; ok && mf.GetHelp() != help {
					level.Info(dms.logger).Log("msg", "metric families overlap", "err", "Metric family has the same name as a metric family used by the Pushgateway itself but it has a different help string. Changing it to the standard help string. This is bad. Fix your pushed metrics!", "metric_family", mf, "standard_help", help)
					mf = copyMetricFamily(mf)
					copied = true
					mf.Help = proto.String(help)
				}
				mfStatByName[name] = mfStat{
					pos:    len(result),
					copied: copied,
				}
				result = append(result, mf)
			}
		}
	}
	return result
}

// GetMetricFamiliesMap implements the MetricStore interface.
func (dms *DiskMetricStore) GetMetricFamiliesMap() GroupingKeyToMetricGroup {
	dms.lock.RLock()
	defer dms.lock.RUnlock()
	groupsCopy := make(GroupingKeyToMetricGroup, len(dms.metricGroups))
	for k, g := range dms.metricGroups {
		metricsCopy := make(NameToTimestampedMetricFamilyMap, len(g.Metrics))
		groupsCopy[k] = MetricGroup{Labels: g.Labels, Metrics: metricsCopy}
		for n, tmf := range g.Metrics {
			metricsCopy[n] = tmf
		}
	}
	return groupsCopy
}

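// loop is the single goroutine through which all writes are serialized. It
// applies queued write requests, schedules persistence so that consecutive
// persists are at least persistenceInterval apart, and, once drain is closed,
// processes the remaining queue, persists one final time, and reports the
// result via the done channel.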
func (dms *DiskMetricStore) loop(persistenceInterval time.Duration) {
	lastPersist := time.Now()
	persistScheduled := false
	lastWrite := time.Time{}
	persistDone := make(chan time.Time)
	var persistTimer *time.Timer

	checkPersist := func() {
		if dms.persistenceFile != "" && !persistScheduled && lastWrite.After(lastPersist) {
			persistTimer = time.AfterFunc(
				persistenceInterval-lastWrite.Sub(lastPersist),
				func() {
					persistStarted := time.Now()
					if err := dms.persist(); err != nil {
						level.Error(dms.logger).Log("msg", "error persisting metrics", "err", err)
					} else {
						level.Info(dms.logger).Log("msg", "metrics persisted", "file", dms.persistenceFile)
					}
					persistDone <- persistStarted
				},
			)
			persistScheduled = true
		}
	}

	for {
		select {
		case wr := <-dms.writeQueue:
			lastWrite = time.Now()
			if dms.checkWriteRequest(wr) {
				dms.processWriteRequest(wr)
			} else {
				dms.setPushFailedTimestamp(wr)
			}
			if wr.Done != nil {
				close(wr.Done)
			}
			checkPersist()
		case lastPersist = <-persistDone:
			persistScheduled = false
			checkPersist() // In case something has been written in the meantime.
		case <-dms.drain:
			// Prevent a scheduled persist from firing later.
			if persistTimer != nil {
				persistTimer.Stop()
			}
			// Now draining...
			for {
				select {
				case wr := <-dms.writeQueue:
					if dms.checkWriteRequest(wr) {
						dms.processWriteRequest(wr)
					} else {
						dms.setPushFailedTimestamp(wr)
					}
				default:
					dms.done <- dms.persist()
					return
				}
			}
		}
	}
}

func (dms *DiskMetricStore) processWriteRequest(wr WriteRequest) {
	dms.lock.Lock()
	defer dms.lock.Unlock()

	key := groupingKeyFor(wr.Labels)

	if wr.MetricFamilies == nil {
		// No MetricFamilies means delete request. Delete the whole
		// metric group, and we are done here.
		delete(dms.metricGroups, key)
		return
	}
	// Otherwise, it's an update.
	group, ok := dms.metricGroups[key]
	if !ok {
		group = MetricGroup{
			Labels:  wr.Labels,
			Metrics: NameToTimestampedMetricFamilyMap{},
		}
		dms.metricGroups[key] = group
	} else if wr.Replace {
		// For replace, we have to delete all metric families in the
		// group except pre-existing push timestamps.
		for name := range group.Metrics {
			if name != pushMetricName && name != pushFailedMetricName {
				delete(group.Metrics, name)
			}
		}
	}
	wr.MetricFamilies[pushMetricName] = newPushTimestampGauge(wr.Labels, wr.Timestamp)
	// Only add a zero push-failed metric if none is there yet, so that a
	// previously added fail timestamp is retained.
	if _, ok := group.Metrics[pushFailedMetricName]; !ok {
		wr.MetricFamilies[pushFailedMetricName] = newPushFailedTimestampGauge(wr.Labels, time.Time{})
	}
	for name, mf := range wr.MetricFamilies {
		group.Metrics[name] = TimestampedMetricFamily{
			Timestamp:            wr.Timestamp,
			GobbableMetricFamily: (*GobbableMetricFamily)(mf),
		}
	}
}

func (dms *DiskMetricStore) setPushFailedTimestamp(wr WriteRequest) {
	dms.lock.Lock()
	defer dms.lock.Unlock()

	key := groupingKeyFor(wr.Labels)

	group, ok := dms.metricGroups[key]
	if !ok {
		group = MetricGroup{
			Labels:  wr.Labels,
			Metrics: NameToTimestampedMetricFamilyMap{},
		}
		dms.metricGroups[key] = group
	}

	group.Metrics[pushFailedMetricName] = TimestampedMetricFamily{
		Timestamp:            wr.Timestamp,
		GobbableMetricFamily: (*GobbableMetricFamily)(newPushFailedTimestampGauge(wr.Labels, wr.Timestamp)),
	}
	// Only add a zero push metric if none is there yet, so that a
	// previously added push timestamp is retained.
	if _, ok := group.Metrics[pushMetricName]; !ok {
		group.Metrics[pushMetricName] = TimestampedMetricFamily{
			Timestamp:            wr.Timestamp,
			GobbableMetricFamily: (*GobbableMetricFamily)(newPushTimestampGauge(wr.Labels, time.Time{})),
		}
	}
}

// checkWriteRequest returns whether applying the provided WriteRequest will
// result in a consistent state of metrics. The dms is not modified by the
// check. However, the WriteRequest _will_ be sanitized: the MetricFamilies
// are ensured to contain the grouping Labels after the check. If false is
// returned, the causing error is written to the Done channel of the
// WriteRequest.
//
// Special case: If the WriteRequest has no Done channel set, the (expensive)
// consistency check is skipped. The WriteRequest is still sanitized, and the
// presence of timestamps still results in returning false.
func (dms *DiskMetricStore) checkWriteRequest(wr WriteRequest) bool {
	if wr.MetricFamilies == nil {
		// Delete request cannot create inconsistencies, and nothing has
		// to be sanitized.
		return true
	}

	var err error
	defer func() {
		if err != nil && wr.Done != nil {
			wr.Done <- err
		}
	}()

	if timestampsPresent(wr.MetricFamilies) {
		err = errTimestamp
		return false
	}
	for _, mf := range wr.MetricFamilies {
		sanitizeLabels(mf, wr.Labels)
	}

	// Without Done channel, don't do the expensive consistency check.
	if wr.Done == nil {
		return true
	}

	// Construct a test dms, acting on a copy of the metrics, to test the
	// WriteRequest with.
	tdms := &DiskMetricStore{
		metricGroups:   dms.GetMetricFamiliesMap(),
		predefinedHelp: dms.predefinedHelp,
		logger:         log.NewNopLogger(),
	}
	tdms.processWriteRequest(wr)

	// Construct a test Gatherer to check if consistent gathering is possible.
	tg := prometheus.Gatherers{
		prometheus.DefaultGatherer,
		prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
			return tdms.GetMetricFamilies(), nil
		}),
	}
	if _, err = tg.Gather(); err != nil {
		return false
	}
	return true
}

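// persist writes the current metric groups to the persistence file. To keep
// the file intact even if the process dies mid-write, the groups are encoded
// into a temporary file in the same directory, which is then atomically
// renamed over the target.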
func (dms *DiskMetricStore) persist() error {
	// Check (again) if persistence is configured because some code paths
	// will call this method even if it is not.
	if dms.persistenceFile == "" {
		return nil
	}
	f, err := os.CreateTemp(
		filepath.Dir(dms.persistenceFile),
		filepath.Base(dms.persistenceFile)+".in_progress.",
	)
	if err != nil {
		return err
	}
	inProgressFileName := f.Name()
	e := gob.NewEncoder(f)

	dms.lock.RLock()
	err = e.Encode(dms.metricGroups)
	dms.lock.RUnlock()
	if err != nil {
		f.Close()
		os.Remove(inProgressFileName)
		return err
	}
	if err := f.Close(); err != nil {
		os.Remove(inProgressFileName)
		return err
	}
	return os.Rename(inProgressFileName, dms.persistenceFile)
}

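// restore reads the metric groups back from the persistence file, if one is
// configured and exists. A missing file is not an error, so starting without
// any prior state works without complaint.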
func (dms *DiskMetricStore) restore() error {
	if dms.persistenceFile == "" {
		return nil
	}
	f, err := os.Open(dms.persistenceFile)
	if os.IsNotExist(err) {
		return nil
	}
	if err != nil {
		return err
	}
	defer f.Close()
	d := gob.NewDecoder(f)
	if err := d.Decode(&dms.metricGroups); err != nil {
		return err
	}
	return nil
}

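// copyMetricFamily returns a copy of mf that owns its own Metric slice, so
// callers can append to it without mutating the original. The *dto.Metric
// elements themselves are shared, not deep-copied.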
func copyMetricFamily(mf *dto.MetricFamily) *dto.MetricFamily {
	return &dto.MetricFamily{
		Name:   mf.Name,
		Help:   mf.Help,
		Type:   mf.Type,
		Metric: append([]*dto.Metric{}, mf.Metric...),
	}
}

// groupingKeyFor creates a grouping key from the provided map of grouping
// labels. The grouping key is created by joining all label names and values
// together with model.SeparatorByte as a separator. The label names are sorted
// lexicographically before joining. In that way, the grouping key is both
// reproducible and unique.
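//
// For example, writing \xff for model.SeparatorByte:
//
//	groupingKeyFor(map[string]string{"job": "j", "instance": "i"})
//	// yields "instance\xffi\xffjob\xffj"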
func groupingKeyFor(labels map[string]string) string {
	if len(labels) == 0 { // Super fast path.
		return ""
	}

	labelNames := make([]string, 0, len(labels))
	for labelName := range labels {
		labelNames = append(labelNames, labelName)
	}
	sort.Strings(labelNames)

	sb := strings.Builder{}
	for i, labelName := range labelNames {
		sb.WriteString(labelName)
		sb.WriteByte(model.SeparatorByte)
		sb.WriteString(labels[labelName])
		if i+1 < len(labels) { // No separator at the end.
			sb.WriteByte(model.SeparatorByte)
		}
	}
	return sb.String()
}

// extractPredefinedHelpStrings extracts all the HELP strings from the provided
// gatherer so that the DiskMetricStore can fix deviations in pushed metrics.
func extractPredefinedHelpStrings(g prometheus.Gatherer) (map[string]string, error) {
	if g == nil {
		return nil, nil
	}
	mfs, err := g.Gather()
	if err != nil {
		return nil, err
	}
	result := map[string]string{}
	for _, mf := range mfs {
		result[mf.GetName()] = mf.GetHelp()
	}
	return result, nil
}

func newPushTimestampGauge(groupingLabels map[string]string, t time.Time) *dto.MetricFamily {
	return newTimestampGauge(pushMetricName, pushMetricHelp, groupingLabels, t)
}

func newPushFailedTimestampGauge(groupingLabels map[string]string, t time.Time) *dto.MetricFamily {
	return newTimestampGauge(pushFailedMetricName, pushFailedMetricHelp, groupingLabels, t)
}

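// newTimestampGauge creates a gauge metric family with a single sample whose
// value is the given time in seconds since the Unix epoch. A zero time is
// encoded as the value 0, meaning "never".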
func newTimestampGauge(name, help string, groupingLabels map[string]string, t time.Time) *dto.MetricFamily {
	var ts float64
	if !t.IsZero() {
		ts = float64(t.UnixNano()) / 1e9
	}
	mf := &dto.MetricFamily{
		Name: proto.String(name),
		Help: proto.String(help),
		Type: dto.MetricType_GAUGE.Enum(),
		Metric: []*dto.Metric{
			{
				Gauge: &dto.Gauge{
					Value: proto.Float64(ts),
				},
			},
		},
	}
	sanitizeLabels(mf, groupingLabels)
	return mf
}

// sanitizeLabels ensures that all the labels in groupingLabels and the
// "instance" label are present in the MetricFamily. The label values from
// groupingLabels are set in each Metric, no matter what. After that, if the
// "instance" label is not present at all in a Metric, it will be created (with
// an empty string as value).
//
// Finally, sanitizeLabels sorts the label pairs of all metrics.
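//
// For example, with groupingLabels {"job": "j"}, a metric carrying only the
// label {"foo": "bar"} ends up with the labels
// {"foo": "bar", "instance": "", "job": "j"}, in that order.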
func sanitizeLabels(mf *dto.MetricFamily, groupingLabels map[string]string) {
	gLabelsNotYetDone := make(map[string]string, len(groupingLabels))

metric:
	for _, m := range mf.GetMetric() {
		for ln, lv := range groupingLabels {
			gLabelsNotYetDone[ln] = lv
		}
		hasInstanceLabel := false
		for _, lp := range m.GetLabel() {
			ln := lp.GetName()
			if lv, ok := gLabelsNotYetDone[ln]; ok {
				lp.Value = proto.String(lv)
				delete(gLabelsNotYetDone, ln)
			}
			if ln == string(model.InstanceLabel) {
				hasInstanceLabel = true
			}
			if len(gLabelsNotYetDone) == 0 && hasInstanceLabel {
				sort.Sort(labelPairs(m.Label))
				continue metric
			}
		}
		for ln, lv := range gLabelsNotYetDone {
			m.Label = append(m.Label, &dto.LabelPair{
				Name:  proto.String(ln),
				Value: proto.String(lv),
			})
			if ln == string(model.InstanceLabel) {
				hasInstanceLabel = true
			}
			delete(gLabelsNotYetDone, ln) // To prepare map for next metric.
		}
		if !hasInstanceLabel {
			m.Label = append(m.Label, &dto.LabelPair{
				Name:  proto.String(string(model.InstanceLabel)),
				Value: proto.String(""),
			})
		}
		sort.Sort(labelPairs(m.Label))
	}
}

// timestampsPresent checks if any of the provided metric families contain
// metrics with a timestamp set.
func timestampsPresent(metricFamilies map[string]*dto.MetricFamily) bool {
	for _, mf := range metricFamilies {
		for _, m := range mf.GetMetric() {
			if m.TimestampMs != nil {
				return true
			}
		}
	}
	return false
}

// labelPairs implements sort.Interface. It provides a sortable version of a
// slice of dto.LabelPair pointers.
type labelPairs []*dto.LabelPair

func (s labelPairs) Len() int {
	return len(s)
}

func (s labelPairs) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

func (s labelPairs) Less(i, j int) bool {
	return s[i].GetName() < s[j].GetName()
}