1// Copyright 2021 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package remote 15 16import ( 17 "bytes" 18 "context" 19 "fmt" 20 "io/ioutil" 21 "net/http" 22 "net/http/httptest" 23 "testing" 24 25 "github.com/go-kit/kit/log" 26 "github.com/prometheus/prometheus/pkg/exemplar" 27 "github.com/prometheus/prometheus/pkg/labels" 28 "github.com/prometheus/prometheus/prompb" 29 "github.com/prometheus/prometheus/storage" 30 "github.com/stretchr/testify/require" 31) 32 33func TestRemoteWriteHandler(t *testing.T) { 34 buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil) 35 require.NoError(t, err) 36 37 req, err := http.NewRequest("", "", bytes.NewReader(buf)) 38 require.NoError(t, err) 39 40 appendable := &mockAppendable{} 41 handler := NewWriteHandler(nil, appendable) 42 43 recorder := httptest.NewRecorder() 44 handler.ServeHTTP(recorder, req) 45 46 resp := recorder.Result() 47 require.Equal(t, http.StatusNoContent, resp.StatusCode) 48 49 i := 0 50 for _, ts := range writeRequestFixture.Timeseries { 51 labels := labelProtosToLabels(ts.Labels) 52 for _, s := range ts.Samples { 53 require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i]) 54 i++ 55 } 56 } 57} 58 59func TestOutOfOrder(t *testing.T) { 60 buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ 61 Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, 62 Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, 63 }}, nil, nil) 64 require.NoError(t, err) 65 66 req, err := http.NewRequest("", "", bytes.NewReader(buf)) 67 require.NoError(t, err) 68 69 appendable := &mockAppendable{ 70 latest: 100, 71 } 72 handler := NewWriteHandler(log.NewNopLogger(), appendable) 73 74 recorder := httptest.NewRecorder() 75 handler.ServeHTTP(recorder, req) 76 77 resp := recorder.Result() 78 require.Equal(t, http.StatusBadRequest, resp.StatusCode) 79} 80 81func TestCommitErr(t *testing.T) { 82 buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil) 83 require.NoError(t, err) 84 85 req, err := http.NewRequest("", "", bytes.NewReader(buf)) 86 require.NoError(t, err) 87 88 appendable := &mockAppendable{ 89 commitErr: fmt.Errorf("commit error"), 90 } 91 handler := NewWriteHandler(log.NewNopLogger(), appendable) 92 93 recorder := httptest.NewRecorder() 94 handler.ServeHTTP(recorder, req) 95 96 resp := recorder.Result() 97 body, err := ioutil.ReadAll(resp.Body) 98 require.NoError(t, err) 99 require.Equal(t, http.StatusInternalServerError, resp.StatusCode) 100 require.Equal(t, "commit error\n", string(body)) 101} 102 103type mockAppendable struct { 104 latest int64 105 samples []mockSample 106 commitErr error 107} 108 109type mockSample struct { 110 l labels.Labels 111 t int64 112 v float64 113} 114 115func (m *mockAppendable) Appender(_ context.Context) storage.Appender { 116 return m 117} 118 119func (m *mockAppendable) Append(_ uint64, l labels.Labels, t int64, v float64) (uint64, error) { 120 if t < m.latest { 121 return 0, storage.ErrOutOfOrderSample 122 } 123 124 m.latest = t 125 m.samples = append(m.samples, mockSample{l, t, v}) 126 return 0, nil 127} 128 129func (m *mockAppendable) Commit() error { 130 return m.commitErr 131} 132 133func (*mockAppendable) Rollback() error { 134 return fmt.Errorf("not implemented") 135} 136 137func (*mockAppendable) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { 138 // noop until we implement exemplars over remote write 139 return 0, nil 140} 141