1// Copyright (c) The Thanos Authors. 2// Licensed under the Apache License 2.0. 3 4package receive 5 6import ( 7 "sync" 8 9 "github.com/go-kit/kit/log" 10 "github.com/go-kit/kit/log/level" 11 "github.com/pkg/errors" 12 "github.com/prometheus/prometheus/prompb" 13 14 "github.com/prometheus/prometheus/pkg/labels" 15 "github.com/prometheus/prometheus/storage" 16 terrors "github.com/prometheus/prometheus/tsdb/errors" 17) 18 19// Appendable returns an Appender. 20type Appendable interface { 21 Appender() (storage.Appender, error) 22} 23 24type Writer struct { 25 logger log.Logger 26 append Appendable 27} 28 29func NewWriter(logger log.Logger, app Appendable) *Writer { 30 return &Writer{ 31 logger: logger, 32 append: app, 33 } 34} 35 36func (r *Writer) Write(wreq *prompb.WriteRequest) error { 37 var ( 38 numOutOfOrder = 0 39 numDuplicates = 0 40 numOutOfBounds = 0 41 ) 42 43 app, err := r.append.Appender() 44 if err != nil { 45 return errors.Wrap(err, "get appender") 46 } 47 48 var errs terrors.MultiError 49 for _, t := range wreq.Timeseries { 50 lset := make(labels.Labels, len(t.Labels)) 51 for j := range t.Labels { 52 lset[j] = labels.Label{ 53 Name: t.Labels[j].Name, 54 Value: t.Labels[j].Value, 55 } 56 } 57 58 // Append as many valid samples as possible, but keep track of the errors. 59 for _, s := range t.Samples { 60 _, err = app.Add(lset, s.Timestamp, s.Value) 61 switch err { 62 case nil: 63 continue 64 case storage.ErrOutOfOrderSample: 65 numOutOfOrder++ 66 level.Debug(r.logger).Log("msg", "Out of order sample", "lset", lset.String(), "sample", s.String()) 67 case storage.ErrDuplicateSampleForTimestamp: 68 numDuplicates++ 69 level.Debug(r.logger).Log("msg", "Duplicate sample for timestamp", "lset", lset.String(), "sample", s.String()) 70 case storage.ErrOutOfBounds: 71 numOutOfBounds++ 72 level.Debug(r.logger).Log("msg", "Out of bounds metric", "lset", lset.String(), "sample", s.String()) 73 } 74 } 75 } 76 77 if numOutOfOrder > 0 { 78 level.Warn(r.logger).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", numOutOfOrder) 79 errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "failed to non-fast add %d samples", numOutOfOrder)) 80 } 81 if numDuplicates > 0 { 82 level.Warn(r.logger).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", numDuplicates) 83 errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "failed to non-fast add %d samples", numDuplicates)) 84 } 85 if numOutOfBounds > 0 { 86 level.Warn(r.logger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", numOutOfBounds) 87 errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "failed to non-fast add %d samples", numOutOfBounds)) 88 } 89 90 if err := app.Commit(); err != nil { 91 errs.Add(errors.Wrap(err, "commit samples")) 92 } 93 94 return errs.Err() 95} 96 97type fakeAppendable struct { 98 appender storage.Appender 99 appenderErr func() error 100} 101 102var _ Appendable = &fakeAppendable{} 103 104func nilErrFn() error { 105 return nil 106} 107 108func (f *fakeAppendable) Appender() (storage.Appender, error) { 109 errf := f.appenderErr 110 if errf == nil { 111 errf = nilErrFn 112 } 113 return f.appender, errf() 114} 115 116type fakeAppender struct { 117 sync.Mutex 118 samples map[string][]prompb.Sample 119 addErr func() error 120 addFastErr func() error 121 commitErr func() error 122 rollbackErr func() error 123} 124 125var _ storage.Appender = &fakeAppender{} 126 127func newFakeAppender(addErr, addFastErr, commitErr, rollbackErr func() error) *fakeAppender { 128 if addErr == nil { 129 addErr = nilErrFn 130 } 131 if addFastErr == nil { 132 addFastErr = nilErrFn 133 } 134 if commitErr == nil { 135 commitErr = nilErrFn 136 } 137 if rollbackErr == nil { 138 rollbackErr = nilErrFn 139 } 140 return &fakeAppender{ 141 samples: make(map[string][]prompb.Sample), 142 addErr: addErr, 143 addFastErr: addFastErr, 144 commitErr: commitErr, 145 rollbackErr: rollbackErr, 146 } 147} 148 149func (f *fakeAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { 150 f.Lock() 151 defer f.Unlock() 152 f.samples[l.String()] = append(f.samples[l.String()], prompb.Sample{Value: v, Timestamp: t}) 153 return 0, f.addErr() 154} 155 156func (f *fakeAppender) AddFast(l labels.Labels, ref uint64, t int64, v float64) error { 157 f.Lock() 158 defer f.Unlock() 159 f.samples[l.String()] = append(f.samples[l.String()], prompb.Sample{Value: v, Timestamp: t}) 160 return f.addFastErr() 161} 162 163func (f *fakeAppender) Commit() error { 164 return f.commitErr() 165} 166 167func (f *fakeAppender) Rollback() error { 168 return f.rollbackErr() 169} 170