1// Copyright 2021 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package remote
15
16import (
17	"bytes"
18	"context"
19	"fmt"
20	"io/ioutil"
21	"net/http"
22	"net/http/httptest"
23	"testing"
24
25	"github.com/go-kit/kit/log"
26	"github.com/prometheus/prometheus/pkg/exemplar"
27	"github.com/prometheus/prometheus/pkg/labels"
28	"github.com/prometheus/prometheus/prompb"
29	"github.com/prometheus/prometheus/storage"
30	"github.com/stretchr/testify/require"
31)
32
33func TestRemoteWriteHandler(t *testing.T) {
34	buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil)
35	require.NoError(t, err)
36
37	req, err := http.NewRequest("", "", bytes.NewReader(buf))
38	require.NoError(t, err)
39
40	appendable := &mockAppendable{}
41	handler := NewWriteHandler(nil, appendable)
42
43	recorder := httptest.NewRecorder()
44	handler.ServeHTTP(recorder, req)
45
46	resp := recorder.Result()
47	require.Equal(t, http.StatusNoContent, resp.StatusCode)
48
49	i := 0
50	for _, ts := range writeRequestFixture.Timeseries {
51		labels := labelProtosToLabels(ts.Labels)
52		for _, s := range ts.Samples {
53			require.Equal(t, mockSample{labels, s.Timestamp, s.Value}, appendable.samples[i])
54			i++
55		}
56	}
57}
58
59func TestOutOfOrder(t *testing.T) {
60	buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
61		Labels:  []prompb.Label{{Name: "__name__", Value: "test_metric"}},
62		Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
63	}}, nil, nil)
64	require.NoError(t, err)
65
66	req, err := http.NewRequest("", "", bytes.NewReader(buf))
67	require.NoError(t, err)
68
69	appendable := &mockAppendable{
70		latest: 100,
71	}
72	handler := NewWriteHandler(log.NewNopLogger(), appendable)
73
74	recorder := httptest.NewRecorder()
75	handler.ServeHTTP(recorder, req)
76
77	resp := recorder.Result()
78	require.Equal(t, http.StatusBadRequest, resp.StatusCode)
79}
80
81func TestCommitErr(t *testing.T) {
82	buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil)
83	require.NoError(t, err)
84
85	req, err := http.NewRequest("", "", bytes.NewReader(buf))
86	require.NoError(t, err)
87
88	appendable := &mockAppendable{
89		commitErr: fmt.Errorf("commit error"),
90	}
91	handler := NewWriteHandler(log.NewNopLogger(), appendable)
92
93	recorder := httptest.NewRecorder()
94	handler.ServeHTTP(recorder, req)
95
96	resp := recorder.Result()
97	body, err := ioutil.ReadAll(resp.Body)
98	require.NoError(t, err)
99	require.Equal(t, http.StatusInternalServerError, resp.StatusCode)
100	require.Equal(t, "commit error\n", string(body))
101}
102
103type mockAppendable struct {
104	latest    int64
105	samples   []mockSample
106	commitErr error
107}
108
109type mockSample struct {
110	l labels.Labels
111	t int64
112	v float64
113}
114
115func (m *mockAppendable) Appender(_ context.Context) storage.Appender {
116	return m
117}
118
119func (m *mockAppendable) Append(_ uint64, l labels.Labels, t int64, v float64) (uint64, error) {
120	if t < m.latest {
121		return 0, storage.ErrOutOfOrderSample
122	}
123
124	m.latest = t
125	m.samples = append(m.samples, mockSample{l, t, v})
126	return 0, nil
127}
128
129func (m *mockAppendable) Commit() error {
130	return m.commitErr
131}
132
133func (*mockAppendable) Rollback() error {
134	return fmt.Errorf("not implemented")
135}
136
137func (*mockAppendable) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) {
138	// noop until we implement exemplars over remote write
139	return 0, nil
140}
141