1package storage 2 3import ( 4 "context" 5 "io/ioutil" 6 "log" 7 "net/http" 8 _ "net/http/pprof" 9 "os" 10 "path" 11 "runtime" 12 "testing" 13 "time" 14 15 util_log "github.com/cortexproject/cortex/pkg/util/log" 16 17 "github.com/stretchr/testify/assert" 18 19 "github.com/cespare/xxhash/v2" 20 "github.com/prometheus/common/model" 21 "github.com/prometheus/prometheus/pkg/labels" 22 "github.com/stretchr/testify/require" 23 "github.com/weaveworks/common/user" 24 25 "github.com/cortexproject/cortex/pkg/querier/astmapper" 26 "github.com/grafana/dskit/flagext" 27 28 "github.com/grafana/loki/pkg/iter" 29 "github.com/grafana/loki/pkg/logproto" 30 "github.com/grafana/loki/pkg/logql" 31 "github.com/grafana/loki/pkg/storage/chunk" 32 chunk_local "github.com/grafana/loki/pkg/storage/chunk/local" 33 "github.com/grafana/loki/pkg/storage/chunk/storage" 34 "github.com/grafana/loki/pkg/storage/stores/shipper" 35 "github.com/grafana/loki/pkg/util/marshal" 36 "github.com/grafana/loki/pkg/validation" 37) 38 39var ( 40 start = model.Time(1523750400000) 41 m runtime.MemStats 42 ctx = user.InjectOrgID(context.Background(), "fake") 43 chunkStore = getLocalStore() 44) 45 46// go test -bench=. -benchmem -memprofile memprofile.out -cpuprofile profile.out 47func Benchmark_store_SelectLogsRegexBackward(b *testing.B) { 48 benchmarkStoreQuery(b, &logproto.QueryRequest{ 49 Selector: `{foo="bar"} |~ "fuzz"`, 50 Limit: 1000, 51 Start: time.Unix(0, start.UnixNano()), 52 End: time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()), 53 Direction: logproto.BACKWARD, 54 }) 55} 56 57func Benchmark_store_SelectLogsLogQLBackward(b *testing.B) { 58 benchmarkStoreQuery(b, &logproto.QueryRequest{ 59 Selector: `{foo="bar"} |= "test" != "toto" |= "fuzz"`, 60 Limit: 1000, 61 Start: time.Unix(0, start.UnixNano()), 62 End: time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()), 63 Direction: logproto.BACKWARD, 64 }) 65} 66 67func Benchmark_store_SelectLogsRegexForward(b *testing.B) { 68 benchmarkStoreQuery(b, &logproto.QueryRequest{ 69 Selector: `{foo="bar"} |~ "fuzz"`, 70 Limit: 1000, 71 Start: time.Unix(0, start.UnixNano()), 72 End: time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()), 73 Direction: logproto.FORWARD, 74 }) 75} 76 77func Benchmark_store_SelectLogsForward(b *testing.B) { 78 benchmarkStoreQuery(b, &logproto.QueryRequest{ 79 Selector: `{foo="bar"}`, 80 Limit: 1000, 81 Start: time.Unix(0, start.UnixNano()), 82 End: time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()), 83 Direction: logproto.FORWARD, 84 }) 85} 86 87func Benchmark_store_SelectLogsBackward(b *testing.B) { 88 benchmarkStoreQuery(b, &logproto.QueryRequest{ 89 Selector: `{foo="bar"}`, 90 Limit: 1000, 91 Start: time.Unix(0, start.UnixNano()), 92 End: time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()), 93 Direction: logproto.BACKWARD, 94 }) 95} 96 97// rm -Rf /tmp/benchmark/chunks/ /tmp/benchmark/index 98// go run -mod=vendor ./pkg/storage/hack/main.go 99// go test -benchmem -run=^$ -mod=vendor ./pkg/storage -bench=Benchmark_store_SelectSample -memprofile memprofile.out -cpuprofile cpuprofile.out 100func Benchmark_store_SelectSample(b *testing.B) { 101 var sampleRes []logproto.Sample 102 for _, test := range []string{ 103 `count_over_time({foo="bar"}[5m])`, 104 `rate({foo="bar"}[5m])`, 105 `bytes_rate({foo="bar"}[5m])`, 106 `bytes_over_time({foo="bar"}[5m])`, 107 } { 108 b.Run(test, func(b *testing.B) { 109 for i := 0; i < b.N; i++ { 110 iter, err := chunkStore.SelectSamples(ctx, logql.SelectSampleParams{ 111 SampleQueryRequest: newSampleQuery(test, time.Unix(0, start.UnixNano()), time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano())), 112 }) 113 if err != nil { 114 b.Fatal(err) 115 } 116 117 for iter.Next() { 118 sampleRes = append(sampleRes, iter.Sample()) 119 } 120 iter.Close() 121 } 122 }) 123 } 124 log.Print("sample processed ", len(sampleRes)) 125} 126 127func benchmarkStoreQuery(b *testing.B, query *logproto.QueryRequest) { 128 b.ReportAllocs() 129 // force to run gc 10x more often this can be useful to detect fast allocation vs leak. 130 // debug.SetGCPercent(10) 131 stop := make(chan struct{}) 132 go func() { 133 _ = http.ListenAndServe(":6060", http.DefaultServeMux) 134 }() 135 go func() { 136 ticker := time.NewTicker(time.Millisecond) 137 for { 138 select { 139 case <-ticker.C: 140 // print and capture the max in use heap size 141 printHeap(b, false) 142 case <-stop: 143 ticker.Stop() 144 return 145 } 146 } 147 }() 148 for i := 0; i < b.N; i++ { 149 iter, err := chunkStore.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: query}) 150 if err != nil { 151 b.Fatal(err) 152 } 153 res := []logproto.Entry{} 154 printHeap(b, true) 155 j := uint32(0) 156 for iter.Next() { 157 j++ 158 printHeap(b, false) 159 res = append(res, iter.Entry()) 160 // limit result like the querier would do. 161 if j == query.Limit { 162 break 163 } 164 } 165 iter.Close() 166 printHeap(b, true) 167 log.Println("line fetched", len(res)) 168 } 169 close(stop) 170} 171 172var maxHeapInuse uint64 173 174func printHeap(b *testing.B, show bool) { 175 runtime.ReadMemStats(&m) 176 if m.HeapInuse > maxHeapInuse { 177 maxHeapInuse = m.HeapInuse 178 } 179 if show { 180 log.Printf("Benchmark %d maxHeapInuse: %d Mbytes\n", b.N, maxHeapInuse/1024/1024) 181 log.Printf("Benchmark %d currentHeapInuse: %d Mbytes\n", b.N, m.HeapInuse/1024/1024) 182 } 183} 184 185func getLocalStore() Store { 186 limits, err := validation.NewOverrides(validation.Limits{ 187 MaxQueryLength: model.Duration(6000 * time.Hour), 188 }, nil) 189 if err != nil { 190 panic(err) 191 } 192 193 storeConfig := Config{ 194 Config: storage.Config{ 195 BoltDBConfig: chunk_local.BoltDBConfig{Directory: "/tmp/benchmark/index"}, 196 FSConfig: chunk_local.FSConfig{Directory: "/tmp/benchmark/chunks"}, 197 }, 198 MaxChunkBatchSize: 10, 199 } 200 201 schemaConfig := SchemaConfig{ 202 chunk.SchemaConfig{ 203 Configs: []chunk.PeriodConfig{ 204 { 205 From: chunk.DayTime{Time: start}, 206 IndexType: "boltdb", 207 ObjectType: "filesystem", 208 Schema: "v9", 209 IndexTables: chunk.PeriodicTableConfig{ 210 Prefix: "index_", 211 Period: time.Hour * 168, 212 }, 213 }, 214 }, 215 }, 216 } 217 218 chunkStore, err := storage.NewStore( 219 storeConfig.Config, 220 chunk.StoreConfig{}, 221 schemaConfig.SchemaConfig, limits, nil, nil, util_log.Logger) 222 if err != nil { 223 panic(err) 224 } 225 226 store, err := NewStore(storeConfig, schemaConfig, chunkStore, nil) 227 if err != nil { 228 panic(err) 229 } 230 return store 231} 232 233func Test_store_SelectLogs(t *testing.T) { 234 tests := []struct { 235 name string 236 req *logproto.QueryRequest 237 expected []logproto.Stream 238 }{ 239 { 240 "all", 241 newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil), 242 []logproto.Stream{ 243 { 244 Labels: "{foo=\"bar\"}", 245 Entries: []logproto.Entry{ 246 { 247 Timestamp: from, 248 Line: "1", 249 }, 250 251 { 252 Timestamp: from.Add(time.Millisecond), 253 Line: "2", 254 }, 255 { 256 Timestamp: from.Add(2 * time.Millisecond), 257 Line: "3", 258 }, 259 { 260 Timestamp: from.Add(3 * time.Millisecond), 261 Line: "4", 262 }, 263 264 { 265 Timestamp: from.Add(4 * time.Millisecond), 266 Line: "5", 267 }, 268 { 269 Timestamp: from.Add(5 * time.Millisecond), 270 Line: "6", 271 }, 272 }, 273 }, 274 { 275 Labels: "{foo=\"bazz\"}", 276 Entries: []logproto.Entry{ 277 { 278 Timestamp: from, 279 Line: "1", 280 }, 281 282 { 283 Timestamp: from.Add(time.Millisecond), 284 Line: "2", 285 }, 286 { 287 Timestamp: from.Add(2 * time.Millisecond), 288 Line: "3", 289 }, 290 { 291 Timestamp: from.Add(3 * time.Millisecond), 292 Line: "4", 293 }, 294 295 { 296 Timestamp: from.Add(4 * time.Millisecond), 297 Line: "5", 298 }, 299 { 300 Timestamp: from.Add(5 * time.Millisecond), 301 Line: "6", 302 }, 303 }, 304 }, 305 }, 306 }, 307 { 308 "filter regex", 309 newQuery("{foo=~\"ba.*\"} |~ \"1|2|3\" !~ \"2|3\"", from, from.Add(6*time.Millisecond), nil), 310 []logproto.Stream{ 311 { 312 Labels: "{foo=\"bar\"}", 313 Entries: []logproto.Entry{ 314 { 315 Timestamp: from, 316 Line: "1", 317 }, 318 }, 319 }, 320 { 321 Labels: "{foo=\"bazz\"}", 322 Entries: []logproto.Entry{ 323 { 324 Timestamp: from, 325 Line: "1", 326 }, 327 }, 328 }, 329 }, 330 }, 331 { 332 "filter matcher", 333 newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil), 334 []logproto.Stream{ 335 { 336 Labels: "{foo=\"bar\"}", 337 Entries: []logproto.Entry{ 338 { 339 Timestamp: from, 340 Line: "1", 341 }, 342 343 { 344 Timestamp: from.Add(time.Millisecond), 345 Line: "2", 346 }, 347 { 348 Timestamp: from.Add(2 * time.Millisecond), 349 Line: "3", 350 }, 351 { 352 Timestamp: from.Add(3 * time.Millisecond), 353 Line: "4", 354 }, 355 356 { 357 Timestamp: from.Add(4 * time.Millisecond), 358 Line: "5", 359 }, 360 { 361 Timestamp: from.Add(5 * time.Millisecond), 362 Line: "6", 363 }, 364 }, 365 }, 366 }, 367 }, 368 { 369 "filter time", 370 newQuery("{foo=~\"ba.*\"}", from, from.Add(time.Millisecond), nil), 371 []logproto.Stream{ 372 { 373 Labels: "{foo=\"bar\"}", 374 Entries: []logproto.Entry{ 375 { 376 Timestamp: from, 377 Line: "1", 378 }, 379 }, 380 }, 381 { 382 Labels: "{foo=\"bazz\"}", 383 Entries: []logproto.Entry{ 384 { 385 Timestamp: from, 386 Line: "1", 387 }, 388 }, 389 }, 390 }, 391 }, 392 } 393 for _, tt := range tests { 394 t.Run(tt.name, func(t *testing.T) { 395 s := &store{ 396 Store: storeFixture, 397 cfg: Config{ 398 MaxChunkBatchSize: 10, 399 }, 400 chunkMetrics: NilMetrics, 401 } 402 403 ctx = user.InjectOrgID(context.Background(), "test-user") 404 it, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: tt.req}) 405 if err != nil { 406 t.Errorf("store.LazyQuery() error = %v", err) 407 return 408 } 409 410 streams, _, err := iter.ReadBatch(it, tt.req.Limit) 411 _ = it.Close() 412 if err != nil { 413 t.Fatalf("error reading batch %s", err) 414 } 415 assertStream(t, tt.expected, streams.Streams) 416 }) 417 } 418} 419 420func Test_store_SelectSample(t *testing.T) { 421 tests := []struct { 422 name string 423 req *logproto.SampleQueryRequest 424 expected []logproto.Series 425 }{ 426 { 427 "all", 428 newSampleQuery("count_over_time({foo=~\"ba.*\"}[5m])", from, from.Add(6*time.Millisecond)), 429 []logproto.Series{ 430 { 431 Labels: "{foo=\"bar\"}", 432 Samples: []logproto.Sample{ 433 { 434 Timestamp: from.UnixNano(), 435 Hash: xxhash.Sum64String("1"), 436 Value: 1., 437 }, 438 439 { 440 Timestamp: from.Add(time.Millisecond).UnixNano(), 441 Hash: xxhash.Sum64String("2"), 442 Value: 1., 443 }, 444 { 445 Timestamp: from.Add(2 * time.Millisecond).UnixNano(), 446 Hash: xxhash.Sum64String("3"), 447 Value: 1., 448 }, 449 { 450 Timestamp: from.Add(3 * time.Millisecond).UnixNano(), 451 Hash: xxhash.Sum64String("4"), 452 Value: 1., 453 }, 454 455 { 456 Timestamp: from.Add(4 * time.Millisecond).UnixNano(), 457 Hash: xxhash.Sum64String("5"), 458 Value: 1., 459 }, 460 { 461 Timestamp: from.Add(5 * time.Millisecond).UnixNano(), 462 Hash: xxhash.Sum64String("6"), 463 Value: 1., 464 }, 465 }, 466 }, 467 { 468 Labels: "{foo=\"bazz\"}", 469 Samples: []logproto.Sample{ 470 { 471 Timestamp: from.UnixNano(), 472 Hash: xxhash.Sum64String("1"), 473 Value: 1., 474 }, 475 476 { 477 Timestamp: from.Add(time.Millisecond).UnixNano(), 478 Hash: xxhash.Sum64String("2"), 479 Value: 1., 480 }, 481 { 482 Timestamp: from.Add(2 * time.Millisecond).UnixNano(), 483 Hash: xxhash.Sum64String("3"), 484 Value: 1., 485 }, 486 { 487 Timestamp: from.Add(3 * time.Millisecond).UnixNano(), 488 Hash: xxhash.Sum64String("4"), 489 Value: 1., 490 }, 491 492 { 493 Timestamp: from.Add(4 * time.Millisecond).UnixNano(), 494 Hash: xxhash.Sum64String("5"), 495 Value: 1., 496 }, 497 { 498 Timestamp: from.Add(5 * time.Millisecond).UnixNano(), 499 Hash: xxhash.Sum64String("6"), 500 Value: 1., 501 }, 502 }, 503 }, 504 }, 505 }, 506 { 507 "filter regex", 508 newSampleQuery("rate({foo=~\"ba.*\"} |~ \"1|2|3\" !~ \"2|3\"[1m])", from, from.Add(6*time.Millisecond)), 509 []logproto.Series{ 510 { 511 Labels: "{foo=\"bar\"}", 512 Samples: []logproto.Sample{ 513 { 514 Timestamp: from.UnixNano(), 515 Hash: xxhash.Sum64String("1"), 516 Value: 1., 517 }, 518 }, 519 }, 520 { 521 Labels: "{foo=\"bazz\"}", 522 Samples: []logproto.Sample{ 523 { 524 Timestamp: from.UnixNano(), 525 Hash: xxhash.Sum64String("1"), 526 Value: 1., 527 }, 528 }, 529 }, 530 }, 531 }, 532 { 533 "filter matcher", 534 newSampleQuery("count_over_time({foo=\"bar\"}[10m])", from, from.Add(6*time.Millisecond)), 535 []logproto.Series{ 536 { 537 Labels: "{foo=\"bar\"}", 538 Samples: []logproto.Sample{ 539 { 540 Timestamp: from.UnixNano(), 541 Hash: xxhash.Sum64String("1"), 542 Value: 1., 543 }, 544 545 { 546 Timestamp: from.Add(time.Millisecond).UnixNano(), 547 Hash: xxhash.Sum64String("2"), 548 Value: 1., 549 }, 550 { 551 Timestamp: from.Add(2 * time.Millisecond).UnixNano(), 552 Hash: xxhash.Sum64String("3"), 553 Value: 1., 554 }, 555 { 556 Timestamp: from.Add(3 * time.Millisecond).UnixNano(), 557 Hash: xxhash.Sum64String("4"), 558 Value: 1., 559 }, 560 561 { 562 Timestamp: from.Add(4 * time.Millisecond).UnixNano(), 563 Hash: xxhash.Sum64String("5"), 564 Value: 1., 565 }, 566 { 567 Timestamp: from.Add(5 * time.Millisecond).UnixNano(), 568 Hash: xxhash.Sum64String("6"), 569 Value: 1., 570 }, 571 }, 572 }, 573 }, 574 }, 575 { 576 "filter time", 577 newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(time.Millisecond)), 578 []logproto.Series{ 579 { 580 Labels: "{foo=\"bar\"}", 581 Samples: []logproto.Sample{ 582 { 583 Timestamp: from.UnixNano(), 584 Hash: xxhash.Sum64String("1"), 585 Value: 1., 586 }, 587 }, 588 }, 589 { 590 Labels: "{foo=\"bazz\"}", 591 Samples: []logproto.Sample{ 592 { 593 Timestamp: from.UnixNano(), 594 Hash: xxhash.Sum64String("1"), 595 Value: 1., 596 }, 597 }, 598 }, 599 }, 600 }, 601 } 602 for _, tt := range tests { 603 t.Run(tt.name, func(t *testing.T) { 604 s := &store{ 605 Store: storeFixture, 606 cfg: Config{ 607 MaxChunkBatchSize: 10, 608 }, 609 chunkMetrics: NilMetrics, 610 } 611 612 ctx = user.InjectOrgID(context.Background(), "test-user") 613 it, err := s.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: tt.req}) 614 if err != nil { 615 t.Errorf("store.LazyQuery() error = %v", err) 616 return 617 } 618 619 series, _, err := iter.ReadSampleBatch(it, uint32(100000)) 620 _ = it.Close() 621 if err != nil { 622 t.Fatalf("error reading batch %s", err) 623 } 624 assertSeries(t, tt.expected, series.Series) 625 }) 626 } 627} 628 629type fakeChunkFilterer struct{} 630 631func (f fakeChunkFilterer) ForRequest(ctx context.Context) ChunkFilterer { 632 return f 633} 634 635func (f fakeChunkFilterer) ShouldFilter(metric labels.Labels) bool { 636 return metric.Get("foo") == "bazz" 637} 638 639func Test_ChunkFilterer(t *testing.T) { 640 s := &store{ 641 Store: storeFixture, 642 cfg: Config{ 643 MaxChunkBatchSize: 10, 644 }, 645 chunkMetrics: NilMetrics, 646 } 647 s.SetChunkFilterer(&fakeChunkFilterer{}) 648 ctx = user.InjectOrgID(context.Background(), "test-user") 649 it, err := s.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(1*time.Hour))}) 650 if err != nil { 651 t.Errorf("store.SelectSamples() error = %v", err) 652 return 653 } 654 defer it.Close() 655 for it.Next() { 656 v := mustParseLabels(it.Labels())["foo"] 657 require.NotEqual(t, "bazz", v) 658 } 659 660 logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), nil)}) 661 if err != nil { 662 t.Errorf("store.SelectLogs() error = %v", err) 663 return 664 } 665 defer logit.Close() 666 for logit.Next() { 667 v := mustParseLabels(it.Labels())["foo"] 668 require.NotEqual(t, "bazz", v) 669 } 670 ids, err := s.GetSeries(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), nil)}) 671 require.NoError(t, err) 672 for _, id := range ids { 673 v := id.Labels["foo"] 674 require.NotEqual(t, "bazz", v) 675 } 676} 677 678func Test_store_GetSeries(t *testing.T) { 679 tests := []struct { 680 name string 681 req *logproto.QueryRequest 682 expected []logproto.SeriesIdentifier 683 batchSize int 684 }{ 685 { 686 "all", 687 newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil), 688 []logproto.SeriesIdentifier{ 689 {Labels: mustParseLabels("{foo=\"bar\"}")}, 690 {Labels: mustParseLabels("{foo=\"bazz\"}")}, 691 }, 692 1, 693 }, 694 { 695 "all-single-batch", 696 newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil), 697 []logproto.SeriesIdentifier{ 698 {Labels: mustParseLabels("{foo=\"bar\"}")}, 699 {Labels: mustParseLabels("{foo=\"bazz\"}")}, 700 }, 701 5, 702 }, 703 { 704 "regexp filter (post chunk fetching)", 705 newQuery("{foo=~\"bar.*\"}", from, from.Add(6*time.Millisecond), nil), 706 []logproto.SeriesIdentifier{ 707 {Labels: mustParseLabels("{foo=\"bar\"}")}, 708 }, 709 1, 710 }, 711 { 712 "filter matcher", 713 newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil), 714 []logproto.SeriesIdentifier{ 715 {Labels: mustParseLabels("{foo=\"bar\"}")}, 716 }, 717 1, 718 }, 719 } 720 for _, tt := range tests { 721 t.Run(tt.name, func(t *testing.T) { 722 s := &store{ 723 Store: storeFixture, 724 cfg: Config{ 725 MaxChunkBatchSize: tt.batchSize, 726 }, 727 chunkMetrics: NilMetrics, 728 } 729 ctx = user.InjectOrgID(context.Background(), "test-user") 730 out, err := s.GetSeries(ctx, logql.SelectLogParams{QueryRequest: tt.req}) 731 if err != nil { 732 t.Errorf("store.GetSeries() error = %v", err) 733 return 734 } 735 require.Equal(t, tt.expected, out) 736 }) 737 } 738} 739 740func Test_store_decodeReq_Matchers(t *testing.T) { 741 tests := []struct { 742 name string 743 req *logproto.QueryRequest 744 matchers []*labels.Matcher 745 }{ 746 { 747 "unsharded", 748 newQuery("{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), nil), 749 []*labels.Matcher{ 750 labels.MustNewMatcher(labels.MatchRegexp, "foo", "ba.*"), 751 labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "logs"), 752 }, 753 }, 754 { 755 "unsharded", 756 newQuery( 757 "{foo=~\"ba.*\"}", from, from.Add(6*time.Millisecond), 758 []astmapper.ShardAnnotation{ 759 {Shard: 1, Of: 2}, 760 }, 761 ), 762 []*labels.Matcher{ 763 labels.MustNewMatcher(labels.MatchRegexp, "foo", "ba.*"), 764 labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "logs"), 765 labels.MustNewMatcher( 766 labels.MatchEqual, 767 astmapper.ShardLabel, 768 astmapper.ShardAnnotation{Shard: 1, Of: 2}.String(), 769 ), 770 }, 771 }, 772 } 773 for _, tt := range tests { 774 t.Run(tt.name, func(t *testing.T) { 775 ms, _, _, err := decodeReq(logql.SelectLogParams{QueryRequest: tt.req}) 776 if err != nil { 777 t.Errorf("store.GetSeries() error = %v", err) 778 return 779 } 780 require.Equal(t, tt.matchers, ms) 781 }) 782 } 783} 784 785type timeRange struct { 786 from, to time.Time 787} 788 789func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) { 790 tempDir, err := ioutil.TempDir("", "multiple-boltdb-shippers") 791 require.NoError(t, err) 792 793 defer func() { 794 require.NoError(t, os.RemoveAll(tempDir)) 795 }() 796 797 limits, err := validation.NewOverrides(validation.Limits{}, nil) 798 require.NoError(t, err) 799 800 // config for BoltDB Shipper 801 boltdbShipperConfig := shipper.Config{} 802 flagext.DefaultValues(&boltdbShipperConfig) 803 boltdbShipperConfig.ActiveIndexDirectory = path.Join(tempDir, "index") 804 boltdbShipperConfig.SharedStoreType = "filesystem" 805 boltdbShipperConfig.CacheLocation = path.Join(tempDir, "boltdb-shipper-cache") 806 807 // dates for activation of boltdb shippers 808 firstStoreDate := parseDate("2019-01-01") 809 secondStoreDate := parseDate("2019-01-02") 810 811 config := Config{ 812 Config: storage.Config{ 813 FSConfig: chunk_local.FSConfig{Directory: path.Join(tempDir, "chunks")}, 814 }, 815 BoltDBShipperConfig: boltdbShipperConfig, 816 } 817 818 schemaConfig := SchemaConfig{ 819 chunk.SchemaConfig{ 820 Configs: []chunk.PeriodConfig{ 821 { 822 From: chunk.DayTime{Time: timeToModelTime(firstStoreDate)}, 823 IndexType: "boltdb-shipper", 824 ObjectType: "filesystem", 825 Schema: "v9", 826 IndexTables: chunk.PeriodicTableConfig{ 827 Prefix: "index_", 828 Period: time.Hour * 168, 829 }, 830 }, 831 { 832 From: chunk.DayTime{Time: timeToModelTime(secondStoreDate)}, 833 IndexType: "boltdb-shipper", 834 ObjectType: "filesystem", 835 Schema: "v11", 836 IndexTables: chunk.PeriodicTableConfig{ 837 Prefix: "index_", 838 Period: time.Hour * 168, 839 }, 840 RowShards: 2, 841 }, 842 }, 843 }, 844 } 845 846 RegisterCustomIndexClients(&config, nil) 847 848 chunkStore, err := storage.NewStore( 849 config.Config, 850 chunk.StoreConfig{}, 851 schemaConfig.SchemaConfig, 852 limits, 853 nil, 854 nil, 855 util_log.Logger, 856 ) 857 require.NoError(t, err) 858 store, err := NewStore(config, schemaConfig, chunkStore, nil) 859 require.NoError(t, err) 860 861 // time ranges adding a chunk for each store and a chunk which overlaps both the stores 862 chunksToBuildForTimeRanges := []timeRange{ 863 { 864 // chunk just for first store 865 secondStoreDate.Add(-3 * time.Hour), 866 secondStoreDate.Add(-2 * time.Hour), 867 }, 868 { 869 // chunk overlapping both the stores 870 secondStoreDate.Add(-time.Hour), 871 secondStoreDate.Add(time.Hour), 872 }, 873 { 874 // chunk just for second store 875 secondStoreDate.Add(2 * time.Hour), 876 secondStoreDate.Add(3 * time.Hour), 877 }, 878 } 879 880 // build and add chunks to the store 881 addedChunkIDs := map[string]struct{}{} 882 for _, tr := range chunksToBuildForTimeRanges { 883 chk := newChunk(buildTestStreams(fooLabelsWithName, tr)) 884 885 err := store.PutOne(ctx, chk.From, chk.Through, chk) 886 require.NoError(t, err) 887 888 addedChunkIDs[chk.ExternalKey()] = struct{}{} 889 } 890 891 // recreate the store because boltdb-shipper now runs queriers on snapshots which are created every 1 min and during startup. 892 store.Stop() 893 894 chunkStore, err = storage.NewStore( 895 config.Config, 896 chunk.StoreConfig{}, 897 schemaConfig.SchemaConfig, 898 limits, 899 nil, 900 nil, 901 util_log.Logger, 902 ) 903 require.NoError(t, err) 904 905 store, err = NewStore(config, schemaConfig, chunkStore, nil) 906 require.NoError(t, err) 907 908 defer store.Stop() 909 910 // get all the chunks from both the stores 911 chunks, err := store.Get(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName)...) 912 require.NoError(t, err) 913 914 // we get common chunk twice because it is indexed in both the stores 915 require.Len(t, chunks, len(addedChunkIDs)+1) 916 917 // check whether we got back all the chunks which were added 918 for i := range chunks { 919 _, ok := addedChunkIDs[chunks[i].ExternalKey()] 920 require.True(t, ok) 921 } 922} 923 924func mustParseLabels(s string) map[string]string { 925 l, err := marshal.NewLabelSet(s) 926 if err != nil { 927 log.Fatalf("Failed to parse %s", s) 928 } 929 930 return l 931} 932 933func parseDate(in string) time.Time { 934 t, err := time.Parse("2006-01-02", in) 935 if err != nil { 936 panic(err) 937 } 938 return t 939} 940 941func buildTestStreams(labels string, tr timeRange) logproto.Stream { 942 stream := logproto.Stream{ 943 Labels: labels, 944 Entries: []logproto.Entry{}, 945 } 946 947 for from := tr.from; from.Before(tr.to); from = from.Add(time.Second) { 948 stream.Entries = append(stream.Entries, logproto.Entry{ 949 Timestamp: from, 950 Line: from.String(), 951 }) 952 } 953 954 return stream 955} 956 957func timeToModelTime(t time.Time) model.Time { 958 return model.TimeFromUnixNano(t.UnixNano()) 959} 960 961func TestActiveIndexType(t *testing.T) { 962 var cfg SchemaConfig 963 964 // just one PeriodConfig in the past 965 cfg.Configs = []chunk.PeriodConfig{{ 966 From: chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)}, 967 IndexType: "first", 968 }} 969 970 assert.Equal(t, 0, ActivePeriodConfig(cfg.Configs)) 971 972 // add a newer PeriodConfig in the past which should be considered 973 cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{ 974 From: chunk.DayTime{Time: model.Now().Add(-12 * time.Hour)}, 975 IndexType: "second", 976 }) 977 assert.Equal(t, 1, ActivePeriodConfig(cfg.Configs)) 978 979 // add a newer PeriodConfig in the future which should not be considered 980 cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{ 981 From: chunk.DayTime{Time: model.Now().Add(time.Hour)}, 982 IndexType: "third", 983 }) 984 assert.Equal(t, 1, ActivePeriodConfig(cfg.Configs)) 985} 986 987func TestUsingBoltdbShipper(t *testing.T) { 988 var cfg SchemaConfig 989 990 // just one PeriodConfig in the past using boltdb-shipper 991 cfg.Configs = []chunk.PeriodConfig{{ 992 From: chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)}, 993 IndexType: "boltdb-shipper", 994 }} 995 assert.Equal(t, true, UsingBoltdbShipper(cfg.Configs)) 996 997 // just one PeriodConfig in the past not using boltdb-shipper 998 cfg.Configs[0].IndexType = "boltdb" 999 assert.Equal(t, false, UsingBoltdbShipper(cfg.Configs)) 1000 1001 // add a newer PeriodConfig in the future using boltdb-shipper 1002 cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{ 1003 From: chunk.DayTime{Time: model.Now().Add(time.Hour)}, 1004 IndexType: "boltdb-shipper", 1005 }) 1006 assert.Equal(t, true, UsingBoltdbShipper(cfg.Configs)) 1007} 1008 1009func TestSchemaConfig_Validate(t *testing.T) { 1010 for _, tc := range []struct { 1011 name string 1012 configs []chunk.PeriodConfig 1013 err error 1014 }{ 1015 { 1016 name: "empty", 1017 configs: []chunk.PeriodConfig{}, 1018 err: errZeroLengthConfig, 1019 }, 1020 { 1021 name: "NOT using boltdb-shipper", 1022 configs: []chunk.PeriodConfig{{ 1023 From: chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)}, 1024 IndexType: "boltdb", 1025 Schema: "v9", 1026 IndexTables: chunk.PeriodicTableConfig{ 1027 Period: 7 * 24 * time.Hour, 1028 }, 1029 }}, 1030 }, 1031 { 1032 name: "current config boltdb-shipper with 7 days periodic config, without future index type changes", 1033 configs: []chunk.PeriodConfig{{ 1034 From: chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)}, 1035 IndexType: "boltdb-shipper", 1036 Schema: "v9", 1037 IndexTables: chunk.PeriodicTableConfig{ 1038 Period: 7 * 24 * time.Hour, 1039 }, 1040 }}, 1041 err: errCurrentBoltdbShipperNon24Hours, 1042 }, 1043 { 1044 name: "current config boltdb-shipper with 1 day periodic config, without future index type changes", 1045 configs: []chunk.PeriodConfig{{ 1046 From: chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)}, 1047 IndexType: "boltdb-shipper", 1048 Schema: "v9", 1049 IndexTables: chunk.PeriodicTableConfig{ 1050 Period: 24 * time.Hour, 1051 }, 1052 }}, 1053 }, 1054 { 1055 name: "current config boltdb-shipper with 7 days periodic config, upcoming config NOT boltdb-shipper", 1056 configs: []chunk.PeriodConfig{{ 1057 From: chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)}, 1058 IndexType: "boltdb-shipper", 1059 Schema: "v9", 1060 IndexTables: chunk.PeriodicTableConfig{ 1061 Period: 24 * time.Hour, 1062 }, 1063 }, { 1064 From: chunk.DayTime{Time: model.Now().Add(time.Hour)}, 1065 IndexType: "boltdb", 1066 Schema: "v9", 1067 IndexTables: chunk.PeriodicTableConfig{ 1068 Period: 7 * 24 * time.Hour, 1069 }, 1070 }}, 1071 }, 1072 { 1073 name: "current and upcoming config boltdb-shipper with 7 days periodic config", 1074 configs: []chunk.PeriodConfig{{ 1075 From: chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)}, 1076 IndexType: "boltdb-shipper", 1077 Schema: "v9", 1078 IndexTables: chunk.PeriodicTableConfig{ 1079 Period: 24 * time.Hour, 1080 }, 1081 }, { 1082 From: chunk.DayTime{Time: model.Now().Add(time.Hour)}, 1083 IndexType: "boltdb-shipper", 1084 Schema: "v9", 1085 IndexTables: chunk.PeriodicTableConfig{ 1086 Period: 7 * 24 * time.Hour, 1087 }, 1088 }}, 1089 err: errUpcomingBoltdbShipperNon24Hours, 1090 }, 1091 { 1092 name: "current config NOT boltdb-shipper, upcoming config boltdb-shipper with 7 days periodic config", 1093 configs: []chunk.PeriodConfig{{ 1094 From: chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)}, 1095 IndexType: "boltdb", 1096 Schema: "v9", 1097 IndexTables: chunk.PeriodicTableConfig{ 1098 Period: 24 * time.Hour, 1099 }, 1100 }, { 1101 From: chunk.DayTime{Time: model.Now().Add(time.Hour)}, 1102 IndexType: "boltdb-shipper", 1103 Schema: "v9", 1104 IndexTables: chunk.PeriodicTableConfig{ 1105 Period: 7 * 24 * time.Hour, 1106 }, 1107 }}, 1108 err: errUpcomingBoltdbShipperNon24Hours, 1109 }, 1110 } { 1111 t.Run(tc.name, func(t *testing.T) { 1112 cfg := SchemaConfig{chunk.SchemaConfig{Configs: tc.configs}} 1113 err := cfg.Validate() 1114 if tc.err == nil { 1115 require.NoError(t, err) 1116 } else { 1117 require.EqualError(t, err, tc.err.Error()) 1118 } 1119 }) 1120 } 1121} 1122 1123func Test_OverlappingChunks(t *testing.T) { 1124 chunks := []chunk.Chunk{ 1125 1126 newChunk(logproto.Stream{ 1127 Labels: `{foo="bar"}`, 1128 Entries: []logproto.Entry{ 1129 {Timestamp: time.Unix(0, 1), Line: "1"}, 1130 {Timestamp: time.Unix(0, 4), Line: "4"}, 1131 }, 1132 }), 1133 newChunk(logproto.Stream{ 1134 Labels: `{foo="bar"}`, 1135 Entries: []logproto.Entry{ 1136 {Timestamp: time.Unix(0, 2), Line: "2"}, 1137 {Timestamp: time.Unix(0, 3), Line: "3"}, 1138 }, 1139 }), 1140 } 1141 s := &store{ 1142 Store: &mockChunkStore{chunks: chunks, client: &mockChunkStoreClient{chunks: chunks}}, 1143 cfg: Config{ 1144 MaxChunkBatchSize: 10, 1145 }, 1146 chunkMetrics: NilMetrics, 1147 } 1148 1149 ctx = user.InjectOrgID(context.Background(), "test-user") 1150 it, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: &logproto.QueryRequest{ 1151 Selector: `{foo="bar"}`, 1152 Limit: 1000, 1153 Direction: logproto.BACKWARD, 1154 Start: time.Unix(0, 0), 1155 End: time.Unix(0, 10), 1156 }}) 1157 if err != nil { 1158 t.Errorf("store.SelectLogs() error = %v", err) 1159 return 1160 } 1161 defer it.Close() 1162 require.True(t, it.Next()) 1163 require.Equal(t, "4", it.Entry().Line) 1164 require.True(t, it.Next()) 1165 require.Equal(t, "3", it.Entry().Line) 1166 require.True(t, it.Next()) 1167 require.Equal(t, "2", it.Entry().Line) 1168 require.True(t, it.Next()) 1169 require.Equal(t, "1", it.Entry().Line) 1170 require.False(t, it.Next()) 1171} 1172