1// Copyright 2013 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package remote
15
16import (
17	"context"
18	"fmt"
19	"io/ioutil"
20	"math"
21	"os"
22	"reflect"
23	"sort"
24	"strconv"
25	"sync"
26	"sync/atomic"
27	"testing"
28	"time"
29
30	"github.com/go-kit/kit/log"
31	"github.com/gogo/protobuf/proto"
32	"github.com/golang/snappy"
33	"github.com/stretchr/testify/require"
34
35	client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
36	"github.com/prometheus/common/model"
37	"github.com/prometheus/prometheus/config"
38	"github.com/prometheus/prometheus/pkg/labels"
39	"github.com/prometheus/prometheus/prompb"
40	"github.com/prometheus/prometheus/util/testutil"
41	"github.com/prometheus/tsdb"
42	tsdbLabels "github.com/prometheus/tsdb/labels"
43)
44
// defaultFlushDeadline is the flush deadline passed to NewQueueManager by the
// tests in this file that do not measure shutdown timing themselves.
const defaultFlushDeadline = 1 * time.Minute
46
47func TestSampleDelivery(t *testing.T) {
48	// Let's create an even number of send batches so we don't run into the
49	// batch timeout case.
50	n := config.DefaultQueueConfig.Capacity * 2
51	samples, series := createTimeseries(n)
52
53	c := NewTestStorageClient()
54	c.expectSamples(samples[:len(samples)/2], series)
55
56	cfg := config.DefaultQueueConfig
57	cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
58	cfg.MaxShards = 1
59
60	dir, err := ioutil.TempDir("", "TestSampleDeliver")
61	testutil.Ok(t, err)
62	defer os.RemoveAll(dir)
63
64	m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
65	m.StoreSeries(series, 0)
66
67	// These should be received by the client.
68	m.Start()
69	m.Append(samples[:len(samples)/2])
70	defer m.Stop()
71
72	c.waitForExpectedSamples(t)
73	m.Append(samples[len(samples)/2:])
74	c.expectSamples(samples[len(samples)/2:], series)
75	c.waitForExpectedSamples(t)
76}
77
78func TestSampleDeliveryTimeout(t *testing.T) {
79	// Let's send one less sample than batch size, and wait the timeout duration
80	n := 9
81	samples, series := createTimeseries(n)
82	c := NewTestStorageClient()
83
84	cfg := config.DefaultQueueConfig
85	cfg.MaxShards = 1
86	cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
87
88	dir, err := ioutil.TempDir("", "TestSampleDeliveryTimeout")
89	testutil.Ok(t, err)
90	defer os.RemoveAll(dir)
91
92	m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
93	m.StoreSeries(series, 0)
94	m.Start()
95	defer m.Stop()
96
97	// Send the samples twice, waiting for the samples in the meantime.
98	c.expectSamples(samples, series)
99	m.Append(samples)
100	c.waitForExpectedSamples(t)
101
102	c.expectSamples(samples, series)
103	m.Append(samples)
104	c.waitForExpectedSamples(t)
105}
106
107func TestSampleDeliveryOrder(t *testing.T) {
108	ts := 10
109	n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
110	samples := make([]tsdb.RefSample, 0, n)
111	series := make([]tsdb.RefSeries, 0, n)
112	for i := 0; i < n; i++ {
113		name := fmt.Sprintf("test_metric_%d", i%ts)
114		samples = append(samples, tsdb.RefSample{
115			Ref: uint64(i),
116			T:   int64(i),
117			V:   float64(i),
118		})
119		series = append(series, tsdb.RefSeries{
120			Ref:    uint64(i),
121			Labels: tsdbLabels.Labels{tsdbLabels.Label{Name: "__name__", Value: name}},
122		})
123	}
124
125	c := NewTestStorageClient()
126	c.expectSamples(samples, series)
127
128	dir, err := ioutil.TempDir("", "TestSampleDeliveryOrder")
129	testutil.Ok(t, err)
130	defer os.RemoveAll(dir)
131
132	m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
133	m.StoreSeries(series, 0)
134
135	m.Start()
136	defer m.Stop()
137	// These should be received by the client.
138	m.Append(samples)
139	c.waitForExpectedSamples(t)
140}
141
142func TestShutdown(t *testing.T) {
143	deadline := 1 * time.Second
144	c := NewTestBlockedStorageClient()
145
146	dir, err := ioutil.TempDir("", "TestShutdown")
147	testutil.Ok(t, err)
148	defer os.RemoveAll(dir)
149
150	m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
151	samples, series := createTimeseries(2 * config.DefaultQueueConfig.MaxSamplesPerSend)
152	m.StoreSeries(series, 0)
153	m.Start()
154
155	// Append blocks to guarantee delivery, so we do it in the background.
156	go func() {
157		m.Append(samples)
158	}()
159	time.Sleep(100 * time.Millisecond)
160
161	// Test to ensure that Stop doesn't block.
162	start := time.Now()
163	m.Stop()
164	// The samples will never be delivered, so duration should
165	// be at least equal to deadline, otherwise the flush deadline
166	// was not respected.
167	duration := time.Since(start)
168	if duration > time.Duration(deadline+(deadline/10)) {
169		t.Errorf("Took too long to shutdown: %s > %s", duration, deadline)
170	}
171	if duration < time.Duration(deadline) {
172		t.Errorf("Shutdown occurred before flush deadline: %s < %s", duration, deadline)
173	}
174}
175
176func TestSeriesReset(t *testing.T) {
177	c := NewTestBlockedStorageClient()
178	deadline := 5 * time.Second
179	numSegments := 4
180	numSeries := 25
181
182	dir, err := ioutil.TempDir("", "TestSeriesReset")
183	testutil.Ok(t, err)
184	defer os.RemoveAll(dir)
185
186	m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
187	for i := 0; i < numSegments; i++ {
188		series := []tsdb.RefSeries{}
189		for j := 0; j < numSeries; j++ {
190			series = append(series, tsdb.RefSeries{Ref: uint64((i * 100) + j), Labels: tsdbLabels.Labels{{Name: "a", Value: "a"}}})
191		}
192		m.StoreSeries(series, i)
193	}
194	testutil.Equals(t, numSegments*numSeries, len(m.seriesLabels))
195	m.SeriesReset(2)
196	testutil.Equals(t, numSegments*numSeries/2, len(m.seriesLabels))
197}
198
199func TestReshard(t *testing.T) {
200	size := 10 // Make bigger to find more races.
201	n := config.DefaultQueueConfig.Capacity * size
202	samples, series := createTimeseries(n)
203
204	c := NewTestStorageClient()
205	c.expectSamples(samples, series)
206
207	cfg := config.DefaultQueueConfig
208	cfg.MaxShards = 1
209
210	dir, err := ioutil.TempDir("", "TestReshard")
211	testutil.Ok(t, err)
212	defer os.RemoveAll(dir)
213
214	m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
215	m.StoreSeries(series, 0)
216
217	m.Start()
218	defer m.Stop()
219
220	go func() {
221		for i := 0; i < len(samples); i += config.DefaultQueueConfig.Capacity {
222			sent := m.Append(samples[i : i+config.DefaultQueueConfig.Capacity])
223			require.True(t, sent)
224			time.Sleep(100 * time.Millisecond)
225		}
226	}()
227
228	for i := 1; i < len(samples)/config.DefaultQueueConfig.Capacity; i++ {
229		m.shards.stop()
230		m.shards.start(i)
231		time.Sleep(100 * time.Millisecond)
232	}
233
234	c.waitForExpectedSamples(t)
235}
236
// TestReshardRaceWithStop repeatedly creates, starts and stops a QueueManager
// in a background goroutine while the test goroutine sends values on the
// current manager's reshardChan. The mutex h is handed back and forth so that
// m is only replaced or stopped while the test goroutine is not using it.
// The test makes no assertions; presumably it is meant to surface panics,
// deadlocks or race-detector reports.
func TestReshardRaceWithStop(t *testing.T) {
	c := NewTestStorageClient()
	var m *QueueManager
	h := sync.Mutex{}

	// Hold h up front so the send loop below cannot read m before the
	// goroutine has created and started the first manager.
	h.Lock()

	go func() {
		// NOTE(review): this loop has no exit condition, so the goroutine is
		// still alive when the test function returns.
		for {
			m = NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
			m.Start()
			// Release h so the test goroutine can send on reshardChan, then
			// take it back before stopping and replacing the manager.
			h.Unlock()
			h.Lock()
			m.Stop()
		}
	}()

	for i := 1; i < 100; i++ {
		// While h is held here, the goroutine above is either blocked in
		// h.Lock() or has not yet reached it, so m refers to a started manager.
		h.Lock()
		m.reshardChan <- i
		h.Unlock()
	}
}
260
261func TestReleaseNoninternedString(t *testing.T) {
262	c := NewTestStorageClient()
263	m := NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
264	m.Start()
265
266	for i := 1; i < 1000; i++ {
267		m.StoreSeries([]tsdb.RefSeries{
268			tsdb.RefSeries{
269				Ref: uint64(i),
270				Labels: tsdbLabels.Labels{
271					tsdbLabels.Label{
272						Name:  "asdf",
273						Value: fmt.Sprintf("%d", i),
274					},
275				},
276			},
277		}, 0)
278		m.SeriesReset(1)
279	}
280
281	metric := client_testutil.ToFloat64(noReferenceReleases)
282	testutil.Assert(t, metric == 0, "expected there to be no calls to release for strings that were not already interned: %d", int(metric))
283}
284
285func createTimeseries(n int) ([]tsdb.RefSample, []tsdb.RefSeries) {
286	samples := make([]tsdb.RefSample, 0, n)
287	series := make([]tsdb.RefSeries, 0, n)
288	for i := 0; i < n; i++ {
289		name := fmt.Sprintf("test_metric_%d", i)
290		samples = append(samples, tsdb.RefSample{
291			Ref: uint64(i),
292			T:   int64(i),
293			V:   float64(i),
294		})
295		series = append(series, tsdb.RefSeries{
296			Ref:    uint64(i),
297			Labels: tsdbLabels.Labels{{Name: "__name__", Value: name}},
298		})
299	}
300	return samples, series
301}
302
303func getSeriesNameFromRef(r tsdb.RefSeries) string {
304	for _, l := range r.Labels {
305		if l.Name == "__name__" {
306			return l.Value
307		}
308	}
309	return ""
310}
311
// TestStorageClient is a storage client for tests. Its Store method decodes
// each request and records the samples, so a test can compare what was
// delivered with what it registered as expected.
type TestStorageClient struct {
	// receivedSamples holds the samples seen by Store, keyed by the value of
	// the series' __name__ label.
	receivedSamples map[string][]prompb.Sample
	// expectedSamples holds the samples registered through expectSamples,
	// keyed the same way.
	expectedSamples map[string][]prompb.Sample
	// wg counts samples that are expected but not yet received.
	wg sync.WaitGroup
	// mtx is taken by every method that reads or writes the fields of this
	// struct other than wg.
	mtx sync.Mutex
	// buf is the snappy decode buffer kept between Store calls.
	buf []byte
}
319
320func NewTestStorageClient() *TestStorageClient {
321	return &TestStorageClient{
322		receivedSamples: map[string][]prompb.Sample{},
323		expectedSamples: map[string][]prompb.Sample{},
324	}
325}
326
327func (c *TestStorageClient) expectSamples(ss []tsdb.RefSample, series []tsdb.RefSeries) {
328	c.mtx.Lock()
329	defer c.mtx.Unlock()
330
331	c.expectedSamples = map[string][]prompb.Sample{}
332	c.receivedSamples = map[string][]prompb.Sample{}
333
334	for _, s := range ss {
335		seriesName := getSeriesNameFromRef(series[s.Ref])
336		c.expectedSamples[seriesName] = append(c.expectedSamples[seriesName], prompb.Sample{
337			Timestamp: s.T,
338			Value:     s.V,
339		})
340	}
341	c.wg.Add(len(ss))
342}
343
344func (c *TestStorageClient) waitForExpectedSamples(tb testing.TB) {
345	c.wg.Wait()
346	c.mtx.Lock()
347	defer c.mtx.Unlock()
348	for ts, expectedSamples := range c.expectedSamples {
349		if !reflect.DeepEqual(expectedSamples, c.receivedSamples[ts]) {
350			tb.Fatalf("%s: Expected %v, got %v", ts, expectedSamples, c.receivedSamples[ts])
351		}
352	}
353}
354
355func (c *TestStorageClient) expectSampleCount(ss []tsdb.RefSample) {
356	c.mtx.Lock()
357	defer c.mtx.Unlock()
358	c.wg.Add(len(ss))
359}
360
// waitForExpectedSampleCount blocks until the wait group counter reaches
// zero. Unlike waitForExpectedSamples it does not compare sample contents.
func (c *TestStorageClient) waitForExpectedSampleCount() {
	c.wg.Wait()
}
364
365func (c *TestStorageClient) Store(_ context.Context, req []byte) error {
366	c.mtx.Lock()
367	defer c.mtx.Unlock()
368	// nil buffers are ok for snappy, ignore cast error.
369	if c.buf != nil {
370		c.buf = c.buf[:cap(c.buf)]
371	}
372	reqBuf, err := snappy.Decode(c.buf, req)
373	c.buf = reqBuf
374	if err != nil {
375		return err
376	}
377
378	var reqProto prompb.WriteRequest
379	if err := proto.Unmarshal(reqBuf, &reqProto); err != nil {
380		return err
381	}
382
383	count := 0
384	for _, ts := range reqProto.Timeseries {
385		var seriesName string
386		labels := labelProtosToLabels(ts.Labels)
387		for _, label := range labels {
388			if label.Name == "__name__" {
389				seriesName = label.Value
390			}
391		}
392		for _, sample := range ts.Samples {
393			count++
394			c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
395		}
396	}
397	c.wg.Add(-count)
398	return nil
399}
400
// Name returns the fixed name of this test client.
func (c *TestStorageClient) Name() string {
	return "teststorageclient"
}
404
// TestBlockingStorageClient is a queue_manager StorageClient whose Store()
// blocks until the request's Context is done. Every call to Store() is
// counted in numCalls, which can be read through NumCalls().
type TestBlockingStorageClient struct {
	// numCalls is the number of Store() calls so far; it is updated and read
	// with sync/atomic.
	numCalls uint64
}
412
413func NewTestBlockedStorageClient() *TestBlockingStorageClient {
414	return &TestBlockingStorageClient{}
415}
416
417func (c *TestBlockingStorageClient) Store(ctx context.Context, _ []byte) error {
418	atomic.AddUint64(&c.numCalls, 1)
419	<-ctx.Done()
420	return nil
421}
422
// NumCalls returns how many times Store has been called so far, using an
// atomic load.
func (c *TestBlockingStorageClient) NumCalls() uint64 {
	return atomic.LoadUint64(&c.numCalls)
}
426
// Name returns the fixed name of this test client.
func (c *TestBlockingStorageClient) Name() string {
	return "testblockingstorageclient"
}
430
431func BenchmarkSampleDelivery(b *testing.B) {
432	// Let's create an even number of send batches so we don't run into the
433	// batch timeout case.
434	n := config.DefaultQueueConfig.MaxSamplesPerSend * 10
435	samples, series := createTimeseries(n)
436
437	c := NewTestStorageClient()
438
439	cfg := config.DefaultQueueConfig
440	cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
441	cfg.MaxShards = 1
442
443	dir, err := ioutil.TempDir("", "BenchmarkSampleDelivery")
444	testutil.Ok(b, err)
445	defer os.RemoveAll(dir)
446
447	m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline)
448	m.StoreSeries(series, 0)
449
450	// These should be received by the client.
451	m.Start()
452	defer m.Stop()
453
454	b.ResetTimer()
455	for i := 0; i < b.N; i++ {
456		c.expectSampleCount(samples)
457		m.Append(samples)
458		c.waitForExpectedSampleCount()
459	}
460	// Do not include shutdown
461	b.StopTimer()
462}
463
464func BenchmarkStartup(b *testing.B) {
465	dir := os.Getenv("WALDIR")
466	if dir == "" {
467		return
468	}
469
470	// Find the second largest segment; we will replay up to this.
471	// (Second largest as WALWatcher will start tailing the largest).
472	dirents, err := ioutil.ReadDir(dir)
473	testutil.Ok(b, err)
474
475	var segments []int
476	for _, dirent := range dirents {
477		if i, err := strconv.Atoi(dirent.Name()); err != nil {
478			segments = append(segments, i)
479		}
480	}
481	sort.Ints(segments)
482
483	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
484	logger = log.With(logger, "caller", log.DefaultCaller)
485
486	for n := 0; n < b.N; n++ {
487		c := NewTestBlockedStorageClient()
488		m := NewQueueManager(logger, dir,
489			newEWMARate(ewmaWeight, shardUpdateDuration),
490			config.DefaultQueueConfig, nil, nil, c, 1*time.Minute)
491		m.watcher.startTime = math.MaxInt64
492		m.watcher.maxSegment = segments[len(segments)-2]
493		err := m.watcher.run()
494		testutil.Ok(b, err)
495	}
496}
497
498func TestProcessExternalLabels(t *testing.T) {
499	for _, tc := range []struct {
500		labels         tsdbLabels.Labels
501		externalLabels labels.Labels
502		expected       labels.Labels
503	}{
504		// Test adding labels at the end.
505		{
506			labels:         tsdbLabels.Labels{{Name: "a", Value: "b"}},
507			externalLabels: labels.Labels{{Name: "c", Value: "d"}},
508			expected:       labels.Labels{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}},
509		},
510
511		// Test adding labels at the beginning.
512		{
513			labels:         tsdbLabels.Labels{{Name: "c", Value: "d"}},
514			externalLabels: labels.Labels{{Name: "a", Value: "b"}},
515			expected:       labels.Labels{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}},
516		},
517
518		// Test we don't override existing labels.
519		{
520			labels:         tsdbLabels.Labels{{Name: "a", Value: "b"}},
521			externalLabels: labels.Labels{{Name: "a", Value: "c"}},
522			expected:       labels.Labels{{Name: "a", Value: "b"}},
523		},
524	} {
525		require.Equal(t, tc.expected, processExternalLabels(tc.labels, tc.externalLabels))
526	}
527}
528