1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4package receive
5
6import (
7	"sync"
8
9	"github.com/go-kit/kit/log"
10	"github.com/go-kit/kit/log/level"
11	"github.com/pkg/errors"
12	"github.com/prometheus/prometheus/prompb"
13
14	"github.com/prometheus/prometheus/pkg/labels"
15	"github.com/prometheus/prometheus/storage"
16	terrors "github.com/prometheus/prometheus/tsdb/errors"
17)
18
// Appendable is a source of storage appenders.
//
// Appender returns a storage.Appender, or a non-nil error when one cannot be
// provided.
type Appendable interface {
	Appender() (storage.Appender, error)
}
23
// Writer appends the samples carried by a prompb.WriteRequest to an appender
// obtained from its Appendable (see Write). The logger receives per-sample
// debug messages and per-request warning summaries about rejected samples.
type Writer struct {
	logger log.Logger
	append Appendable
}
28
29func NewWriter(logger log.Logger, app Appendable) *Writer {
30	return &Writer{
31		logger: logger,
32		append: app,
33	}
34}
35
36func (r *Writer) Write(wreq *prompb.WriteRequest) error {
37	var (
38		numOutOfOrder  = 0
39		numDuplicates  = 0
40		numOutOfBounds = 0
41	)
42
43	app, err := r.append.Appender()
44	if err != nil {
45		return errors.Wrap(err, "get appender")
46	}
47
48	var errs terrors.MultiError
49	for _, t := range wreq.Timeseries {
50		lset := make(labels.Labels, len(t.Labels))
51		for j := range t.Labels {
52			lset[j] = labels.Label{
53				Name:  t.Labels[j].Name,
54				Value: t.Labels[j].Value,
55			}
56		}
57
58		// Append as many valid samples as possible, but keep track of the errors.
59		for _, s := range t.Samples {
60			_, err = app.Add(lset, s.Timestamp, s.Value)
61			switch err {
62			case nil:
63				continue
64			case storage.ErrOutOfOrderSample:
65				numOutOfOrder++
66				level.Debug(r.logger).Log("msg", "Out of order sample", "lset", lset.String(), "sample", s.String())
67			case storage.ErrDuplicateSampleForTimestamp:
68				numDuplicates++
69				level.Debug(r.logger).Log("msg", "Duplicate sample for timestamp", "lset", lset.String(), "sample", s.String())
70			case storage.ErrOutOfBounds:
71				numOutOfBounds++
72				level.Debug(r.logger).Log("msg", "Out of bounds metric", "lset", lset.String(), "sample", s.String())
73			}
74		}
75	}
76
77	if numOutOfOrder > 0 {
78		level.Warn(r.logger).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", numOutOfOrder)
79		errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "failed to non-fast add %d samples", numOutOfOrder))
80	}
81	if numDuplicates > 0 {
82		level.Warn(r.logger).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", numDuplicates)
83		errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "failed to non-fast add %d samples", numDuplicates))
84	}
85	if numOutOfBounds > 0 {
86		level.Warn(r.logger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", numOutOfBounds)
87		errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "failed to non-fast add %d samples", numOutOfBounds))
88	}
89
90	if err := app.Commit(); err != nil {
91		errs.Add(errors.Wrap(err, "commit samples"))
92	}
93
94	return errs.Err()
95}
96
// fakeAppendable is an Appendable whose result is fully configurable.
// Its Appender method returns the appender field as-is; the accompanying
// error is whatever appenderErr returns, or nil when appenderErr is unset.
type fakeAppendable struct {
	appender    storage.Appender
	appenderErr func() error
}
101
102var _ Appendable = &fakeAppendable{}
103
// nilErrFn is the default error hook: it always reports success.
func nilErrFn() error {
	var noErr error
	return noErr
}
107
108func (f *fakeAppendable) Appender() (storage.Appender, error) {
109	errf := f.appenderErr
110	if errf == nil {
111		errf = nilErrFn
112	}
113	return f.appender, errf()
114}
115
// fakeAppender is a storage.Appender that records what it is given.
//
// Add and AddFast append each sample to samples under the String() form of
// the sample's label set, holding the embedded mutex while they do so. The
// error each method returns is produced by the hook of the matching name:
// addErr for Add, addFastErr for AddFast, commitErr for Commit and
// rollbackErr for Rollback.
type fakeAppender struct {
	sync.Mutex
	samples     map[string][]prompb.Sample
	addErr      func() error
	addFastErr  func() error
	commitErr   func() error
	rollbackErr func() error
}
124
125var _ storage.Appender = &fakeAppender{}
126
127func newFakeAppender(addErr, addFastErr, commitErr, rollbackErr func() error) *fakeAppender {
128	if addErr == nil {
129		addErr = nilErrFn
130	}
131	if addFastErr == nil {
132		addFastErr = nilErrFn
133	}
134	if commitErr == nil {
135		commitErr = nilErrFn
136	}
137	if rollbackErr == nil {
138		rollbackErr = nilErrFn
139	}
140	return &fakeAppender{
141		samples:     make(map[string][]prompb.Sample),
142		addErr:      addErr,
143		addFastErr:  addFastErr,
144		commitErr:   commitErr,
145		rollbackErr: rollbackErr,
146	}
147}
148
149func (f *fakeAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) {
150	f.Lock()
151	defer f.Unlock()
152	f.samples[l.String()] = append(f.samples[l.String()], prompb.Sample{Value: v, Timestamp: t})
153	return 0, f.addErr()
154}
155
156func (f *fakeAppender) AddFast(l labels.Labels, ref uint64, t int64, v float64) error {
157	f.Lock()
158	defer f.Unlock()
159	f.samples[l.String()] = append(f.samples[l.String()], prompb.Sample{Value: v, Timestamp: t})
160	return f.addFastErr()
161}
162
163func (f *fakeAppender) Commit() error {
164	return f.commitErr()
165}
166
167func (f *fakeAppender) Rollback() error {
168	return f.rollbackErr()
169}
170