1package storage
2
3import (
4	"context"
5	"io/ioutil"
6	"log"
7	"net/http"
8	_ "net/http/pprof"
9	"os"
10	"path"
11	"runtime"
12	"testing"
13	"time"
14
15	util_log "github.com/cortexproject/cortex/pkg/util/log"
16
17	"github.com/stretchr/testify/assert"
18
19	"github.com/cespare/xxhash/v2"
20	"github.com/prometheus/common/model"
21	"github.com/prometheus/prometheus/pkg/labels"
22	"github.com/stretchr/testify/require"
23	"github.com/weaveworks/common/user"
24
25	"github.com/cortexproject/cortex/pkg/querier/astmapper"
26	"github.com/grafana/dskit/flagext"
27
28	"github.com/grafana/loki/pkg/iter"
29	"github.com/grafana/loki/pkg/logproto"
30	"github.com/grafana/loki/pkg/logql"
31	"github.com/grafana/loki/pkg/storage/chunk"
32	chunk_local "github.com/grafana/loki/pkg/storage/chunk/local"
33	"github.com/grafana/loki/pkg/storage/chunk/storage"
34	"github.com/grafana/loki/pkg/storage/stores/shipper"
35	"github.com/grafana/loki/pkg/util/marshal"
36	"github.com/grafana/loki/pkg/validation"
37)
38
// Package-level fixtures shared by the benchmarks and tests in this file.
//
//   - start: lower bound of the benchmark query window; the benchmarks query
//     the 24 hours that follow it. NOTE(review): presumably milliseconds since
//     the Unix epoch (2018-04-15 UTC) — confirm against model.Time.
//   - m: scratch runtime.MemStats refreshed by printHeap on every call.
//   - ctx: context carrying the "fake" org ID. It is a mutable package
//     variable, so avoid reassigning it from individual tests.
//   - chunkStore: the store returned by getLocalStore. It is built while the
//     package initialises, so a panic in getLocalStore aborts the test binary.
var (
	start      = model.Time(1523750400000)
	m          runtime.MemStats
	ctx        = user.InjectOrgID(context.Background(), "fake")
	chunkStore = getLocalStore()
)
45
46// go test -bench=. -benchmem -memprofile memprofile.out -cpuprofile profile.out
47func Benchmark_store_SelectLogsRegexBackward(b *testing.B) {
48	benchmarkStoreQuery(b, &logproto.QueryRequest{
49		Selector:  `{foo="bar"} |~ "fuzz"`,
50		Limit:     1000,
51		Start:     time.Unix(0, start.UnixNano()),
52		End:       time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()),
53		Direction: logproto.BACKWARD,
54	})
55}
56
57func Benchmark_store_SelectLogsLogQLBackward(b *testing.B) {
58	benchmarkStoreQuery(b, &logproto.QueryRequest{
59		Selector:  `{foo="bar"} |= "test" != "toto" |= "fuzz"`,
60		Limit:     1000,
61		Start:     time.Unix(0, start.UnixNano()),
62		End:       time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()),
63		Direction: logproto.BACKWARD,
64	})
65}
66
67func Benchmark_store_SelectLogsRegexForward(b *testing.B) {
68	benchmarkStoreQuery(b, &logproto.QueryRequest{
69		Selector:  `{foo="bar"} |~ "fuzz"`,
70		Limit:     1000,
71		Start:     time.Unix(0, start.UnixNano()),
72		End:       time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()),
73		Direction: logproto.FORWARD,
74	})
75}
76
77func Benchmark_store_SelectLogsForward(b *testing.B) {
78	benchmarkStoreQuery(b, &logproto.QueryRequest{
79		Selector:  `{foo="bar"}`,
80		Limit:     1000,
81		Start:     time.Unix(0, start.UnixNano()),
82		End:       time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()),
83		Direction: logproto.FORWARD,
84	})
85}
86
87func Benchmark_store_SelectLogsBackward(b *testing.B) {
88	benchmarkStoreQuery(b, &logproto.QueryRequest{
89		Selector:  `{foo="bar"}`,
90		Limit:     1000,
91		Start:     time.Unix(0, start.UnixNano()),
92		End:       time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()),
93		Direction: logproto.BACKWARD,
94	})
95}
96
97// rm -Rf /tmp/benchmark/chunks/ /tmp/benchmark/index
98// go run  -mod=vendor ./pkg/storage/hack/main.go
99// go test -benchmem -run=^$ -mod=vendor  ./pkg/storage -bench=Benchmark_store_SelectSample   -memprofile memprofile.out -cpuprofile cpuprofile.out
100func Benchmark_store_SelectSample(b *testing.B) {
101	var sampleRes []logproto.Sample
102	for _, test := range []string{
103		`count_over_time({foo="bar"}[5m])`,
104		`rate({foo="bar"}[5m])`,
105		`bytes_rate({foo="bar"}[5m])`,
106		`bytes_over_time({foo="bar"}[5m])`,
107	} {
108		b.Run(test, func(b *testing.B) {
109			for i := 0; i < b.N; i++ {
110				iter, err := chunkStore.SelectSamples(ctx, logql.SelectSampleParams{
111					SampleQueryRequest: newSampleQuery(test, time.Unix(0, start.UnixNano()), time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano())),
112				})
113				if err != nil {
114					b.Fatal(err)
115				}
116
117				for iter.Next() {
118					sampleRes = append(sampleRes, iter.Sample())
119				}
120				iter.Close()
121			}
122		})
123	}
124	log.Print("sample processed ", len(sampleRes))
125}
126
127func benchmarkStoreQuery(b *testing.B, query *logproto.QueryRequest) {
128	b.ReportAllocs()
129	// force to run gc 10x more often this can be useful to detect fast allocation vs leak.
130	// debug.SetGCPercent(10)
131	stop := make(chan struct{})
132	go func() {
133		_ = http.ListenAndServe(":6060", http.DefaultServeMux)
134	}()
135	go func() {
136		ticker := time.NewTicker(time.Millisecond)
137		for {
138			select {
139			case <-ticker.C:
140				// print and capture the max in use heap size
141				printHeap(b, false)
142			case <-stop:
143				ticker.Stop()
144				return
145			}
146		}
147	}()
148	for i := 0; i < b.N; i++ {
149		iter, err := chunkStore.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: query})
150		if err != nil {
151			b.Fatal(err)
152		}
153		res := []logproto.Entry{}
154		printHeap(b, true)
155		j := uint32(0)
156		for iter.Next() {
157			j++
158			printHeap(b, false)
159			res = append(res, iter.Entry())
160			// limit result like the querier would do.
161			if j == query.Limit {
162				break
163			}
164		}
165		iter.Close()
166		printHeap(b, true)
167		log.Println("line fetched", len(res))
168	}
169	close(stop)
170}
171
172var maxHeapInuse uint64
173
174func printHeap(b *testing.B, show bool) {
175	runtime.ReadMemStats(&m)
176	if m.HeapInuse > maxHeapInuse {
177		maxHeapInuse = m.HeapInuse
178	}
179	if show {
180		log.Printf("Benchmark %d maxHeapInuse: %d Mbytes\n", b.N, maxHeapInuse/1024/1024)
181		log.Printf("Benchmark %d currentHeapInuse: %d Mbytes\n", b.N, m.HeapInuse/1024/1024)
182	}
183}
184
185func getLocalStore() Store {
186	limits, err := validation.NewOverrides(validation.Limits{
187		MaxQueryLength: model.Duration(6000 * time.Hour),
188	}, nil)
189	if err != nil {
190		panic(err)
191	}
192
193	storeConfig := Config{
194		Config: storage.Config{
195			BoltDBConfig: chunk_local.BoltDBConfig{Directory: "/tmp/benchmark/index"},
196			FSConfig:     chunk_local.FSConfig{Directory: "/tmp/benchmark/chunks"},
197		},
198		MaxChunkBatchSize: 10,
199	}
200
201	schemaConfig := SchemaConfig{
202		chunk.SchemaConfig{
203			Configs: []chunk.PeriodConfig{
204				{
205					From:       chunk.DayTime{Time: start},
206					IndexType:  "boltdb",
207					ObjectType: "filesystem",
208					Schema:     "v9",
209					IndexTables: chunk.PeriodicTableConfig{
210						Prefix: "index_",
211						Period: time.Hour * 168,
212					},
213				},
214			},
215		},
216	}
217
218	chunkStore, err := storage.NewStore(
219		storeConfig.Config,
220		chunk.StoreConfig{},
221		schemaConfig.SchemaConfig, limits, nil, nil, util_log.Logger)
222	if err != nil {
223		panic(err)
224	}
225
226	store, err := NewStore(storeConfig, schemaConfig, chunkStore, nil)
227	if err != nil {
228		panic(err)
229	}
230	return store
231}
232
233func Test_store_SelectLogs(t *testing.T) {
234	tests := []struct {
235		name     string
236		req      *logproto.QueryRequest
237		expected []logproto.Stream
238	}{
239		{
240			"all",
241			newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil),
242			[]logproto.Stream{
243				{
244					Labels: "{foo=\"bar\"}",
245					Entries: []logproto.Entry{
246						{
247							Timestamp: from,
248							Line:      "1",
249						},
250
251						{
252							Timestamp: from.Add(time.Millisecond),
253							Line:      "2",
254						},
255						{
256							Timestamp: from.Add(2 * time.Millisecond),
257							Line:      "3",
258						},
259						{
260							Timestamp: from.Add(3 * time.Millisecond),
261							Line:      "4",
262						},
263
264						{
265							Timestamp: from.Add(4 * time.Millisecond),
266							Line:      "5",
267						},
268						{
269							Timestamp: from.Add(5 * time.Millisecond),
270							Line:      "6",
271						},
272					},
273				},
274				{
275					Labels: "{foo=\"bazz\"}",
276					Entries: []logproto.Entry{
277						{
278							Timestamp: from,
279							Line:      "1",
280						},
281
282						{
283							Timestamp: from.Add(time.Millisecond),
284							Line:      "2",
285						},
286						{
287							Timestamp: from.Add(2 * time.Millisecond),
288							Line:      "3",
289						},
290						{
291							Timestamp: from.Add(3 * time.Millisecond),
292							Line:      "4",
293						},
294
295						{
296							Timestamp: from.Add(4 * time.Millisecond),
297							Line:      "5",
298						},
299						{
300							Timestamp: from.Add(5 * time.Millisecond),
301							Line:      "6",
302						},
303					},
304				},
305			},
306		},
307		{
308			"filter regex",
309			newQuery("{foo=~\"ba.*\"} |~ \"1|2|3\" !~ \"2|3\"", from, from.Add(6*time.Millisecond), nil),
310			[]logproto.Stream{
311				{
312					Labels: "{foo=\"bar\"}",
313					Entries: []logproto.Entry{
314						{
315							Timestamp: from,
316							Line:      "1",
317						},
318					},
319				},
320				{
321					Labels: "{foo=\"bazz\"}",
322					Entries: []logproto.Entry{
323						{
324							Timestamp: from,
325							Line:      "1",
326						},
327					},
328				},
329			},
330		},
331		{
332			"filter matcher",
333			newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil),
334			[]logproto.Stream{
335				{
336					Labels: "{foo=\"bar\"}",
337					Entries: []logproto.Entry{
338						{
339							Timestamp: from,
340							Line:      "1",
341						},
342
343						{
344							Timestamp: from.Add(time.Millisecond),
345							Line:      "2",
346						},
347						{
348							Timestamp: from.Add(2 * time.Millisecond),
349							Line:      "3",
350						},
351						{
352							Timestamp: from.Add(3 * time.Millisecond),
353							Line:      "4",
354						},
355
356						{
357							Timestamp: from.Add(4 * time.Millisecond),
358							Line:      "5",
359						},
360						{
361							Timestamp: from.Add(5 * time.Millisecond),
362							Line:      "6",
363						},
364					},
365				},
366			},
367		},
368		{
369			"filter time",
370			newQuery("{foo=~\"ba.*\"}", from, from.Add(time.Millisecond), nil),
371			[]logproto.Stream{
372				{
373					Labels: "{foo=\"bar\"}",
374					Entries: []logproto.Entry{
375						{
376							Timestamp: from,
377							Line:      "1",
378						},
379					},
380				},
381				{
382					Labels: "{foo=\"bazz\"}",
383					Entries: []logproto.Entry{
384						{
385							Timestamp: from,
386							Line:      "1",
387						},
388					},
389				},
390			},
391		},
392	}
393	for _, tt := range tests {
394		t.Run(tt.name, func(t *testing.T) {
395			s := &store{
396				Store: storeFixture,
397				cfg: Config{
398					MaxChunkBatchSize: 10,
399				},
400				chunkMetrics: NilMetrics,
401			}
402
403			ctx = user.InjectOrgID(context.Background(), "test-user")
404			it, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: tt.req})
405			if err != nil {
406				t.Errorf("store.LazyQuery() error = %v", err)
407				return
408			}
409
410			streams, _, err := iter.ReadBatch(it, tt.req.Limit)
411			_ = it.Close()
412			if err != nil {
413				t.Fatalf("error reading batch %s", err)
414			}
415			assertStream(t, tt.expected, streams.Streams)
416		})
417	}
418}
419
420func Test_store_SelectSample(t *testing.T) {
421	tests := []struct {
422		name     string
423		req      *logproto.SampleQueryRequest
424		expected []logproto.Series
425	}{
426		{
427			"all",
428			newSampleQuery("count_over_time({foo=~\"ba.*\"}[5m])", from, from.Add(6*time.Millisecond)),
429			[]logproto.Series{
430				{
431					Labels: "{foo=\"bar\"}",
432					Samples: []logproto.Sample{
433						{
434							Timestamp: from.UnixNano(),
435							Hash:      xxhash.Sum64String("1"),
436							Value:     1.,
437						},
438
439						{
440							Timestamp: from.Add(time.Millisecond).UnixNano(),
441							Hash:      xxhash.Sum64String("2"),
442							Value:     1.,
443						},
444						{
445							Timestamp: from.Add(2 * time.Millisecond).UnixNano(),
446							Hash:      xxhash.Sum64String("3"),
447							Value:     1.,
448						},
449						{
450							Timestamp: from.Add(3 * time.Millisecond).UnixNano(),
451							Hash:      xxhash.Sum64String("4"),
452							Value:     1.,
453						},
454
455						{
456							Timestamp: from.Add(4 * time.Millisecond).UnixNano(),
457							Hash:      xxhash.Sum64String("5"),
458							Value:     1.,
459						},
460						{
461							Timestamp: from.Add(5 * time.Millisecond).UnixNano(),
462							Hash:      xxhash.Sum64String("6"),
463							Value:     1.,
464						},
465					},
466				},
467				{
468					Labels: "{foo=\"bazz\"}",
469					Samples: []logproto.Sample{
470						{
471							Timestamp: from.UnixNano(),
472							Hash:      xxhash.Sum64String("1"),
473							Value:     1.,
474						},
475
476						{
477							Timestamp: from.Add(time.Millisecond).UnixNano(),
478							Hash:      xxhash.Sum64String("2"),
479							Value:     1.,
480						},
481						{
482							Timestamp: from.Add(2 * time.Millisecond).UnixNano(),
483							Hash:      xxhash.Sum64String("3"),
484							Value:     1.,
485						},
486						{
487							Timestamp: from.Add(3 * time.Millisecond).UnixNano(),
488							Hash:      xxhash.Sum64String("4"),
489							Value:     1.,
490						},
491
492						{
493							Timestamp: from.Add(4 * time.Millisecond).UnixNano(),
494							Hash:      xxhash.Sum64String("5"),
495							Value:     1.,
496						},
497						{
498							Timestamp: from.Add(5 * time.Millisecond).UnixNano(),
499							Hash:      xxhash.Sum64String("6"),
500							Value:     1.,
501						},
502					},
503				},
504			},
505		},
506		{
507			"filter regex",
508			newSampleQuery("rate({foo=~\"ba.*\"} |~ \"1|2|3\" !~ \"2|3\"[1m])", from, from.Add(6*time.Millisecond)),
509			[]logproto.Series{
510				{
511					Labels: "{foo=\"bar\"}",
512					Samples: []logproto.Sample{
513						{
514							Timestamp: from.UnixNano(),
515							Hash:      xxhash.Sum64String("1"),
516							Value:     1.,
517						},
518					},
519				},
520				{
521					Labels: "{foo=\"bazz\"}",
522					Samples: []logproto.Sample{
523						{
524							Timestamp: from.UnixNano(),
525							Hash:      xxhash.Sum64String("1"),
526							Value:     1.,
527						},
528					},
529				},
530			},
531		},
532		{
533			"filter matcher",
534			newSampleQuery("count_over_time({foo=\"bar\"}[10m])", from, from.Add(6*time.Millisecond)),
535			[]logproto.Series{
536				{
537					Labels: "{foo=\"bar\"}",
538					Samples: []logproto.Sample{
539						{
540							Timestamp: from.UnixNano(),
541							Hash:      xxhash.Sum64String("1"),
542							Value:     1.,
543						},
544
545						{
546							Timestamp: from.Add(time.Millisecond).UnixNano(),
547							Hash:      xxhash.Sum64String("2"),
548							Value:     1.,
549						},
550						{
551							Timestamp: from.Add(2 * time.Millisecond).UnixNano(),
552							Hash:      xxhash.Sum64String("3"),
553							Value:     1.,
554						},
555						{
556							Timestamp: from.Add(3 * time.Millisecond).UnixNano(),
557							Hash:      xxhash.Sum64String("4"),
558							Value:     1.,
559						},
560
561						{
562							Timestamp: from.Add(4 * time.Millisecond).UnixNano(),
563							Hash:      xxhash.Sum64String("5"),
564							Value:     1.,
565						},
566						{
567							Timestamp: from.Add(5 * time.Millisecond).UnixNano(),
568							Hash:      xxhash.Sum64String("6"),
569							Value:     1.,
570						},
571					},
572				},
573			},
574		},
575		{
576			"filter time",
577			newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(time.Millisecond)),
578			[]logproto.Series{
579				{
580					Labels: "{foo=\"bar\"}",
581					Samples: []logproto.Sample{
582						{
583							Timestamp: from.UnixNano(),
584							Hash:      xxhash.Sum64String("1"),
585							Value:     1.,
586						},
587					},
588				},
589				{
590					Labels: "{foo=\"bazz\"}",
591					Samples: []logproto.Sample{
592						{
593							Timestamp: from.UnixNano(),
594							Hash:      xxhash.Sum64String("1"),
595							Value:     1.,
596						},
597					},
598				},
599			},
600		},
601	}
602	for _, tt := range tests {
603		t.Run(tt.name, func(t *testing.T) {
604			s := &store{
605				Store: storeFixture,
606				cfg: Config{
607					MaxChunkBatchSize: 10,
608				},
609				chunkMetrics: NilMetrics,
610			}
611
612			ctx = user.InjectOrgID(context.Background(), "test-user")
613			it, err := s.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: tt.req})
614			if err != nil {
615				t.Errorf("store.LazyQuery() error = %v", err)
616				return
617			}
618
619			series, _, err := iter.ReadSampleBatch(it, uint32(100000))
620			_ = it.Close()
621			if err != nil {
622				t.Fatalf("error reading batch %s", err)
623			}
624			assertSeries(t, tt.expected, series.Series)
625		})
626	}
627}
628
629type fakeChunkFilterer struct{}
630
631func (f fakeChunkFilterer) ForRequest(ctx context.Context) ChunkFilterer {
632	return f
633}
634
635func (f fakeChunkFilterer) ShouldFilter(metric labels.Labels) bool {
636	return metric.Get("foo") == "bazz"
637}
638
639func Test_ChunkFilterer(t *testing.T) {
640	s := &store{
641		Store: storeFixture,
642		cfg: Config{
643			MaxChunkBatchSize: 10,
644		},
645		chunkMetrics: NilMetrics,
646	}
647	s.SetChunkFilterer(&fakeChunkFilterer{})
648	ctx = user.InjectOrgID(context.Background(), "test-user")
649	it, err := s.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(1*time.Hour))})
650	if err != nil {
651		t.Errorf("store.SelectSamples() error = %v", err)
652		return
653	}
654	defer it.Close()
655	for it.Next() {
656		v := mustParseLabels(it.Labels())["foo"]
657		require.NotEqual(t, "bazz", v)
658	}
659
660	logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), nil)})
661	if err != nil {
662		t.Errorf("store.SelectLogs() error = %v", err)
663		return
664	}
665	defer logit.Close()
666	for logit.Next() {
667		v := mustParseLabels(it.Labels())["foo"]
668		require.NotEqual(t, "bazz", v)
669	}
670	ids, err := s.GetSeries(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), nil)})
671	require.NoError(t, err)
672	for _, id := range ids {
673		v := id.Labels["foo"]
674		require.NotEqual(t, "bazz", v)
675	}
676}
677
678func Test_store_GetSeries(t *testing.T) {
679	tests := []struct {
680		name      string
681		req       *logproto.QueryRequest
682		expected  []logproto.SeriesIdentifier
683		batchSize int
684	}{
685		{
686			"all",
687			newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil),
688			[]logproto.SeriesIdentifier{
689				{Labels: mustParseLabels("{foo=\"bar\"}")},
690				{Labels: mustParseLabels("{foo=\"bazz\"}")},
691			},
692			1,
693		},
694		{
695			"all-single-batch",
696			newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil),
697			[]logproto.SeriesIdentifier{
698				{Labels: mustParseLabels("{foo=\"bar\"}")},
699				{Labels: mustParseLabels("{foo=\"bazz\"}")},
700			},
701			5,
702		},
703		{
704			"regexp filter (post chunk fetching)",
705			newQuery("{foo=~\"bar.*\"}", from, from.Add(6*time.Millisecond), nil),
706			[]logproto.SeriesIdentifier{
707				{Labels: mustParseLabels("{foo=\"bar\"}")},
708			},
709			1,
710		},
711		{
712			"filter matcher",
713			newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil),
714			[]logproto.SeriesIdentifier{
715				{Labels: mustParseLabels("{foo=\"bar\"}")},
716			},
717			1,
718		},
719	}
720	for _, tt := range tests {
721		t.Run(tt.name, func(t *testing.T) {
722			s := &store{
723				Store: storeFixture,
724				cfg: Config{
725					MaxChunkBatchSize: tt.batchSize,
726				},
727				chunkMetrics: NilMetrics,
728			}
729			ctx = user.InjectOrgID(context.Background(), "test-user")
730			out, err := s.GetSeries(ctx, logql.SelectLogParams{QueryRequest: tt.req})
731			if err != nil {
732				t.Errorf("store.GetSeries() error = %v", err)
733				return
734			}
735			require.Equal(t, tt.expected, out)
736		})
737	}
738}
739
740func Test_store_decodeReq_Matchers(t *testing.T) {
741	tests := []struct {
742		name     string
743		req      *logproto.QueryRequest
744		matchers []*labels.Matcher
745	}{
746		{
747			"unsharded",
748			newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil),
749			[]*labels.Matcher{
750				labels.MustNewMatcher(labels.MatchRegexp, "foo", "ba.*"),
751				labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "logs"),
752			},
753		},
754		{
755			"unsharded",
756			newQuery(
757				"{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond),
758				[]astmapper.ShardAnnotation{
759					{Shard: 1, Of: 2},
760				},
761			),
762			[]*labels.Matcher{
763				labels.MustNewMatcher(labels.MatchRegexp, "foo", "ba.*"),
764				labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "logs"),
765				labels.MustNewMatcher(
766					labels.MatchEqual,
767					astmapper.ShardLabel,
768					astmapper.ShardAnnotation{Shard: 1, Of: 2}.String(),
769				),
770			},
771		},
772	}
773	for _, tt := range tests {
774		t.Run(tt.name, func(t *testing.T) {
775			ms, _, _, err := decodeReq(logql.SelectLogParams{QueryRequest: tt.req})
776			if err != nil {
777				t.Errorf("store.GetSeries() error = %v", err)
778				return
779			}
780			require.Equal(t, tt.matchers, ms)
781		})
782	}
783}
784
// timeRange is a pair of instants describing the span a test chunk covers.
// buildTestStreams treats from as inclusive and to as exclusive.
type timeRange struct {
	from time.Time
	to   time.Time
}
788
789func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) {
790	tempDir, err := ioutil.TempDir("", "multiple-boltdb-shippers")
791	require.NoError(t, err)
792
793	defer func() {
794		require.NoError(t, os.RemoveAll(tempDir))
795	}()
796
797	limits, err := validation.NewOverrides(validation.Limits{}, nil)
798	require.NoError(t, err)
799
800	// config for BoltDB Shipper
801	boltdbShipperConfig := shipper.Config{}
802	flagext.DefaultValues(&boltdbShipperConfig)
803	boltdbShipperConfig.ActiveIndexDirectory = path.Join(tempDir, "index")
804	boltdbShipperConfig.SharedStoreType = "filesystem"
805	boltdbShipperConfig.CacheLocation = path.Join(tempDir, "boltdb-shipper-cache")
806
807	// dates for activation of boltdb shippers
808	firstStoreDate := parseDate("2019-01-01")
809	secondStoreDate := parseDate("2019-01-02")
810
811	config := Config{
812		Config: storage.Config{
813			FSConfig: chunk_local.FSConfig{Directory: path.Join(tempDir, "chunks")},
814		},
815		BoltDBShipperConfig: boltdbShipperConfig,
816	}
817
818	schemaConfig := SchemaConfig{
819		chunk.SchemaConfig{
820			Configs: []chunk.PeriodConfig{
821				{
822					From:       chunk.DayTime{Time: timeToModelTime(firstStoreDate)},
823					IndexType:  "boltdb-shipper",
824					ObjectType: "filesystem",
825					Schema:     "v9",
826					IndexTables: chunk.PeriodicTableConfig{
827						Prefix: "index_",
828						Period: time.Hour * 168,
829					},
830				},
831				{
832					From:       chunk.DayTime{Time: timeToModelTime(secondStoreDate)},
833					IndexType:  "boltdb-shipper",
834					ObjectType: "filesystem",
835					Schema:     "v11",
836					IndexTables: chunk.PeriodicTableConfig{
837						Prefix: "index_",
838						Period: time.Hour * 168,
839					},
840					RowShards: 2,
841				},
842			},
843		},
844	}
845
846	RegisterCustomIndexClients(&config, nil)
847
848	chunkStore, err := storage.NewStore(
849		config.Config,
850		chunk.StoreConfig{},
851		schemaConfig.SchemaConfig,
852		limits,
853		nil,
854		nil,
855		util_log.Logger,
856	)
857	require.NoError(t, err)
858	store, err := NewStore(config, schemaConfig, chunkStore, nil)
859	require.NoError(t, err)
860
861	// time ranges adding a chunk for each store and a chunk which overlaps both the stores
862	chunksToBuildForTimeRanges := []timeRange{
863		{
864			// chunk just for first store
865			secondStoreDate.Add(-3 * time.Hour),
866			secondStoreDate.Add(-2 * time.Hour),
867		},
868		{
869			// chunk overlapping both the stores
870			secondStoreDate.Add(-time.Hour),
871			secondStoreDate.Add(time.Hour),
872		},
873		{
874			// chunk just for second store
875			secondStoreDate.Add(2 * time.Hour),
876			secondStoreDate.Add(3 * time.Hour),
877		},
878	}
879
880	// build and add chunks to the store
881	addedChunkIDs := map[string]struct{}{}
882	for _, tr := range chunksToBuildForTimeRanges {
883		chk := newChunk(buildTestStreams(fooLabelsWithName, tr))
884
885		err := store.PutOne(ctx, chk.From, chk.Through, chk)
886		require.NoError(t, err)
887
888		addedChunkIDs[chk.ExternalKey()] = struct{}{}
889	}
890
891	// recreate the store because boltdb-shipper now runs queriers on snapshots which are created every 1 min and during startup.
892	store.Stop()
893
894	chunkStore, err = storage.NewStore(
895		config.Config,
896		chunk.StoreConfig{},
897		schemaConfig.SchemaConfig,
898		limits,
899		nil,
900		nil,
901		util_log.Logger,
902	)
903	require.NoError(t, err)
904
905	store, err = NewStore(config, schemaConfig, chunkStore, nil)
906	require.NoError(t, err)
907
908	defer store.Stop()
909
910	// get all the chunks from both the stores
911	chunks, err := store.Get(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName)...)
912	require.NoError(t, err)
913
914	// we get common chunk twice because it is indexed in both the stores
915	require.Len(t, chunks, len(addedChunkIDs)+1)
916
917	// check whether we got back all the chunks which were added
918	for i := range chunks {
919		_, ok := addedChunkIDs[chunks[i].ExternalKey()]
920		require.True(t, ok)
921	}
922}
923
924func mustParseLabels(s string) map[string]string {
925	l, err := marshal.NewLabelSet(s)
926	if err != nil {
927		log.Fatalf("Failed to parse %s", s)
928	}
929
930	return l
931}
932
// parseDate converts a "YYYY-MM-DD" string into a time.Time and panics when
// the input does not match that layout.
func parseDate(in string) time.Time {
	const layout = "2006-01-02"

	parsed, err := time.Parse(layout, in)
	if err == nil {
		return parsed
	}
	panic(err)
}
940
941func buildTestStreams(labels string, tr timeRange) logproto.Stream {
942	stream := logproto.Stream{
943		Labels:  labels,
944		Entries: []logproto.Entry{},
945	}
946
947	for from := tr.from; from.Before(tr.to); from = from.Add(time.Second) {
948		stream.Entries = append(stream.Entries, logproto.Entry{
949			Timestamp: from,
950			Line:      from.String(),
951		})
952	}
953
954	return stream
955}
956
957func timeToModelTime(t time.Time) model.Time {
958	return model.TimeFromUnixNano(t.UnixNano())
959}
960
961func TestActiveIndexType(t *testing.T) {
962	var cfg SchemaConfig
963
964	// just one PeriodConfig in the past
965	cfg.Configs = []chunk.PeriodConfig{{
966		From:      chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)},
967		IndexType: "first",
968	}}
969
970	assert.Equal(t, 0, ActivePeriodConfig(cfg.Configs))
971
972	// add a newer PeriodConfig in the past which should be considered
973	cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
974		From:      chunk.DayTime{Time: model.Now().Add(-12 * time.Hour)},
975		IndexType: "second",
976	})
977	assert.Equal(t, 1, ActivePeriodConfig(cfg.Configs))
978
979	// add a newer PeriodConfig in the future which should not be considered
980	cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
981		From:      chunk.DayTime{Time: model.Now().Add(time.Hour)},
982		IndexType: "third",
983	})
984	assert.Equal(t, 1, ActivePeriodConfig(cfg.Configs))
985}
986
987func TestUsingBoltdbShipper(t *testing.T) {
988	var cfg SchemaConfig
989
990	// just one PeriodConfig in the past using boltdb-shipper
991	cfg.Configs = []chunk.PeriodConfig{{
992		From:      chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)},
993		IndexType: "boltdb-shipper",
994	}}
995	assert.Equal(t, true, UsingBoltdbShipper(cfg.Configs))
996
997	// just one PeriodConfig in the past not using boltdb-shipper
998	cfg.Configs[0].IndexType = "boltdb"
999	assert.Equal(t, false, UsingBoltdbShipper(cfg.Configs))
1000
1001	// add a newer PeriodConfig in the future using boltdb-shipper
1002	cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
1003		From:      chunk.DayTime{Time: model.Now().Add(time.Hour)},
1004		IndexType: "boltdb-shipper",
1005	})
1006	assert.Equal(t, true, UsingBoltdbShipper(cfg.Configs))
1007}
1008
1009func TestSchemaConfig_Validate(t *testing.T) {
1010	for _, tc := range []struct {
1011		name    string
1012		configs []chunk.PeriodConfig
1013		err     error
1014	}{
1015		{
1016			name:    "empty",
1017			configs: []chunk.PeriodConfig{},
1018			err:     errZeroLengthConfig,
1019		},
1020		{
1021			name: "NOT using boltdb-shipper",
1022			configs: []chunk.PeriodConfig{{
1023				From:      chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)},
1024				IndexType: "boltdb",
1025				Schema:    "v9",
1026				IndexTables: chunk.PeriodicTableConfig{
1027					Period: 7 * 24 * time.Hour,
1028				},
1029			}},
1030		},
1031		{
1032			name: "current config boltdb-shipper with 7 days periodic config, without future index type changes",
1033			configs: []chunk.PeriodConfig{{
1034				From:      chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)},
1035				IndexType: "boltdb-shipper",
1036				Schema:    "v9",
1037				IndexTables: chunk.PeriodicTableConfig{
1038					Period: 7 * 24 * time.Hour,
1039				},
1040			}},
1041			err: errCurrentBoltdbShipperNon24Hours,
1042		},
1043		{
1044			name: "current config boltdb-shipper with 1 day periodic config, without future index type changes",
1045			configs: []chunk.PeriodConfig{{
1046				From:      chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)},
1047				IndexType: "boltdb-shipper",
1048				Schema:    "v9",
1049				IndexTables: chunk.PeriodicTableConfig{
1050					Period: 24 * time.Hour,
1051				},
1052			}},
1053		},
1054		{
1055			name: "current config boltdb-shipper with 7 days periodic config, upcoming config NOT boltdb-shipper",
1056			configs: []chunk.PeriodConfig{{
1057				From:      chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)},
1058				IndexType: "boltdb-shipper",
1059				Schema:    "v9",
1060				IndexTables: chunk.PeriodicTableConfig{
1061					Period: 24 * time.Hour,
1062				},
1063			}, {
1064				From:      chunk.DayTime{Time: model.Now().Add(time.Hour)},
1065				IndexType: "boltdb",
1066				Schema:    "v9",
1067				IndexTables: chunk.PeriodicTableConfig{
1068					Period: 7 * 24 * time.Hour,
1069				},
1070			}},
1071		},
1072		{
1073			name: "current and upcoming config boltdb-shipper with 7 days periodic config",
1074			configs: []chunk.PeriodConfig{{
1075				From:      chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)},
1076				IndexType: "boltdb-shipper",
1077				Schema:    "v9",
1078				IndexTables: chunk.PeriodicTableConfig{
1079					Period: 24 * time.Hour,
1080				},
1081			}, {
1082				From:      chunk.DayTime{Time: model.Now().Add(time.Hour)},
1083				IndexType: "boltdb-shipper",
1084				Schema:    "v9",
1085				IndexTables: chunk.PeriodicTableConfig{
1086					Period: 7 * 24 * time.Hour,
1087				},
1088			}},
1089			err: errUpcomingBoltdbShipperNon24Hours,
1090		},
1091		{
1092			name: "current config NOT boltdb-shipper, upcoming config boltdb-shipper with 7 days periodic config",
1093			configs: []chunk.PeriodConfig{{
1094				From:      chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)},
1095				IndexType: "boltdb",
1096				Schema:    "v9",
1097				IndexTables: chunk.PeriodicTableConfig{
1098					Period: 24 * time.Hour,
1099				},
1100			}, {
1101				From:      chunk.DayTime{Time: model.Now().Add(time.Hour)},
1102				IndexType: "boltdb-shipper",
1103				Schema:    "v9",
1104				IndexTables: chunk.PeriodicTableConfig{
1105					Period: 7 * 24 * time.Hour,
1106				},
1107			}},
1108			err: errUpcomingBoltdbShipperNon24Hours,
1109		},
1110	} {
1111		t.Run(tc.name, func(t *testing.T) {
1112			cfg := SchemaConfig{chunk.SchemaConfig{Configs: tc.configs}}
1113			err := cfg.Validate()
1114			if tc.err == nil {
1115				require.NoError(t, err)
1116			} else {
1117				require.EqualError(t, err, tc.err.Error())
1118			}
1119		})
1120	}
1121}
1122
1123func Test_OverlappingChunks(t *testing.T) {
1124	chunks := []chunk.Chunk{
1125
1126		newChunk(logproto.Stream{
1127			Labels: `{foo="bar"}`,
1128			Entries: []logproto.Entry{
1129				{Timestamp: time.Unix(0, 1), Line: "1"},
1130				{Timestamp: time.Unix(0, 4), Line: "4"},
1131			},
1132		}),
1133		newChunk(logproto.Stream{
1134			Labels: `{foo="bar"}`,
1135			Entries: []logproto.Entry{
1136				{Timestamp: time.Unix(0, 2), Line: "2"},
1137				{Timestamp: time.Unix(0, 3), Line: "3"},
1138			},
1139		}),
1140	}
1141	s := &store{
1142		Store: &mockChunkStore{chunks: chunks, client: &mockChunkStoreClient{chunks: chunks}},
1143		cfg: Config{
1144			MaxChunkBatchSize: 10,
1145		},
1146		chunkMetrics: NilMetrics,
1147	}
1148
1149	ctx = user.InjectOrgID(context.Background(), "test-user")
1150	it, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: &logproto.QueryRequest{
1151		Selector:  `{foo="bar"}`,
1152		Limit:     1000,
1153		Direction: logproto.BACKWARD,
1154		Start:     time.Unix(0, 0),
1155		End:       time.Unix(0, 10),
1156	}})
1157	if err != nil {
1158		t.Errorf("store.SelectLogs() error = %v", err)
1159		return
1160	}
1161	defer it.Close()
1162	require.True(t, it.Next())
1163	require.Equal(t, "4", it.Entry().Line)
1164	require.True(t, it.Next())
1165	require.Equal(t, "3", it.Entry().Line)
1166	require.True(t, it.Next())
1167	require.Equal(t, "2", it.Entry().Line)
1168	require.True(t, it.Next())
1169	require.Equal(t, "1", it.Entry().Line)
1170	require.False(t, it.Next())
1171}
1172