package ingester

import (
	"fmt"
	"io/ioutil"
	"os"
	"sort"
	"sync"
	"syscall"
	"testing"
	"time"

	"github.com/cortexproject/cortex/pkg/ring"
	"github.com/cortexproject/cortex/pkg/tenant"
	gokitlog "github.com/go-kit/kit/log"
	"github.com/grafana/dskit/flagext"
	"github.com/grafana/dskit/kv"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/stretchr/testify/require"
	"github.com/weaveworks/common/user"
	"golang.org/x/net/context"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/ingester/client"
	"github.com/grafana/loki/pkg/iter"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logql"
	"github.com/grafana/loki/pkg/logql/log"
	"github.com/grafana/loki/pkg/runtime"
	"github.com/grafana/loki/pkg/storage"
	"github.com/grafana/loki/pkg/storage/chunk"
	"github.com/grafana/loki/pkg/validation"
)

const (
	numSeries        = 10
	samplesPerSeries = 100
)

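// TestChunkFlushingIdle verifies that chunks are flushed to the store once
// they have been idle for longer than MaxChunkIdle.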
func TestChunkFlushingIdle(t *testing.T) {
	cfg := defaultIngesterTestConfig(t)
	cfg.FlushCheckPeriod = 20 * time.Millisecond
	cfg.MaxChunkIdle = 100 * time.Millisecond
	cfg.RetainPeriod = 500 * time.Millisecond

	store, ing := newTestStore(t, cfg, nil)
	defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
	testData := pushTestSamples(t, ing)

	// wait beyond idle time so samples flush
	time.Sleep(cfg.MaxChunkIdle * 2)
	store.checkData(t, testData)
}

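// TestChunkFlushingShutdown verifies that all in-memory chunks are flushed
// when the ingester is stopped.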
func TestChunkFlushingShutdown(t *testing.T) {
	store, ing := newTestStore(t, defaultIngesterTestConfig(t), nil)
	testData := pushTestSamples(t, ing)
	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
	store.checkData(t, testData)
}

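// fullWAL is a WAL stub whose Log method always fails with ENOSPC,
// simulating a disk that has run out of space.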
type fullWAL struct{}

func (fullWAL) Log(_ *WALRecord) error { return &os.PathError{Err: syscall.ENOSPC} }
func (fullWAL) Start()                 {}
func (fullWAL) Stop() error            { return nil }

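// Benchmark_FlushLoop measures concurrent flushing of prebuilt chunk
// descriptors through Ingester.flushChunks.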
func Benchmark_FlushLoop(b *testing.B) {
	var (
		size   = 5
		descs  [][]*chunkDesc
		lbs    = makeRandomLabels()
		ctx    = user.InjectOrgID(context.Background(), "foo")
		_, ing = newTestStore(b, defaultIngesterTestConfig(b), nil)
	)

	for i := 0; i < size; i++ {
		descs = append(descs, buildChunkDecs(b))
	}

	b.ResetTimer()
	b.ReportAllocs()

	for n := 0; n < b.N; n++ {
		var wg sync.WaitGroup
		for i := 0; i < size; i++ {
			wg.Add(1)
			go func(loop int) {
				defer wg.Done()
				require.NoError(b, ing.flushChunks(ctx, 0, lbs, descs[loop], &sync.RWMutex{}))
			}(i)
		}
		wg.Wait()
	}
}

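// Test_Flush verifies that chunks produced by flushChunks can be decoded
// again on the store side.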
func Test_Flush(t *testing.T) {
	var (
		store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil)
		lbs        = makeRandomLabels()
		ctx        = user.InjectOrgID(context.Background(), "foo")
	)
	store.onPut = func(ctx context.Context, chunks []chunk.Chunk) error {
		for _, c := range chunks {
			buf, err := c.Encoded()
			require.Nil(t, err)
			if err := c.Decode(chunk.NewDecodeContext(), buf); err != nil {
				return err
			}
		}
		return nil
	}
	require.NoError(t, ing.flushChunks(ctx, 0, lbs, buildChunkDecs(t), &sync.RWMutex{}))
}

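// buildChunkDecs returns ten closed, filled chunk descriptors ready to be flushed.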
func buildChunkDecs(t testing.TB) []*chunkDesc {
	res := make([]*chunkDesc, 10)
	for i := range res {
		res[i] = &chunkDesc{
			closed: true,
			chunk:  chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.UnorderedHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize),
		}
		fillChunk(t, res[i].chunk)
		require.NoError(t, res[i].chunk.Close())
	}
	return res
}

func TestWALFullFlush(t *testing.T) {
	// The WAL is replaced with a fake below, but the ingester New() function creates a
	// regular WAL first, so the directory must still be created and cleaned up even
	// though it remains unused.
	walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
	require.Nil(t, err)
	defer os.RemoveAll(walDir)

	store, ing := newTestStore(t, defaultIngesterTestConfigWithWAL(t, walDir), fullWAL{})
	testData := pushTestSamples(t, ing)
	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
	store.checkData(t, testData)
}

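// TestFlushingCollidingLabels verifies that streams with colliding label sets
// are flushed correctly and end up as chunks with distinct fingerprints.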
func TestFlushingCollidingLabels(t *testing.T) {
	cfg := defaultIngesterTestConfig(t)
	cfg.FlushCheckPeriod = 20 * time.Millisecond
	cfg.MaxChunkIdle = 100 * time.Millisecond
	cfg.RetainPeriod = 500 * time.Millisecond

	store, ing := newTestStore(t, cfg, nil)
	defer store.Stop()

	const userID = "testUser"
	ctx := user.InjectOrgID(context.Background(), userID)

	// checkData only iterates between unix seconds 0 and 1000
	now := time.Unix(0, 0)

	req := &logproto.PushRequest{Streams: []logproto.Stream{
		// some colliding label sets
		{Labels: model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}.String(), Entries: entries(5, now.Add(time.Minute))},
		{Labels: model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}.String(), Entries: entries(5, now)},
		{Labels: model.LabelSet{"app": "l", "uniq0": "1", "uniq1": "0"}.String(), Entries: entries(5, now.Add(time.Minute))},
		{Labels: model.LabelSet{"app": "m", "uniq0": "0", "uniq1": "0"}.String(), Entries: entries(5, now)},
		{Labels: model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "0"}.String(), Entries: entries(5, now.Add(time.Minute))},
		{Labels: model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "0"}.String(), Entries: entries(5, now)},
	}}

	sort.Slice(req.Streams, func(i, j int) bool {
		return req.Streams[i].Labels < req.Streams[j].Labels
	})

	_, err := ing.Push(ctx, req)
	require.NoError(t, err)

	// force flush
	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))

	// verify that we get all the data back
	store.checkData(t, map[string][]logproto.Stream{userID: req.Streams})

	// make sure all chunks have different fingerprints, even those from colliding label sets.
	chunkFingerprints := map[model.Fingerprint]bool{}
	for _, c := range store.getChunksForUser(userID) {
		require.False(t, chunkFingerprints[c.Fingerprint])
		chunkFingerprints[c.Fingerprint] = true
	}
}

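// TestFlushMaxAge verifies flushing driven by MaxChunkAge: nothing is flushed
// before the max age is reached, and the data appears in the store once an
// entry pushes the stream past it.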
func TestFlushMaxAge(t *testing.T) {
	cfg := defaultIngesterTestConfig(t)
	cfg.FlushCheckPeriod = time.Millisecond * 100
	cfg.MaxChunkAge = time.Minute
	cfg.MaxChunkIdle = time.Hour

	store, ing := newTestStore(t, cfg, nil)
	defer store.Stop()

	now := time.Unix(0, 0)

	firstEntries := []logproto.Entry{
		{Timestamp: now.Add(time.Nanosecond), Line: "1"},
		{Timestamp: now.Add(time.Minute), Line: "2"},
	}

	secondEntries := []logproto.Entry{
		{Timestamp: now.Add(time.Second * 61), Line: "3"},
	}

	req := &logproto.PushRequest{Streams: []logproto.Stream{
		{Labels: model.LabelSet{"app": "l"}.String(), Entries: firstEntries},
	}}

	const userID = "testUser"
	ctx := user.InjectOrgID(context.Background(), userID)

	_, err := ing.Push(ctx, req)
	require.NoError(t, err)

	time.Sleep(2 * cfg.FlushCheckPeriod)

	// ensure the chunk is not flushed after the flush check period elapses,
	// since it has not yet reached MaxChunkAge
	store.checkData(t, map[string][]logproto.Stream{})

	req2 := &logproto.PushRequest{Streams: []logproto.Stream{
		{Labels: model.LabelSet{"app": "l"}.String(), Entries: secondEntries},
	}}

	_, err = ing.Push(ctx, req2)
	require.NoError(t, err)

	time.Sleep(2 * cfg.FlushCheckPeriod)

	// assert the flushed stream now contains both batches
	store.checkData(t, map[string][]logproto.Stream{
		userID: {
			{Labels: model.LabelSet{"app": "l"}.String(), Entries: append(firstEntries, secondEntries...)},
		},
	})

	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
}

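// testStore is an in-memory chunk store stub that records flushed chunks per
// tenant and optionally delegates Put to an onPut hook.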
type testStore struct {
	mtx sync.Mutex
	// Chunks keyed by userID.
	chunks map[string][]chunk.Chunk
	onPut  func(ctx context.Context, chunks []chunk.Chunk) error
}

// Note: the ingester New() function creates its own WAL first, which we then override if specified.
// Because of this, ensure any WAL directories exist and are cleaned up even when overriding the WAL.
// This is an ugly hook for testing :(
func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore, *Ingester) {
	store := &testStore{
		chunks: map[string][]chunk.Chunk{},
	}

	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
	require.NoError(t, err)

	ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil)
	require.NoError(t, err)
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))

	if walOverride != nil {
		_ = ing.wal.Stop()
		ing.wal = walOverride
	}

	return store, ing
}

// nolint
func defaultIngesterTestConfig(t testing.TB) Config {
	kvClient, err := kv.NewClient(kv.Config{Store: "inmemory"}, ring.GetCodec(), nil, gokitlog.NewNopLogger())
	require.NoError(t, err)

	cfg := Config{}
	flagext.DefaultValues(&cfg)
	cfg.FlushCheckPeriod = 99999 * time.Hour
	cfg.MaxChunkIdle = 99999 * time.Hour
	cfg.ConcurrentFlushes = 1
	cfg.LifecyclerConfig.RingConfig.KVStore.Mock = kvClient
	cfg.LifecyclerConfig.NumTokens = 1
	cfg.LifecyclerConfig.ListenPort = 0
	cfg.LifecyclerConfig.Addr = "localhost"
	cfg.LifecyclerConfig.ID = "localhost"
	cfg.LifecyclerConfig.FinalSleep = 0
	cfg.LifecyclerConfig.MinReadyDuration = 0
	cfg.BlockSize = 256 * 1024
	cfg.TargetChunkSize = 1500 * 1024
	return cfg
}

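// Put records flushed chunks for later inspection, validating labels and
// stripping the __name__ label, or delegates to the onPut hook if set.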
func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	if s.onPut != nil {
		return s.onPut(ctx, chunks)
	}
	userID, err := tenant.TenantID(ctx)
	if err != nil {
		return err
	}
	for ix, chunk := range chunks {
		for _, label := range chunk.Metric {
			if label.Value == "" {
				return fmt.Errorf("Chunk has blank label %q", label.Name)
			}
		}

		// remove __name__ label
		if chunk.Metric.Has("__name__") {
			labelsBuilder := labels.NewBuilder(chunk.Metric)
			labelsBuilder.Del("__name__")
			chunks[ix].Metric = labelsBuilder.Labels()
		}
	}
	s.chunks[userID] = append(s.chunks[userID], chunks...)
	return nil
}

func (s *testStore) IsLocal() bool {
	return false
}

func (s *testStore) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) {
	return nil, nil
}

func (s *testStore) SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) {
	return nil, nil
}

func (s *testStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
	return nil, nil, nil
}

func (s *testStore) GetSchemaConfigs() []chunk.PeriodConfig {
	return nil
}

func (s *testStore) Stop() {}

func (s *testStore) SetChunkFilterer(_ storage.RequestChunkFilterer) {}

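// pushTestSamples pushes generated streams for several tenants and returns the
// pushed data keyed by tenant ID, for later comparison with the store contents.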
func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]logproto.Stream {
	userIDs := []string{"1", "2", "3"}

	// Create test samples.
	testData := map[string][]logproto.Stream{}
	for i, userID := range userIDs {
		testData[userID] = buildTestStreams(i)
	}

	// Append samples.
	for _, userID := range userIDs {
		ctx := user.InjectOrgID(context.Background(), userID)
		_, err := ing.Push(ctx, &logproto.PushRequest{
			Streams: testData[userID],
		})
		require.NoError(t, err)
	}
	return testData
}

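// buildTestStreams creates numSeries streams of samplesPerSeries entries each,
// with timestamps shifted by the given offset, sorted by label string.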
func buildTestStreams(offset int) []logproto.Stream {
	var m []logproto.Stream
	for i := 0; i < numSeries; i++ {
		ss := logproto.Stream{
			Labels: model.Metric{
				"name":         model.LabelValue(fmt.Sprintf("testmetric_%d", i)),
				model.JobLabel: "testjob",
			}.String(),
		}
		for j := 0; j < samplesPerSeries; j++ {
			ss.Entries = append(ss.Entries, logproto.Entry{
				Timestamp: time.Unix(int64(i+j+offset), 0),
				Line:      "line",
			})
		}
		m = append(m, ss)
	}

	sort.Slice(m, func(i, j int) bool {
		return m[i].Labels < m[j].Labels
	})

	return m
}

// check that the store is holding data equivalent to what we expect
func (s *testStore) checkData(t *testing.T, testData map[string][]logproto.Stream) {
	for userID, expected := range testData {
		streams := s.getStreamsForUser(t, userID)
		require.Equal(t, expected, streams)
	}
}

func (s *testStore) getStreamsForUser(t *testing.T, userID string) []logproto.Stream {
	var streams []logproto.Stream
	for _, c := range s.getChunksForUser(userID) {
		lokiChunk := c.Data.(*chunkenc.Facade).LokiChunk()
		streams = append(streams, buildStreamsFromChunk(t, c.Metric.String(), lokiChunk))
	}
	sort.Slice(streams, func(i, j int) bool {
		return streams[i].Labels < streams[j].Labels
	})
	return streams
}

func (s *testStore) getChunksForUser(userID string) []chunk.Chunk {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	return s.chunks[userID]
}

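// buildStreamsFromChunk rebuilds a stream from a flushed chunk by iterating
// its entries between unix seconds 0 and 1000.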
func buildStreamsFromChunk(t *testing.T, lbs string, chk chunkenc.Chunk) logproto.Stream {
	it, err := chk.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
	require.NoError(t, err)

	stream := logproto.Stream{
		Labels: lbs,
	}
	for it.Next() {
		stream.Entries = append(stream.Entries, it.Entry())
	}
	require.NoError(t, it.Error())
	return stream
}