1package tsdb_test 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "io/ioutil" 9 "math" 10 "math/rand" 11 "os" 12 "path/filepath" 13 "reflect" 14 "regexp" 15 "sort" 16 "strings" 17 "sync" 18 "testing" 19 "time" 20 21 "github.com/davecgh/go-spew/spew" 22 "github.com/influxdata/influxdb/internal" 23 "github.com/influxdata/influxdb/logger" 24 "github.com/influxdata/influxdb/models" 25 "github.com/influxdata/influxdb/pkg/deep" 26 "github.com/influxdata/influxdb/pkg/slices" 27 "github.com/influxdata/influxdb/query" 28 "github.com/influxdata/influxdb/tsdb" 29 "github.com/influxdata/influxdb/tsdb/index/inmem" 30 "github.com/influxdata/influxql" 31) 32 33// Ensure the store can delete a retention policy and all shards under 34// it. 35func TestStore_DeleteRetentionPolicy(t *testing.T) { 36 t.Parallel() 37 38 test := func(index string) { 39 s := MustOpenStore(index) 40 defer s.Close() 41 42 // Create a new shard and verify that it exists. 43 if err := s.CreateShard("db0", "rp0", 1, true); err != nil { 44 t.Fatal(err) 45 } else if sh := s.Shard(1); sh == nil { 46 t.Fatalf("expected shard") 47 } 48 49 // Create a new shard under the same retention policy, and verify 50 // that it exists. 51 if err := s.CreateShard("db0", "rp0", 2, true); err != nil { 52 t.Fatal(err) 53 } else if sh := s.Shard(2); sh == nil { 54 t.Fatalf("expected shard") 55 } 56 57 // Create a new shard under a different retention policy, and 58 // verify that it exists. 59 if err := s.CreateShard("db0", "rp1", 3, true); err != nil { 60 t.Fatal(err) 61 } else if sh := s.Shard(3); sh == nil { 62 t.Fatalf("expected shard") 63 } 64 65 // Deleting the rp0 retention policy does not return an error. 66 if err := s.DeleteRetentionPolicy("db0", "rp0"); err != nil { 67 t.Fatal(err) 68 } 69 70 // It deletes the shards under that retention policy. 71 if sh := s.Shard(1); sh != nil { 72 t.Errorf("shard 1 was not deleted") 73 } 74 75 if sh := s.Shard(2); sh != nil { 76 t.Errorf("shard 2 was not deleted") 77 } 78 79 // It deletes the retention policy directory. 80 if got, exp := dirExists(filepath.Join(s.Path(), "db0", "rp0")), false; got != exp { 81 t.Error("directory exists, but should have been removed") 82 } 83 84 // It deletes the WAL retention policy directory. 85 if got, exp := dirExists(filepath.Join(s.EngineOptions.Config.WALDir, "db0", "rp0")), false; got != exp { 86 t.Error("directory exists, but should have been removed") 87 } 88 89 // Reopen other shard and check it still exists. 90 if err := s.Reopen(); err != nil { 91 t.Error(err) 92 } else if sh := s.Shard(3); sh == nil { 93 t.Errorf("shard 3 does not exist") 94 } 95 96 // It does not delete other retention policy directories. 97 if got, exp := dirExists(filepath.Join(s.Path(), "db0", "rp1")), true; got != exp { 98 t.Error("directory does not exist, but should") 99 } 100 if got, exp := dirExists(filepath.Join(s.EngineOptions.Config.WALDir, "db0", "rp1")), true; got != exp { 101 t.Error("directory does not exist, but should") 102 } 103 } 104 105 for _, index := range tsdb.RegisteredIndexes() { 106 t.Run(index, func(t *testing.T) { test(index) }) 107 } 108} 109 110// Ensure the store can create a new shard. 111func TestStore_CreateShard(t *testing.T) { 112 t.Parallel() 113 114 test := func(index string) { 115 s := MustOpenStore(index) 116 defer s.Close() 117 118 // Create a new shard and verify that it exists. 119 if err := s.CreateShard("db0", "rp0", 1, true); err != nil { 120 t.Fatal(err) 121 } else if sh := s.Shard(1); sh == nil { 122 t.Fatalf("expected shard") 123 } 124 125 // Create another shard and verify that it exists. 126 if err := s.CreateShard("db0", "rp0", 2, true); err != nil { 127 t.Fatal(err) 128 } else if sh := s.Shard(2); sh == nil { 129 t.Fatalf("expected shard") 130 } 131 132 // Reopen shard and recheck. 133 if err := s.Reopen(); err != nil { 134 t.Fatal(err) 135 } else if sh := s.Shard(1); sh == nil { 136 t.Fatalf("expected shard(1)") 137 } else if sh = s.Shard(2); sh == nil { 138 t.Fatalf("expected shard(2)") 139 } 140 } 141 142 for _, index := range tsdb.RegisteredIndexes() { 143 t.Run(index, func(t *testing.T) { test(index) }) 144 } 145} 146 147func TestStore_CreateMixedShards(t *testing.T) { 148 t.Parallel() 149 150 test := func(index1 string, index2 string) { 151 s := MustOpenStore(index1) 152 defer s.Close() 153 154 // Create a new shard and verify that it exists. 155 if err := s.CreateShard("db0", "rp0", 1, true); err != nil { 156 t.Fatal(err) 157 } else if sh := s.Shard(1); sh == nil { 158 t.Fatalf("expected shard") 159 } 160 161 s.EngineOptions.IndexVersion = index2 162 s.index = index2 163 if err := s.Reopen(); err != nil { 164 t.Fatal(err) 165 } 166 167 // Create another shard and verify that it exists. 168 if err := s.CreateShard("db0", "rp0", 2, true); err != nil { 169 t.Fatal(err) 170 } else if sh := s.Shard(2); sh == nil { 171 t.Fatalf("expected shard") 172 } 173 174 // Reopen shard and recheck. 175 if err := s.Reopen(); err != nil { 176 t.Fatal(err) 177 } else if sh := s.Shard(1); sh == nil { 178 t.Fatalf("expected shard(1)") 179 } else if sh = s.Shard(2); sh == nil { 180 t.Fatalf("expected shard(2)") 181 } 182 183 sh := s.Shard(1) 184 if sh.IndexType() != index1 { 185 t.Fatalf("got index %v, expected %v", sh.IndexType(), index1) 186 } 187 188 sh = s.Shard(2) 189 if sh.IndexType() != index2 { 190 t.Fatalf("got index %v, expected %v", sh.IndexType(), index2) 191 } 192 } 193 194 indexes := tsdb.RegisteredIndexes() 195 for i := range indexes { 196 j := (i + 1) % len(indexes) 197 index1 := indexes[i] 198 index2 := indexes[j] 199 t.Run(fmt.Sprintf("%s-%s", index1, index2), func(t *testing.T) { test(index1, index2) }) 200 } 201} 202 203func TestStore_DropMeasurementMixedShards(t *testing.T) { 204 t.Parallel() 205 206 test := func(index1 string, index2 string) { 207 s := MustOpenStore(index1) 208 defer s.Close() 209 210 if err := s.CreateShard("db0", "rp0", 1, true); err != nil { 211 t.Fatal(err) 212 } 213 214 s.MustWriteToShardString(1, "mem,server=a v=1 10") 215 216 s.EngineOptions.IndexVersion = index2 217 s.index = index2 218 if err := s.Reopen(); err != nil { 219 t.Fatal(err) 220 } 221 222 if err := s.CreateShard("db0", "rp0", 2, true); err != nil { 223 t.Fatal(err) 224 } 225 226 s.MustWriteToShardString(2, "mem,server=b v=1 20") 227 228 s.MustWriteToShardString(1, "cpu,server=a v=1 10") 229 s.MustWriteToShardString(2, "cpu,server=b v=1 20") 230 231 err := s.DeleteMeasurement("db0", "cpu") 232 if err != tsdb.ErrMultipleIndexTypes { 233 t.Fatal(err) 234 } else if err == nil { 235 t.Fatal("expect failure deleting measurement on multiple index types") 236 } 237 } 238 239 indexes := tsdb.RegisteredIndexes() 240 for i := range indexes { 241 j := (i + 1) % len(indexes) 242 index1 := indexes[i] 243 index2 := indexes[j] 244 t.Run(fmt.Sprintf("%s-%s", index1, index2), func(t *testing.T) { test(index1, index2) }) 245 } 246} 247 248func TestStore_DropConcurrentWriteMultipleShards(t *testing.T) { 249 t.Parallel() 250 251 test := func(index string) { 252 s := MustOpenStore(index) 253 defer s.Close() 254 255 if err := s.CreateShard("db0", "rp0", 1, true); err != nil { 256 t.Fatal(err) 257 } 258 259 s.MustWriteToShardString(1, "mem,server=a v=1 10") 260 261 if err := s.CreateShard("db0", "rp0", 2, true); err != nil { 262 t.Fatal(err) 263 } 264 265 s.MustWriteToShardString(2, "mem,server=b v=1 20") 266 267 errCh := make(chan error) 268 go func() { 269 for i := 0; i < 50; i++ { 270 s.MustWriteToShardString(1, "cpu,server=a v=1 10") 271 s.MustWriteToShardString(2, "cpu,server=b v=1 20") 272 } 273 errCh <- nil 274 }() 275 276 go func() { 277 for i := 0; i < 50; i++ { 278 if err := s.DeleteMeasurement("db0", "cpu"); err != nil { 279 errCh <- err 280 return 281 } 282 } 283 errCh <- nil 284 }() 285 286 for i := 0; i < 2; i++ { 287 if err := <-errCh; err != nil { 288 t.Fatal(err) 289 } 290 } 291 err := s.DeleteMeasurement("db0", "cpu") 292 if err != nil { 293 t.Fatal(err) 294 } 295 296 measurements, err := s.MeasurementNames(context.Background(), query.OpenAuthorizer, "db0", nil) 297 if err != nil { 298 t.Fatal(err) 299 } 300 301 exp := [][]byte{[]byte("mem")} 302 if got, exp := measurements, exp; !reflect.DeepEqual(got, exp) { 303 t.Fatal(fmt.Errorf("got measurements %v, expected %v", got, exp)) 304 } 305 } 306 307 for _, index := range tsdb.RegisteredIndexes() { 308 t.Run(index, func(t *testing.T) { test(index) }) 309 } 310} 311 312func TestStore_WriteMixedShards(t *testing.T) { 313 t.Parallel() 314 315 test := func(index1 string, index2 string) { 316 s := MustOpenStore(index1) 317 defer s.Close() 318 319 if err := s.CreateShard("db0", "rp0", 1, true); err != nil { 320 t.Fatal(err) 321 } 322 323 s.MustWriteToShardString(1, "mem,server=a v=1 10") 324 325 s.EngineOptions.IndexVersion = index2 326 s.index = index2 327 if err := s.Reopen(); err != nil { 328 t.Fatal(err) 329 } 330 331 if err := s.CreateShard("db0", "rp0", 2, true); err != nil { 332 t.Fatal(err) 333 } 334 335 s.MustWriteToShardString(2, "mem,server=b v=1 20") 336 337 var wg sync.WaitGroup 338 wg.Add(2) 339 340 go func() { 341 defer wg.Done() 342 for i := 0; i < 50; i++ { 343 s.MustWriteToShardString(1, fmt.Sprintf("cpu,server=a,f%0.2d=a v=1", i*2)) 344 } 345 }() 346 347 go func() { 348 defer wg.Done() 349 for i := 0; i < 50; i++ { 350 s.MustWriteToShardString(2, fmt.Sprintf("cpu,server=b,f%0.2d=b v=1 20", i*2+1)) 351 } 352 }() 353 354 wg.Wait() 355 356 keys, err := s.TagKeys(context.Background(), nil, []uint64{1, 2}, nil) 357 if err != nil { 358 t.Fatal(err) 359 } 360 361 cpuKeys := make([]string, 101) 362 for i := 0; i < 100; i++ { 363 cpuKeys[i] = fmt.Sprintf("f%0.2d", i) 364 } 365 cpuKeys[100] = "server" 366 expKeys := []tsdb.TagKeys{ 367 {Measurement: "cpu", Keys: cpuKeys}, 368 {Measurement: "mem", Keys: []string{"server"}}, 369 } 370 if got, exp := keys, expKeys; !reflect.DeepEqual(got, exp) { 371 t.Fatalf("got keys %v, expected %v", got, exp) 372 } 373 } 374 375 indexes := tsdb.RegisteredIndexes() 376 for i := range indexes { 377 j := (i + 1) % len(indexes) 378 index1 := indexes[i] 379 index2 := indexes[j] 380 t.Run(fmt.Sprintf("%s-%s", index1, index2), func(t *testing.T) { test(index1, index2) }) 381 } 382} 383 384// Ensure the store does not return an error when delete from a non-existent db. 385func TestStore_DeleteSeries_NonExistentDB(t *testing.T) { 386 t.Parallel() 387 388 test := func(index string) { 389 s := MustOpenStore(index) 390 defer s.Close() 391 392 if err := s.DeleteSeries("db0", nil, nil); err != nil { 393 t.Fatal(err.Error()) 394 } 395 } 396 397 for _, index := range tsdb.RegisteredIndexes() { 398 t.Run(index, func(t *testing.T) { test(index) }) 399 } 400} 401 402// Ensure the store can delete an existing shard. 403func TestStore_DeleteShard(t *testing.T) { 404 t.Parallel() 405 406 test := func(index string) error { 407 s := MustOpenStore(index) 408 defer s.Close() 409 410 // Create a new shard and verify that it exists. 411 if err := s.CreateShard("db0", "rp0", 1, true); err != nil { 412 return err 413 } else if sh := s.Shard(1); sh == nil { 414 return fmt.Errorf("expected shard") 415 } 416 417 // Create another shard. 418 if err := s.CreateShard("db0", "rp0", 2, true); err != nil { 419 return err 420 } else if sh := s.Shard(2); sh == nil { 421 return fmt.Errorf("expected shard") 422 } 423 424 // and another, but in a different db. 425 if err := s.CreateShard("db1", "rp0", 3, true); err != nil { 426 return err 427 } else if sh := s.Shard(3); sh == nil { 428 return fmt.Errorf("expected shard") 429 } 430 431 // Write series data to the db0 shards. 432 s.MustWriteToShardString(1, "cpu,servera=a v=1", "cpu,serverb=b v=1", "mem,serverc=a v=1") 433 s.MustWriteToShardString(2, "cpu,servera=a v=1", "mem,serverc=a v=1") 434 435 // Write similar data to db1 database 436 s.MustWriteToShardString(3, "cpu,serverb=b v=1") 437 438 // Reopen the store and check all shards still exist 439 if err := s.Reopen(); err != nil { 440 return err 441 } 442 for i := uint64(1); i <= 3; i++ { 443 if sh := s.Shard(i); sh == nil { 444 return fmt.Errorf("shard %d missing", i) 445 } 446 } 447 448 // Remove the first shard from the store. 449 if err := s.DeleteShard(1); err != nil { 450 return err 451 } 452 453 // cpu,serverb=b should be removed from the series file for db0 because 454 // shard 1 was the only owner of that series. 455 // Verify by getting all tag keys. 456 keys, err := s.TagKeys(context.Background(), nil, []uint64{2}, nil) 457 if err != nil { 458 return err 459 } 460 461 expKeys := []tsdb.TagKeys{ 462 {Measurement: "cpu", Keys: []string{"servera"}}, 463 {Measurement: "mem", Keys: []string{"serverc"}}, 464 } 465 if got, exp := keys, expKeys; !reflect.DeepEqual(got, exp) { 466 return fmt.Errorf("got keys %v, expected %v", got, exp) 467 } 468 469 // Verify that the same series was not removed from other databases' 470 // series files. 471 if keys, err = s.TagKeys(context.Background(), nil, []uint64{3}, nil); err != nil { 472 return err 473 } 474 475 expKeys = []tsdb.TagKeys{{Measurement: "cpu", Keys: []string{"serverb"}}} 476 if got, exp := keys, expKeys; !reflect.DeepEqual(got, exp) { 477 return fmt.Errorf("got keys %v, expected %v", got, exp) 478 } 479 return nil 480 } 481 482 for _, index := range tsdb.RegisteredIndexes() { 483 t.Run(index, func(t *testing.T) { 484 if err := test(index); err != nil { 485 t.Error(err) 486 } 487 }) 488 } 489} 490 491// Ensure the store can create a snapshot to a shard. 492func TestStore_CreateShardSnapShot(t *testing.T) { 493 t.Parallel() 494 495 test := func(index string) { 496 s := MustOpenStore(index) 497 defer s.Close() 498 499 // Create a new shard and verify that it exists. 500 if err := s.CreateShard("db0", "rp0", 1, true); err != nil { 501 t.Fatal(err) 502 } else if sh := s.Shard(1); sh == nil { 503 t.Fatalf("expected shard") 504 } 505 506 dir, e := s.CreateShardSnapshot(1, false) 507 if e != nil { 508 t.Fatal(e) 509 } 510 if dir == "" { 511 t.Fatal("empty directory name") 512 } 513 } 514 515 for _, index := range tsdb.RegisteredIndexes() { 516 t.Run(index, func(t *testing.T) { test(index) }) 517 } 518} 519 520func TestStore_Open(t *testing.T) { 521 t.Parallel() 522 523 test := func(index string) { 524 s := NewStore(index) 525 defer s.Close() 526 527 if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0", "2"), 0777); err != nil { 528 t.Fatal(err) 529 } 530 531 if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp2", "4"), 0777); err != nil { 532 t.Fatal(err) 533 } 534 535 if err := os.MkdirAll(filepath.Join(s.Path(), "db1", "rp0", "1"), 0777); err != nil { 536 t.Fatal(err) 537 } 538 539 // Store should ignore shard since it does not have a numeric name. 540 if err := s.Open(); err != nil { 541 t.Fatal(err) 542 } else if n := len(s.Databases()); n != 2 { 543 t.Fatalf("unexpected database index count: %d", n) 544 } else if n := s.ShardN(); n != 3 { 545 t.Fatalf("unexpected shard count: %d", n) 546 } 547 548 expDatabases := []string{"db0", "db1"} 549 gotDatabases := s.Databases() 550 sort.Strings(gotDatabases) 551 552 if got, exp := gotDatabases, expDatabases; !reflect.DeepEqual(got, exp) { 553 t.Fatalf("got %#v, expected %#v", got, exp) 554 } 555 } 556 557 for _, index := range tsdb.RegisteredIndexes() { 558 t.Run(index, func(t *testing.T) { test(index) }) 559 } 560} 561 562// Ensure the store reports an error when it can't open a database directory. 563func TestStore_Open_InvalidDatabaseFile(t *testing.T) { 564 t.Parallel() 565 566 test := func(index string) { 567 s := NewStore(index) 568 defer s.Close() 569 570 // Create a file instead of a directory for a database. 571 if _, err := os.Create(filepath.Join(s.Path(), "db0")); err != nil { 572 t.Fatal(err) 573 } 574 575 // Store should ignore database since it's a file. 576 if err := s.Open(); err != nil { 577 t.Fatal(err) 578 } else if n := len(s.Databases()); n != 0 { 579 t.Fatalf("unexpected database index count: %d", n) 580 } 581 } 582 583 for _, index := range tsdb.RegisteredIndexes() { 584 t.Run(index, func(t *testing.T) { test(index) }) 585 } 586} 587 588// Ensure the store reports an error when it can't open a retention policy. 589func TestStore_Open_InvalidRetentionPolicy(t *testing.T) { 590 t.Parallel() 591 592 test := func(index string) { 593 s := NewStore(index) 594 defer s.Close() 595 596 // Create an RP file instead of a directory. 597 if err := os.MkdirAll(filepath.Join(s.Path(), "db0"), 0777); err != nil { 598 t.Fatal(err) 599 } else if _, err := os.Create(filepath.Join(s.Path(), "db0", "rp0")); err != nil { 600 t.Fatal(err) 601 } 602 603 // Store should ignore retention policy since it's a file, and there should 604 // be no indices created. 605 if err := s.Open(); err != nil { 606 t.Fatal(err) 607 } else if n := len(s.Databases()); n != 0 { 608 t.Log(s.Databases()) 609 t.Fatalf("unexpected database index count: %d", n) 610 } 611 } 612 613 for _, index := range tsdb.RegisteredIndexes() { 614 t.Run(index, func(t *testing.T) { test(index) }) 615 } 616} 617 618// Ensure the store reports an error when it can't open a retention policy. 619func TestStore_Open_InvalidShard(t *testing.T) { 620 t.Parallel() 621 622 test := func(index string) { 623 s := NewStore(index) 624 defer s.Close() 625 626 // Create a non-numeric shard file. 627 if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0"), 0777); err != nil { 628 t.Fatal(err) 629 } else if _, err := os.Create(filepath.Join(s.Path(), "db0", "rp0", "bad_shard")); err != nil { 630 t.Fatal(err) 631 } 632 633 // Store should ignore shard since it does not have a numeric name. 634 if err := s.Open(); err != nil { 635 t.Fatal(err) 636 } else if n := len(s.Databases()); n != 0 { 637 t.Fatalf("unexpected database index count: %d", n) 638 } else if n := s.ShardN(); n != 0 { 639 t.Fatalf("unexpected shard count: %d", n) 640 } 641 } 642 643 for _, index := range tsdb.RegisteredIndexes() { 644 t.Run(index, func(t *testing.T) { test(index) }) 645 } 646} 647 648// Ensure shards can create iterators. 649func TestShards_CreateIterator(t *testing.T) { 650 t.Parallel() 651 652 test := func(index string) { 653 s := MustOpenStore(index) 654 defer s.Close() 655 656 // Create shard #0 with data. 657 s.MustCreateShardWithData("db0", "rp0", 0, 658 `cpu,host=serverA value=1 0`, 659 `cpu,host=serverA value=2 10`, 660 `cpu,host=serverB value=3 20`, 661 ) 662 663 // Create shard #1 with data. 664 s.MustCreateShardWithData("db0", "rp0", 1, 665 `cpu,host=serverA value=1 30`, 666 `mem,host=serverA value=2 40`, // skip: wrong source 667 `cpu,host=serverC value=3 60`, 668 ) 669 670 // Retrieve shard group. 671 shards := s.ShardGroup([]uint64{0, 1}) 672 673 // Create iterator. 674 m := &influxql.Measurement{Name: "cpu"} 675 itr, err := shards.CreateIterator(context.Background(), m, query.IteratorOptions{ 676 Expr: influxql.MustParseExpr(`value`), 677 Dimensions: []string{"host"}, 678 Ascending: true, 679 StartTime: influxql.MinTime, 680 EndTime: influxql.MaxTime, 681 }) 682 if err != nil { 683 t.Fatal(err) 684 } 685 defer itr.Close() 686 fitr := itr.(query.FloatIterator) 687 688 // Read values from iterator. The host=serverA points should come first. 689 if p, err := fitr.Next(); err != nil { 690 t.Fatalf("unexpected error(0): %s", err) 691 } else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(0, 0).UnixNano(), Value: 1}) { 692 t.Fatalf("unexpected point(0): %s", spew.Sdump(p)) 693 } 694 if p, err := fitr.Next(); err != nil { 695 t.Fatalf("unexpected error(1): %s", err) 696 } else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(10, 0).UnixNano(), Value: 2}) { 697 t.Fatalf("unexpected point(1): %s", spew.Sdump(p)) 698 } 699 if p, err := fitr.Next(); err != nil { 700 t.Fatalf("unexpected error(2): %s", err) 701 } else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(30, 0).UnixNano(), Value: 1}) { 702 t.Fatalf("unexpected point(2): %s", spew.Sdump(p)) 703 } 704 705 // Next the host=serverB point. 706 if p, err := fitr.Next(); err != nil { 707 t.Fatalf("unexpected error(3): %s", err) 708 } else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverB"), Time: time.Unix(20, 0).UnixNano(), Value: 3}) { 709 t.Fatalf("unexpected point(3): %s", spew.Sdump(p)) 710 } 711 712 // And finally the host=serverC point. 713 if p, err := fitr.Next(); err != nil { 714 t.Fatalf("unexpected error(4): %s", err) 715 } else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverC"), Time: time.Unix(60, 0).UnixNano(), Value: 3}) { 716 t.Fatalf("unexpected point(4): %s", spew.Sdump(p)) 717 } 718 719 // Then an EOF should occur. 720 if p, err := fitr.Next(); err != nil { 721 t.Fatalf("expected eof, got error: %s", err) 722 } else if p != nil { 723 t.Fatalf("expected eof, got: %s", spew.Sdump(p)) 724 } 725 } 726 727 for _, index := range tsdb.RegisteredIndexes() { 728 t.Run(index, func(t *testing.T) { test(index) }) 729 } 730} 731 732// Ensure the store can backup a shard and another store can restore it. 733func TestStore_BackupRestoreShard(t *testing.T) { 734 test := func(index string) { 735 s0, s1 := MustOpenStore(index), MustOpenStore(index) 736 defer s0.Close() 737 defer s1.Close() 738 739 // Create shard with data. 740 s0.MustCreateShardWithData("db0", "rp0", 100, 741 `cpu value=1 0`, 742 `cpu value=2 10`, 743 `cpu value=3 20`, 744 ) 745 746 if err := s0.Reopen(); err != nil { 747 t.Fatal(err) 748 } 749 750 // Backup shard to a buffer. 751 var buf bytes.Buffer 752 if err := s0.BackupShard(100, time.Time{}, &buf); err != nil { 753 t.Fatal(err) 754 } 755 756 // Create the shard on the other store and restore from buffer. 757 if err := s1.CreateShard("db0", "rp0", 100, true); err != nil { 758 t.Fatal(err) 759 } 760 if err := s1.RestoreShard(100, &buf); err != nil { 761 t.Fatal(err) 762 } 763 764 // Read data from 765 m := &influxql.Measurement{Name: "cpu"} 766 itr, err := s0.Shard(100).CreateIterator(context.Background(), m, query.IteratorOptions{ 767 Expr: influxql.MustParseExpr(`value`), 768 Ascending: true, 769 StartTime: influxql.MinTime, 770 EndTime: influxql.MaxTime, 771 }) 772 if err != nil { 773 t.Fatal(err) 774 } 775 defer itr.Close() 776 fitr := itr.(query.FloatIterator) 777 778 // Read values from iterator. The host=serverA points should come first. 779 p, e := fitr.Next() 780 if e != nil { 781 t.Fatal(e) 782 } 783 if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(0, 0).UnixNano(), Value: 1}) { 784 t.Fatalf("unexpected point(0): %s", spew.Sdump(p)) 785 } 786 p, e = fitr.Next() 787 if e != nil { 788 t.Fatal(e) 789 } 790 if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(10, 0).UnixNano(), Value: 2}) { 791 t.Fatalf("unexpected point(1): %s", spew.Sdump(p)) 792 } 793 p, e = fitr.Next() 794 if e != nil { 795 t.Fatal(e) 796 } 797 if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(20, 0).UnixNano(), Value: 3}) { 798 t.Fatalf("unexpected point(2): %s", spew.Sdump(p)) 799 } 800 } 801 802 for _, index := range tsdb.RegisteredIndexes() { 803 if index == tsdb.TSI1IndexName { 804 t.Skip("Skipping failing test for tsi1") 805 } 806 807 t.Run(index, func(t *testing.T) { 808 test(index) 809 }) 810 } 811} 812func TestStore_Shard_SeriesN(t *testing.T) { 813 t.Parallel() 814 815 test := func(index string) error { 816 s := MustOpenStore(index) 817 defer s.Close() 818 819 // Create shard with data. 820 s.MustCreateShardWithData("db0", "rp0", 1, 821 `cpu value=1 0`, 822 `cpu,host=serverA value=2 10`, 823 ) 824 825 // Create 2nd shard w/ same measurements. 826 s.MustCreateShardWithData("db0", "rp0", 2, 827 `cpu value=1 0`, 828 `cpu value=2 10`, 829 ) 830 831 if got, exp := s.Shard(1).SeriesN(), int64(2); got != exp { 832 return fmt.Errorf("[shard %d] got series count of %d, but expected %d", 1, got, exp) 833 } else if got, exp := s.Shard(2).SeriesN(), int64(1); got != exp { 834 return fmt.Errorf("[shard %d] got series count of %d, but expected %d", 2, got, exp) 835 } 836 return nil 837 } 838 839 for _, index := range tsdb.RegisteredIndexes() { 840 t.Run(index, func(t *testing.T) { 841 if err := test(index); err != nil { 842 t.Error(err) 843 } 844 }) 845 } 846} 847 848func TestStore_MeasurementNames_Deduplicate(t *testing.T) { 849 t.Parallel() 850 851 test := func(index string) { 852 s := MustOpenStore(index) 853 defer s.Close() 854 855 // Create shard with data. 856 s.MustCreateShardWithData("db0", "rp0", 1, 857 `cpu value=1 0`, 858 `cpu value=2 10`, 859 `cpu value=3 20`, 860 ) 861 862 // Create 2nd shard w/ same measurements. 863 s.MustCreateShardWithData("db0", "rp0", 2, 864 `cpu value=1 0`, 865 `cpu value=2 10`, 866 `cpu value=3 20`, 867 ) 868 869 meas, err := s.MeasurementNames(context.Background(), query.OpenAuthorizer, "db0", nil) 870 if err != nil { 871 t.Fatalf("unexpected error with MeasurementNames: %v", err) 872 } 873 874 if exp, got := 1, len(meas); exp != got { 875 t.Fatalf("measurement len mismatch: exp %v, got %v", exp, got) 876 } 877 878 if exp, got := "cpu", string(meas[0]); exp != got { 879 t.Fatalf("measurement name mismatch: exp %v, got %v", exp, got) 880 } 881 } 882 883 for _, index := range tsdb.RegisteredIndexes() { 884 t.Run(index, func(t *testing.T) { test(index) }) 885 } 886} 887 888func testStoreCardinalityTombstoning(t *testing.T, store *Store) { 889 // Generate point data to write to the shards. 890 series := genTestSeries(10, 2, 4) // 160 series 891 892 points := make([]models.Point, 0, len(series)) 893 for _, s := range series { 894 points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now())) 895 } 896 897 // Create requested number of shards in the store & write points across 898 // shards such that we never write the same series to multiple shards. 899 for shardID := 0; shardID < 4; shardID++ { 900 if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil { 901 t.Errorf("create shard: %s", err) 902 } 903 904 if err := store.BatchWrite(shardID, points[shardID*40:(shardID+1)*40]); err != nil { 905 t.Errorf("batch write: %s", err) 906 } 907 } 908 909 // Delete all the series for each measurement. 910 mnames, err := store.MeasurementNames(context.Background(), nil, "db", nil) 911 if err != nil { 912 t.Fatal(err) 913 } 914 915 for _, name := range mnames { 916 if err := store.DeleteSeries("db", []influxql.Source{&influxql.Measurement{Name: string(name)}}, nil); err != nil { 917 t.Fatal(err) 918 } 919 } 920 921 // Estimate the series cardinality... 922 cardinality, err := store.Store.SeriesCardinality(context.Background(), "db") 923 if err != nil { 924 t.Fatal(err) 925 } 926 927 // Estimated cardinality should be well within 10 of the actual cardinality. 928 if got, exp := int(cardinality), 10; got > exp { 929 t.Errorf("series cardinality was %v (expected within %v), expected was: %d", got, exp, 0) 930 } 931 932 // Since all the series have been deleted, all the measurements should have 933 // been removed from the index too. 934 if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil { 935 t.Fatal(err) 936 } 937 938 // Estimated cardinality should be well within 2 of the actual cardinality. 939 // TODO(edd): this is totally arbitrary. How can I make it better? 940 if got, exp := int(cardinality), 2; got > exp { 941 t.Errorf("measurement cardinality was %v (expected within %v), expected was: %d", got, exp, 0) 942 } 943} 944 945func TestStore_Cardinality_Tombstoning(t *testing.T) { 946 t.Parallel() 947 948 if testing.Short() || os.Getenv("GORACE") != "" || os.Getenv("APPVEYOR") != "" { 949 t.Skip("Skipping test in short, race and appveyor mode.") 950 } 951 952 test := func(index string) { 953 store := NewStore(index) 954 if err := store.Open(); err != nil { 955 panic(err) 956 } 957 defer store.Close() 958 testStoreCardinalityTombstoning(t, store) 959 } 960 961 for _, index := range tsdb.RegisteredIndexes() { 962 t.Run(index, func(t *testing.T) { test(index) }) 963 } 964} 965 966func testStoreCardinalityUnique(t *testing.T, store *Store) { 967 // Generate point data to write to the shards. 968 series := genTestSeries(64, 5, 5) // 200,000 series 969 expCardinality := len(series) 970 971 points := make([]models.Point, 0, len(series)) 972 for _, s := range series { 973 points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now())) 974 } 975 976 // Create requested number of shards in the store & write points across 977 // shards such that we never write the same series to multiple shards. 978 for shardID := 0; shardID < 10; shardID++ { 979 if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil { 980 t.Fatalf("create shard: %s", err) 981 } 982 if err := store.BatchWrite(shardID, points[shardID*20000:(shardID+1)*20000]); err != nil { 983 t.Fatalf("batch write: %s", err) 984 } 985 } 986 987 // Estimate the series cardinality... 988 cardinality, err := store.Store.SeriesCardinality(context.Background(), "db") 989 if err != nil { 990 t.Fatal(err) 991 } 992 993 // Estimated cardinality should be well within 1.5% of the actual cardinality. 994 if got, exp := math.Abs(float64(cardinality)-float64(expCardinality))/float64(expCardinality), 0.015; got > exp { 995 t.Errorf("got epsilon of %v for series cardinality %v (expected %v), which is larger than expected %v", got, cardinality, expCardinality, exp) 996 } 997 998 // Estimate the measurement cardinality... 999 if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil { 1000 t.Fatal(err) 1001 } 1002 1003 // Estimated cardinality should be well within 2 of the actual cardinality. (arbitrary...) 1004 expCardinality = 64 1005 if got, exp := math.Abs(float64(cardinality)-float64(expCardinality)), 2.0; got > exp { 1006 t.Errorf("got measurmement cardinality %v, expected upto %v; difference is larger than expected %v", cardinality, expCardinality, exp) 1007 } 1008} 1009 1010func TestStore_Cardinality_Unique(t *testing.T) { 1011 t.Parallel() 1012 1013 if testing.Short() || os.Getenv("GORACE") != "" || os.Getenv("APPVEYOR") != "" { 1014 t.Skip("Skipping test in short, race and appveyor mode.") 1015 } 1016 1017 test := func(index string) { 1018 store := NewStore(index) 1019 store.EngineOptions.Config.MaxSeriesPerDatabase = 0 1020 if err := store.Open(); err != nil { 1021 panic(err) 1022 } 1023 defer store.Close() 1024 testStoreCardinalityUnique(t, store) 1025 } 1026 1027 for _, index := range tsdb.RegisteredIndexes() { 1028 t.Run(index, func(t *testing.T) { test(index) }) 1029 } 1030} 1031 1032// This test tests cardinality estimation when series data is duplicated across 1033// multiple shards. 1034func testStoreCardinalityDuplicates(t *testing.T, store *Store) { 1035 // Generate point data to write to the shards. 1036 series := genTestSeries(64, 5, 5) // 200,000 series. 1037 expCardinality := len(series) 1038 1039 points := make([]models.Point, 0, len(series)) 1040 for _, s := range series { 1041 points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now())) 1042 } 1043 1044 // Create requested number of shards in the store & write points. 1045 for shardID := 0; shardID < 10; shardID++ { 1046 if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil { 1047 t.Fatalf("create shard: %s", err) 1048 } 1049 1050 var from, to int 1051 if shardID == 0 { 1052 // if it's the first shard then write all of the points. 1053 from, to = 0, len(points)-1 1054 } else { 1055 // For other shards we write a random sub-section of all the points. 1056 // which will duplicate the series and shouldn't increase the 1057 // cardinality. 1058 from, to = rand.Intn(len(points)), rand.Intn(len(points)) 1059 if from > to { 1060 from, to = to, from 1061 } 1062 } 1063 1064 if err := store.BatchWrite(shardID, points[from:to]); err != nil { 1065 t.Fatalf("batch write: %s", err) 1066 } 1067 } 1068 1069 // Estimate the series cardinality... 1070 cardinality, err := store.Store.SeriesCardinality(context.Background(), "db") 1071 if err != nil { 1072 t.Fatal(err) 1073 } 1074 1075 // Estimated cardinality should be well within 1.5% of the actual cardinality. 1076 if got, exp := math.Abs(float64(cardinality)-float64(expCardinality))/float64(expCardinality), 0.015; got > exp { 1077 t.Errorf("got epsilon of %v for series cardinality %d (expected %d), which is larger than expected %v", got, cardinality, expCardinality, exp) 1078 } 1079 1080 // Estimate the measurement cardinality... 1081 if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil { 1082 t.Fatal(err) 1083 } 1084 1085 // Estimated cardinality should be well within 2 of the actual cardinality. (Arbitrary...) 1086 expCardinality = 64 1087 if got, exp := math.Abs(float64(cardinality)-float64(expCardinality)), 2.0; got > exp { 1088 t.Errorf("got measurement cardinality %v, expected upto %v; difference is larger than expected %v", cardinality, expCardinality, exp) 1089 } 1090} 1091 1092func TestStore_Cardinality_Duplicates(t *testing.T) { 1093 t.Parallel() 1094 1095 if testing.Short() || os.Getenv("GORACE") != "" || os.Getenv("APPVEYOR") != "" { 1096 t.Skip("Skipping test in short, race and appveyor mode.") 1097 } 1098 1099 test := func(index string) { 1100 store := NewStore(index) 1101 store.EngineOptions.Config.MaxSeriesPerDatabase = 0 1102 if err := store.Open(); err != nil { 1103 panic(err) 1104 } 1105 defer store.Close() 1106 testStoreCardinalityDuplicates(t, store) 1107 } 1108 1109 for _, index := range tsdb.RegisteredIndexes() { 1110 t.Run(index, func(t *testing.T) { test(index) }) 1111 } 1112} 1113 1114func TestStore_MetaQuery_Timeout(t *testing.T) { 1115 if testing.Short() || os.Getenv("APPVEYOR") != "" { 1116 t.Skip("Skipping test in short and appveyor mode.") 1117 } 1118 1119 test := func(index string) { 1120 store := NewStore(index) 1121 store.EngineOptions.Config.MaxSeriesPerDatabase = 0 1122 if err := store.Open(); err != nil { 1123 panic(err) 1124 } 1125 defer store.Close() 1126 testStoreMetaQueryTimeout(t, store, index) 1127 } 1128 1129 for _, index := range tsdb.RegisteredIndexes() { 1130 test(index) 1131 } 1132} 1133 1134func testStoreMetaQueryTimeout(t *testing.T, store *Store, index string) { 1135 shards := testStoreMetaQuerySetup(t, store) 1136 1137 testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) { 1138 const funcName = "SeriesCardinality" 1139 _, err := store.Store.SeriesCardinality(ctx, "db") 1140 return funcName, err 1141 }, index)(t) 1142 1143 testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) { 1144 const funcName = "MeasurementsCardinality" 1145 _, err := store.Store.MeasurementsCardinality(ctx, "db") 1146 return funcName, err 1147 }, index)(t) 1148 1149 keyCondition, allCondition := testStoreMetaQueryCondition() 1150 1151 testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) { 1152 const funcName = "TagValues" 1153 _, err := store.Store.TagValues(ctx, nil, shards, allCondition) 1154 return funcName, err 1155 }, index)(t) 1156 1157 testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) { 1158 const funcName = "TagKeys" 1159 _, err := store.Store.TagKeys(ctx, nil, shards, keyCondition) 1160 return funcName, err 1161 }, index)(t) 1162 1163 testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) { 1164 const funcName = "MeasurementNames" 1165 _, err := store.Store.MeasurementNames(ctx, nil, "db", nil) 1166 return funcName, err 1167 }, index)(t) 1168} 1169 1170func testStoreMetaQueryCondition() (influxql.Expr, influxql.Expr) { 1171 keyCondition := &influxql.ParenExpr{ 1172 Expr: &influxql.BinaryExpr{ 1173 Op: influxql.OR, 1174 LHS: &influxql.BinaryExpr{ 1175 Op: influxql.EQ, 1176 LHS: &influxql.VarRef{Val: "_tagKey"}, 1177 RHS: &influxql.StringLiteral{Val: "tagKey4"}, 1178 }, 1179 RHS: &influxql.BinaryExpr{ 1180 Op: influxql.EQ, 1181 LHS: &influxql.VarRef{Val: "_tagKey"}, 1182 RHS: &influxql.StringLiteral{Val: "tagKey5"}, 1183 }, 1184 }, 1185 } 1186 1187 whereCondition := &influxql.ParenExpr{ 1188 Expr: &influxql.BinaryExpr{ 1189 Op: influxql.AND, 1190 LHS: &influxql.ParenExpr{ 1191 Expr: &influxql.BinaryExpr{ 1192 Op: influxql.EQ, 1193 LHS: &influxql.VarRef{Val: "tagKey1"}, 1194 RHS: &influxql.StringLiteral{Val: "tagValue2"}, 1195 }, 1196 }, 1197 RHS: keyCondition, 1198 }, 1199 } 1200 1201 allCondition := &influxql.BinaryExpr{ 1202 Op: influxql.AND, 1203 LHS: &influxql.ParenExpr{ 1204 Expr: &influxql.BinaryExpr{ 1205 Op: influxql.EQREGEX, 1206 LHS: &influxql.VarRef{Val: "tagKey3"}, 1207 RHS: &influxql.RegexLiteral{Val: regexp.MustCompile(`tagValue\d`)}, 1208 }, 1209 }, 1210 RHS: whereCondition, 1211 } 1212 return keyCondition, allCondition 1213} 1214 1215func testStoreMetaQuerySetup(t *testing.T, store *Store) []uint64 { 1216 const measurementCnt = 64 1217 const tagCnt = 5 1218 const valueCnt = 5 1219 const pointsPerShard = 20000 1220 1221 // Generate point data to write to the shards. 1222 series := genTestSeries(measurementCnt, tagCnt, valueCnt) 1223 1224 points := make([]models.Point, 0, len(series)) 1225 for _, s := range series { 1226 points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now())) 1227 } 1228 // Create requested number of shards in the store & write points across 1229 // shards such that we never write the same series to multiple shards. 1230 shards := make([]uint64, len(points)/pointsPerShard) 1231 for shardID := 0; shardID < len(points)/pointsPerShard; shardID++ { 1232 if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil { 1233 t.Fatalf("create shard: %s", err) 1234 } 1235 if err := store.BatchWrite(shardID, points[shardID*pointsPerShard:(shardID+1)*pointsPerShard]); err != nil { 1236 t.Fatalf("batch write: %s", err) 1237 } 1238 shards[shardID] = uint64(shardID) 1239 } 1240 return shards 1241} 1242 1243func testStoreMakeTimedFuncs(tested func(context.Context) (string, error), index string) func(*testing.T) { 1244 cancelTested := func(t *testing.T) { 1245 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(0)) 1246 defer cancel() 1247 1248 funcName, err := tested(ctx) 1249 if err == nil { 1250 t.Fatalf("%v: failed to time out with index type %v", funcName, index) 1251 } else if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) { 1252 t.Fatalf("%v: failed with %v instead of %v with index type %v", funcName, err, context.DeadlineExceeded, index) 1253 } 1254 } 1255 return cancelTested 1256} 1257 1258// Creates a large number of series in multiple shards, which will force 1259// compactions to occur. 1260func testStoreCardinalityCompactions(store *Store) error { 1261 1262 // Generate point data to write to the shards. 1263 series := genTestSeries(300, 5, 5) // 937,500 series 1264 expCardinality := len(series) 1265 1266 points := make([]models.Point, 0, len(series)) 1267 for _, s := range series { 1268 points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now())) 1269 } 1270 1271 // Create requested number of shards in the store & write points across 1272 // shards such that we never write the same series to multiple shards. 1273 for shardID := 0; shardID < 2; shardID++ { 1274 if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil { 1275 return fmt.Errorf("create shard: %s", err) 1276 } 1277 if err := store.BatchWrite(shardID, points[shardID*468750:(shardID+1)*468750]); err != nil { 1278 return fmt.Errorf("batch write: %s", err) 1279 } 1280 } 1281 1282 // Estimate the series cardinality... 1283 cardinality, err := store.Store.SeriesCardinality(context.Background(), "db") 1284 if err != nil { 1285 return err 1286 } 1287 1288 // Estimated cardinality should be well within 1.5% of the actual cardinality. 1289 if got, exp := math.Abs(float64(cardinality)-float64(expCardinality))/float64(expCardinality), 0.015; got > exp { 1290 return fmt.Errorf("got epsilon of %v for series cardinality %v (expected %v), which is larger than expected %v", got, cardinality, expCardinality, exp) 1291 } 1292 1293 // Estimate the measurement cardinality... 1294 if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil { 1295 return err 1296 } 1297 1298 // Estimated cardinality should be well within 2 of the actual cardinality. (Arbitrary...) 1299 expCardinality = 300 1300 if got, exp := math.Abs(float64(cardinality)-float64(expCardinality)), 2.0; got > exp { 1301 return fmt.Errorf("got measurement cardinality %v, expected upto %v; difference is larger than expected %v", cardinality, expCardinality, exp) 1302 } 1303 return nil 1304} 1305 1306func TestStore_Cardinality_Compactions(t *testing.T) { 1307 if testing.Short() || os.Getenv("GORACE") != "" || os.Getenv("APPVEYOR") != "" { 1308 t.Skip("Skipping test in short, race and appveyor mode.") 1309 } 1310 1311 test := func(index string) error { 1312 store := NewStore(index) 1313 store.EngineOptions.Config.MaxSeriesPerDatabase = 0 1314 if err := store.Open(); err != nil { 1315 panic(err) 1316 } 1317 defer store.Close() 1318 return testStoreCardinalityCompactions(store) 1319 } 1320 1321 for _, index := range tsdb.RegisteredIndexes() { 1322 t.Run(index, func(t *testing.T) { 1323 if err := test(index); err != nil { 1324 t.Fatal(err) 1325 } 1326 }) 1327 } 1328} 1329 1330func TestStore_Cardinality_Limit_On_InMem_Index(t *testing.T) { 1331 t.Parallel() 1332 1333 if testing.Short() || os.Getenv("GORACE") != "" || os.Getenv("APPVEYOR") != "" { 1334 t.Skip("Skipping test in short, race and appveyor mode.") 1335 } 1336 1337 store := NewStore("inmem") 1338 store.EngineOptions.Config.MaxSeriesPerDatabase = 100000 1339 if err := store.Open(); err != nil { 1340 panic(err) 1341 } 1342 defer store.Close() 1343 1344 // Generate 200,000 series to write. 1345 series := genTestSeries(64, 5, 5) 1346 1347 // Add 1 point to each series. 1348 points := make([]models.Point, 0, len(series)) 1349 for _, s := range series { 1350 points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now())) 1351 } 1352 1353 // Create shards to write points into. 1354 numShards := 10 1355 for shardID := 0; shardID < numShards; shardID++ { 1356 if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil { 1357 t.Fatalf("create shard: %s", err) 1358 } 1359 } 1360 1361 // Write series / points to the shards. 1362 pointsPerShard := len(points) / numShards 1363 1364 for shardID := 0; shardID < numShards; shardID++ { 1365 from := shardID * pointsPerShard 1366 to := from + pointsPerShard 1367 1368 if err := store.Store.WriteToShard(tsdb.WriteContext{}, uint64(shardID), points[from:to]); err != nil { 1369 if !strings.Contains(err.Error(), "partial write: max-series-per-database limit exceeded:") { 1370 t.Fatal(err) 1371 } 1372 } 1373 } 1374 1375 // Get updated series cardinality from store after writing data. 1376 cardinality, err := store.Store.SeriesCardinality(context.Background(), "db") 1377 if err != nil { 1378 t.Fatal(err) 1379 } 1380 expCardinality := store.EngineOptions.Config.MaxSeriesPerDatabase 1381 1382 // Estimated cardinality should be well within 1.5% of the actual cardinality. 1383 got := math.Abs(float64(cardinality)-float64(expCardinality)) / float64(expCardinality) 1384 exp := 0.015 1385 if got > exp { 1386 t.Errorf("got epsilon of %v for series cardinality %d (expected %d), which is larger than expected %v", got, cardinality, expCardinality, exp) 1387 } 1388} 1389 1390func TestStore_Sketches(t *testing.T) { 1391 t.Parallel() 1392 1393 checkCardinalities := func(store *tsdb.Store, series, tseries, measurements, tmeasurements int) error { 1394 // Get sketches and check cardinality... 1395 sketch, tsketch, err := store.SeriesSketches(context.Background(), "db") 1396 if err != nil { 1397 return err 1398 } 1399 1400 // delta calculates a rough 10% delta. If i is small then a minimum value 1401 // of 2 is used. 1402 delta := func(i int) int { 1403 v := i / 10 1404 if v == 0 { 1405 v = 2 1406 } 1407 return v 1408 } 1409 1410 // series cardinality should be well within 10%. 1411 if got, exp := int(sketch.Count()), series; got-exp < -delta(series) || got-exp > delta(series) { 1412 return fmt.Errorf("got series cardinality %d, expected ~%d", got, exp) 1413 } 1414 1415 // check series tombstones 1416 if got, exp := int(tsketch.Count()), tseries; got-exp < -delta(tseries) || got-exp > delta(tseries) { 1417 return fmt.Errorf("got series tombstone cardinality %d, expected ~%d", got, exp) 1418 } 1419 1420 // Check measurement cardinality. 1421 if sketch, tsketch, err = store.MeasurementsSketches(context.Background(), "db"); err != nil { 1422 return err 1423 } 1424 1425 if got, exp := int(sketch.Count()), measurements; got-exp < -delta(measurements) || got-exp > delta(measurements) { 1426 return fmt.Errorf("got measurement cardinality %d, expected ~%d", got, exp) 1427 } 1428 1429 if got, exp := int(tsketch.Count()), tmeasurements; got-exp < -delta(tmeasurements) || got-exp > delta(tmeasurements) { 1430 return fmt.Errorf("got measurement tombstone cardinality %d, expected ~%d", got, exp) 1431 } 1432 return nil 1433 } 1434 1435 test := func(index string) error { 1436 store := MustOpenStore(index) 1437 defer store.Close() 1438 1439 // Generate point data to write to the shards. 1440 series := genTestSeries(10, 2, 4) // 160 series 1441 1442 points := make([]models.Point, 0, len(series)) 1443 for _, s := range series { 1444 points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now())) 1445 } 1446 1447 // Create requested number of shards in the store & write points across 1448 // shards such that we never write the same series to multiple shards. 1449 for shardID := 0; shardID < 4; shardID++ { 1450 if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil { 1451 return fmt.Errorf("create shard: %s", err) 1452 } 1453 1454 if err := store.BatchWrite(shardID, points[shardID*40:(shardID+1)*40]); err != nil { 1455 return fmt.Errorf("batch write: %s", err) 1456 } 1457 } 1458 1459 // Check cardinalities 1460 if err := checkCardinalities(store.Store, 160, 0, 10, 0); err != nil { 1461 return fmt.Errorf("[initial] %v", err) 1462 } 1463 1464 // Reopen the store. 1465 if err := store.Reopen(); err != nil { 1466 return err 1467 } 1468 1469 // Check cardinalities 1470 if err := checkCardinalities(store.Store, 160, 0, 10, 0); err != nil { 1471 return fmt.Errorf("[initial|re-open] %v", err) 1472 } 1473 1474 // Delete half the the measurements data 1475 mnames, err := store.MeasurementNames(context.Background(), nil, "db", nil) 1476 if err != nil { 1477 return err 1478 } 1479 1480 for _, name := range mnames[:len(mnames)/2] { 1481 if err := store.DeleteSeries("db", []influxql.Source{&influxql.Measurement{Name: string(name)}}, nil); err != nil { 1482 return err 1483 } 1484 } 1485 1486 // Check cardinalities. In this case, the indexes behave differently. 1487 expS, expTS, expM, expTM := 160, 80, 10, 5 1488 if index == inmem.IndexName { 1489 expS, expTS, expM, expTM = 160, 80, 10, 5 1490 } 1491 1492 // Check cardinalities - tombstones should be in 1493 if err := checkCardinalities(store.Store, expS, expTS, expM, expTM); err != nil { 1494 return fmt.Errorf("[initial|re-open|delete] %v", err) 1495 } 1496 1497 // Reopen the store. 1498 if err := store.Reopen(); err != nil { 1499 return err 1500 } 1501 1502 // Check cardinalities. In this case, the indexes behave differently. 1503 expS, expTS, expM, expTM = 80, 80, 5, 5 1504 if index == inmem.IndexName { 1505 expS, expTS, expM, expTM = 80, 0, 5, 0 1506 } 1507 1508 if err := checkCardinalities(store.Store, expS, expTS, expM, expTM); err != nil { 1509 return fmt.Errorf("[initial|re-open|delete|re-open] %v", err) 1510 } 1511 return nil 1512 } 1513 1514 for _, index := range tsdb.RegisteredIndexes() { 1515 t.Run(index, func(t *testing.T) { 1516 if err := test(index); err != nil { 1517 t.Fatal(err) 1518 } 1519 }) 1520 } 1521} 1522 1523func TestStore_TagValues(t *testing.T) { 1524 t.Parallel() 1525 1526 // No WHERE - just get for keys host and shard 1527 RHSAll := &influxql.ParenExpr{ 1528 Expr: &influxql.BinaryExpr{ 1529 Op: influxql.OR, 1530 LHS: &influxql.BinaryExpr{ 1531 Op: influxql.EQ, 1532 LHS: &influxql.VarRef{Val: "_tagKey"}, 1533 RHS: &influxql.StringLiteral{Val: "host"}, 1534 }, 1535 RHS: &influxql.BinaryExpr{ 1536 Op: influxql.EQ, 1537 LHS: &influxql.VarRef{Val: "_tagKey"}, 1538 RHS: &influxql.StringLiteral{Val: "shard"}, 1539 }, 1540 }, 1541 } 1542 1543 // Get for host and shard, but also WHERE on foo = a 1544 RHSWhere := &influxql.ParenExpr{ 1545 Expr: &influxql.BinaryExpr{ 1546 Op: influxql.AND, 1547 LHS: &influxql.ParenExpr{ 1548 Expr: &influxql.BinaryExpr{ 1549 Op: influxql.EQ, 1550 LHS: &influxql.VarRef{Val: "foo"}, 1551 RHS: &influxql.StringLiteral{Val: "a"}, 1552 }, 1553 }, 1554 RHS: RHSAll, 1555 }, 1556 } 1557 1558 // SHOW TAG VALUES FROM /cpu\d/ WITH KEY IN ("host", "shard") 1559 // 1560 // Switching out RHS for RHSWhere would make the query: 1561 // SHOW TAG VALUES FROM /cpu\d/ WITH KEY IN ("host", "shard") WHERE foo = 'a' 1562 base := influxql.BinaryExpr{ 1563 Op: influxql.AND, 1564 LHS: &influxql.ParenExpr{ 1565 Expr: &influxql.BinaryExpr{ 1566 Op: influxql.EQREGEX, 1567 LHS: &influxql.VarRef{Val: "_name"}, 1568 RHS: &influxql.RegexLiteral{Val: regexp.MustCompile(`cpu\d`)}, 1569 }, 1570 }, 1571 RHS: RHSAll, 1572 } 1573 1574 var baseWhere *influxql.BinaryExpr = influxql.CloneExpr(&base).(*influxql.BinaryExpr) 1575 baseWhere.RHS = RHSWhere 1576 1577 examples := []struct { 1578 Name string 1579 Expr influxql.Expr 1580 Exp []tsdb.TagValues 1581 }{ 1582 { 1583 Name: "No WHERE clause", 1584 Expr: &base, 1585 Exp: []tsdb.TagValues{ 1586 createTagValues("cpu0", map[string][]string{"shard": {"s0"}}), 1587 createTagValues("cpu1", map[string][]string{"shard": {"s1"}}), 1588 createTagValues("cpu10", map[string][]string{"host": {"nofoo", "tv0", "tv1", "tv2", "tv3"}, "shard": {"s0", "s1", "s2"}}), 1589 createTagValues("cpu11", map[string][]string{"host": {"nofoo", "tv0", "tv1", "tv2", "tv3"}, "shard": {"s0", "s1", "s2"}}), 1590 createTagValues("cpu12", map[string][]string{"host": {"nofoo", "tv0", "tv1", "tv2", "tv3"}, "shard": {"s0", "s1", "s2"}}), 1591 createTagValues("cpu2", map[string][]string{"shard": {"s2"}}), 1592 }, 1593 }, 1594 { 1595 Name: "With WHERE clause", 1596 Expr: baseWhere, 1597 Exp: []tsdb.TagValues{ 1598 createTagValues("cpu0", map[string][]string{"shard": {"s0"}}), 1599 createTagValues("cpu1", map[string][]string{"shard": {"s1"}}), 1600 createTagValues("cpu10", map[string][]string{"host": {"tv0", "tv1", "tv2", "tv3"}, "shard": {"s0", "s1", "s2"}}), 1601 createTagValues("cpu11", map[string][]string{"host": {"tv0", "tv1", "tv2", "tv3"}, "shard": {"s0", "s1", "s2"}}), 1602 createTagValues("cpu12", map[string][]string{"host": {"tv0", "tv1", "tv2", "tv3"}, "shard": {"s0", "s1", "s2"}}), 1603 createTagValues("cpu2", map[string][]string{"shard": {"s2"}}), 1604 }, 1605 }, 1606 } 1607 1608 setup := func(index string) (*Store, []uint64) { // returns shard ids 1609 s := MustOpenStore(index) 1610 1611 fmtStr := `cpu1%[1]d,foo=a,ignoreme=nope,host=tv%[2]d,shard=s%[3]d value=1 %[4]d 1612 cpu1%[1]d,host=nofoo value=1 %[4]d 1613 mem,host=nothanks value=1 %[4]d 1614 cpu%[3]d,shard=s%[3]d,foo=a value=2 %[4]d 1615 ` 1616 genPoints := func(sid int) []string { 1617 var ts int 1618 points := make([]string, 0, 3*4) 1619 for m := 0; m < 3; m++ { 1620 for tagvid := 0; tagvid < 4; tagvid++ { 1621 points = append(points, fmt.Sprintf(fmtStr, m, tagvid, sid, ts)) 1622 ts++ 1623 } 1624 } 1625 return points 1626 } 1627 1628 // Create data across 3 shards. 1629 var ids []uint64 1630 for i := 0; i < 3; i++ { 1631 ids = append(ids, uint64(i)) 1632 s.MustCreateShardWithData("db0", "rp0", i, genPoints(i)...) 1633 } 1634 return s, ids 1635 } 1636 1637 for _, example := range examples { 1638 for _, index := range tsdb.RegisteredIndexes() { 1639 s, shardIDs := setup(index) 1640 t.Run(example.Name+"_"+index, func(t *testing.T) { 1641 got, err := s.TagValues(context.Background(), nil, shardIDs, example.Expr) 1642 if err != nil { 1643 t.Fatal(err) 1644 } 1645 exp := example.Exp 1646 1647 if !reflect.DeepEqual(got, exp) { 1648 t.Fatalf("got:\n%#v\n\nexp:\n%#v", got, exp) 1649 } 1650 }) 1651 s.Close() 1652 } 1653 } 1654} 1655 1656func TestStore_Measurements_Auth(t *testing.T) { 1657 t.Parallel() 1658 1659 test := func(index string) error { 1660 s := MustOpenStore(index) 1661 defer s.Close() 1662 1663 // Create shard #0 with data. 1664 s.MustCreateShardWithData("db0", "rp0", 0, 1665 `cpu,host=serverA value=1 0`, 1666 `cpu,host=serverA value=2 10`, 1667 `cpu,region=west value=3 20`, 1668 `cpu,secret=foo value=5 30`, // cpu still readable because it has other series that can be read. 1669 `mem,secret=foo value=1 30`, 1670 `disk value=4 30`, 1671 ) 1672 1673 authorizer := &internal.AuthorizerMock{ 1674 AuthorizeSeriesReadFn: func(database string, measurement []byte, tags models.Tags) bool { 1675 if database == "" || tags.GetString("secret") != "" { 1676 t.Logf("Rejecting series db=%s, m=%s, tags=%v", database, measurement, tags) 1677 return false 1678 } 1679 return true 1680 }, 1681 } 1682 1683 names, err := s.MeasurementNames(context.Background(), authorizer, "db0", nil) 1684 if err != nil { 1685 return err 1686 } 1687 1688 // names should not contain any measurements where none of the associated 1689 // series are authorised for reads. 1690 expNames := 2 1691 var gotNames int 1692 for _, name := range names { 1693 if string(name) == "mem" { 1694 return fmt.Errorf("got measurement %q but it should be filtered.", name) 1695 } 1696 gotNames++ 1697 } 1698 1699 if gotNames != expNames { 1700 return fmt.Errorf("got %d measurements, but expected %d", gotNames, expNames) 1701 } 1702 1703 // Now delete all of the cpu series. 1704 cond, err := influxql.ParseExpr("host = 'serverA' OR region = 'west'") 1705 if err != nil { 1706 return err 1707 } 1708 1709 if err := s.DeleteSeries("db0", nil, cond); err != nil { 1710 return err 1711 } 1712 1713 if names, err = s.MeasurementNames(context.Background(), authorizer, "db0", nil); err != nil { 1714 return err 1715 } 1716 1717 // names should not contain any measurements where none of the associated 1718 // series are authorised for reads. 1719 expNames = 1 1720 gotNames = 0 1721 for _, name := range names { 1722 if string(name) == "mem" || string(name) == "cpu" { 1723 return fmt.Errorf("after delete got measurement %q but it should be filtered.", name) 1724 } 1725 gotNames++ 1726 } 1727 1728 if gotNames != expNames { 1729 return fmt.Errorf("after delete got %d measurements, but expected %d", gotNames, expNames) 1730 } 1731 1732 return nil 1733 } 1734 1735 for _, index := range tsdb.RegisteredIndexes() { 1736 t.Run(index, func(t *testing.T) { 1737 if err := test(index); err != nil { 1738 t.Fatal(err) 1739 } 1740 }) 1741 } 1742 1743} 1744 1745func TestStore_TagKeys_Auth(t *testing.T) { 1746 t.Parallel() 1747 1748 test := func(index string) error { 1749 s := MustOpenStore(index) 1750 defer s.Close() 1751 1752 // Create shard #0 with data. 1753 s.MustCreateShardWithData("db0", "rp0", 0, 1754 `cpu,host=serverA value=1 0`, 1755 `cpu,host=serverA,debug=true value=2 10`, 1756 `cpu,region=west value=3 20`, 1757 `cpu,secret=foo,machine=a value=1 20`, 1758 ) 1759 1760 authorizer := &internal.AuthorizerMock{ 1761 AuthorizeSeriesReadFn: func(database string, measurement []byte, tags models.Tags) bool { 1762 if database == "" || !bytes.Equal(measurement, []byte("cpu")) || tags.GetString("secret") != "" { 1763 t.Logf("Rejecting series db=%s, m=%s, tags=%v", database, measurement, tags) 1764 return false 1765 } 1766 return true 1767 }, 1768 } 1769 1770 keys, err := s.TagKeys(context.Background(), authorizer, []uint64{0}, nil) 1771 if err != nil { 1772 return err 1773 } 1774 1775 // keys should not contain any tag keys associated with a series containing 1776 // a secret tag. 1777 expKeys := 3 1778 var gotKeys int 1779 for _, tk := range keys { 1780 if got, exp := tk.Measurement, "cpu"; got != exp { 1781 return fmt.Errorf("got measurement %q, expected %q", got, exp) 1782 } 1783 1784 for _, key := range tk.Keys { 1785 if key == "secret" || key == "machine" { 1786 return fmt.Errorf("got tag key %q but it should be filtered.", key) 1787 } 1788 gotKeys++ 1789 } 1790 } 1791 1792 if gotKeys != expKeys { 1793 return fmt.Errorf("got %d keys, but expected %d", gotKeys, expKeys) 1794 } 1795 1796 // Delete the series with region = west 1797 cond, err := influxql.ParseExpr("region = 'west'") 1798 if err != nil { 1799 return err 1800 } 1801 if err := s.DeleteSeries("db0", nil, cond); err != nil { 1802 return err 1803 } 1804 1805 if keys, err = s.TagKeys(context.Background(), authorizer, []uint64{0}, nil); err != nil { 1806 return err 1807 } 1808 1809 // keys should not contain any tag keys associated with a series containing 1810 // a secret tag or the deleted series 1811 expKeys = 2 1812 gotKeys = 0 1813 for _, tk := range keys { 1814 if got, exp := tk.Measurement, "cpu"; got != exp { 1815 return fmt.Errorf("got measurement %q, expected %q", got, exp) 1816 } 1817 1818 for _, key := range tk.Keys { 1819 if key == "secret" || key == "machine" || key == "region" { 1820 return fmt.Errorf("got tag key %q but it should be filtered.", key) 1821 } 1822 gotKeys++ 1823 } 1824 } 1825 1826 if gotKeys != expKeys { 1827 return fmt.Errorf("got %d keys, but expected %d", gotKeys, expKeys) 1828 } 1829 1830 return nil 1831 } 1832 1833 for _, index := range tsdb.RegisteredIndexes() { 1834 t.Run(index, func(t *testing.T) { 1835 if err := test(index); err != nil { 1836 t.Fatal(err) 1837 } 1838 }) 1839 } 1840 1841} 1842 1843func TestStore_TagValues_Auth(t *testing.T) { 1844 t.Parallel() 1845 1846 test := func(index string) error { 1847 s := MustOpenStore(index) 1848 defer s.Close() 1849 1850 // Create shard #0 with data. 1851 s.MustCreateShardWithData("db0", "rp0", 0, 1852 `cpu,host=serverA value=1 0`, 1853 `cpu,host=serverA value=2 10`, 1854 `cpu,host=serverB value=3 20`, 1855 `cpu,secret=foo,host=serverD value=1 20`, 1856 ) 1857 1858 authorizer := &internal.AuthorizerMock{ 1859 AuthorizeSeriesReadFn: func(database string, measurement []byte, tags models.Tags) bool { 1860 if database == "" || !bytes.Equal(measurement, []byte("cpu")) || tags.GetString("secret") != "" { 1861 t.Logf("Rejecting series db=%s, m=%s, tags=%v", database, measurement, tags) 1862 return false 1863 } 1864 return true 1865 }, 1866 } 1867 1868 values, err := s.TagValues(context.Background(), authorizer, []uint64{0}, &influxql.BinaryExpr{ 1869 Op: influxql.EQ, 1870 LHS: &influxql.VarRef{Val: "_tagKey"}, 1871 RHS: &influxql.StringLiteral{Val: "host"}, 1872 }) 1873 1874 if err != nil { 1875 return err 1876 } 1877 1878 // values should not contain any tag values associated with a series containing 1879 // a secret tag. 1880 expValues := 2 1881 var gotValues int 1882 for _, tv := range values { 1883 if got, exp := tv.Measurement, "cpu"; got != exp { 1884 return fmt.Errorf("got measurement %q, expected %q", got, exp) 1885 } 1886 1887 for _, v := range tv.Values { 1888 if got, exp := v.Value, "serverD"; got == exp { 1889 return fmt.Errorf("got tag value %q but it should be filtered.", got) 1890 } 1891 gotValues++ 1892 } 1893 } 1894 1895 if gotValues != expValues { 1896 return fmt.Errorf("got %d tags, but expected %d", gotValues, expValues) 1897 } 1898 1899 // Delete the series with values serverA 1900 cond, err := influxql.ParseExpr("host = 'serverA'") 1901 if err != nil { 1902 return err 1903 } 1904 if err := s.DeleteSeries("db0", nil, cond); err != nil { 1905 return err 1906 } 1907 1908 values, err = s.TagValues(context.Background(), authorizer, []uint64{0}, &influxql.BinaryExpr{ 1909 Op: influxql.EQ, 1910 LHS: &influxql.VarRef{Val: "_tagKey"}, 1911 RHS: &influxql.StringLiteral{Val: "host"}, 1912 }) 1913 1914 if err != nil { 1915 return err 1916 } 1917 1918 // values should not contain any tag values associated with a series containing 1919 // a secret tag. 1920 expValues = 1 1921 gotValues = 0 1922 for _, tv := range values { 1923 if got, exp := tv.Measurement, "cpu"; got != exp { 1924 return fmt.Errorf("got measurement %q, expected %q", got, exp) 1925 } 1926 1927 for _, v := range tv.Values { 1928 if got, exp := v.Value, "serverD"; got == exp { 1929 return fmt.Errorf("got tag value %q but it should be filtered.", got) 1930 } else if got, exp := v.Value, "serverA"; got == exp { 1931 return fmt.Errorf("got tag value %q but it should be filtered.", got) 1932 } 1933 gotValues++ 1934 } 1935 } 1936 1937 if gotValues != expValues { 1938 return fmt.Errorf("got %d values, but expected %d", gotValues, expValues) 1939 } 1940 return nil 1941 } 1942 1943 for _, index := range tsdb.RegisteredIndexes() { 1944 t.Run(index, func(t *testing.T) { 1945 if err := test(index); err != nil { 1946 t.Fatal(err) 1947 } 1948 }) 1949 } 1950} 1951 1952// Helper to create some tag values 1953func createTagValues(mname string, kvs map[string][]string) tsdb.TagValues { 1954 var sz int 1955 for _, v := range kvs { 1956 sz += len(v) 1957 } 1958 1959 out := tsdb.TagValues{ 1960 Measurement: mname, 1961 Values: make([]tsdb.KeyValue, 0, sz), 1962 } 1963 1964 for tk, tvs := range kvs { 1965 for _, tv := range tvs { 1966 out.Values = append(out.Values, tsdb.KeyValue{Key: tk, Value: tv}) 1967 } 1968 // We have to sort the KeyValues since that's how they're provided from 1969 // the tsdb.Store. 1970 sort.Sort(tsdb.KeyValues(out.Values)) 1971 } 1972 1973 return out 1974} 1975 1976func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) { 1977 for _, index := range tsdb.RegisteredIndexes() { 1978 s := MustOpenStore(index) 1979 defer s.Close() 1980 1981 shardN := 10 1982 for i := 0; i < shardN; i++ { 1983 // Create new shards with some data 1984 s.MustCreateShardWithData("db0", "rp0", i, 1985 `cpu,host=serverA value=1 30`, 1986 `mem,region=west value=2 40`, // skip: wrong source 1987 `cpu,host=serverC value=3 60`, 1988 ) 1989 } 1990 1991 done := make(chan struct{}) 1992 errC := make(chan error, 2) 1993 1994 // Randomly close and open the shards. 1995 go func() { 1996 for { 1997 select { 1998 case <-done: 1999 errC <- nil 2000 return 2001 default: 2002 i := uint64(rand.Intn(int(shardN))) 2003 if sh := s.Shard(i); sh == nil { 2004 errC <- errors.New("shard should not be nil") 2005 return 2006 } else { 2007 if err := sh.Close(); err != nil { 2008 errC <- err 2009 return 2010 } 2011 time.Sleep(500 * time.Microsecond) 2012 if err := sh.Open(); err != nil { 2013 errC <- err 2014 return 2015 } 2016 } 2017 } 2018 } 2019 }() 2020 2021 // Attempt to get tag keys from the shards. 2022 go func() { 2023 for { 2024 select { 2025 case <-done: 2026 errC <- nil 2027 return 2028 default: 2029 names, err := s.MeasurementNames(context.Background(), nil, "db0", nil) 2030 if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed { 2031 continue // These errors are expected 2032 } 2033 2034 if err != nil { 2035 errC <- err 2036 return 2037 } 2038 2039 if got, exp := names, slices.StringsToBytes("cpu", "mem"); !reflect.DeepEqual(got, exp) { 2040 errC <- fmt.Errorf("got keys %v, expected %v", got, exp) 2041 return 2042 } 2043 } 2044 } 2045 }() 2046 2047 // Run for 500ms 2048 time.Sleep(500 * time.Millisecond) 2049 close(done) 2050 2051 // Check for errors. 2052 if err := <-errC; err != nil { 2053 t.Fatal(err) 2054 } 2055 if err := <-errC; err != nil { 2056 t.Fatal(err) 2057 } 2058 } 2059} 2060 2061func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) { 2062 for _, index := range tsdb.RegisteredIndexes() { 2063 s := MustOpenStore(index) 2064 defer s.Close() 2065 2066 shardN := 10 2067 for i := 0; i < shardN; i++ { 2068 // Create new shards with some data 2069 s.MustCreateShardWithData("db0", "rp0", i, 2070 `cpu,host=serverA value=1 30`, 2071 `mem,region=west value=2 40`, // skip: wrong source 2072 `cpu,host=serverC value=3 60`, 2073 ) 2074 } 2075 2076 done := make(chan struct{}) 2077 errC := make(chan error, 2) 2078 2079 // Randomly close and open the shards. 2080 go func() { 2081 for { 2082 select { 2083 case <-done: 2084 errC <- nil 2085 return 2086 default: 2087 i := uint64(rand.Intn(int(shardN))) 2088 if sh := s.Shard(i); sh == nil { 2089 errC <- errors.New("shard should not be nil") 2090 return 2091 } else { 2092 if err := sh.Close(); err != nil { 2093 errC <- err 2094 return 2095 } 2096 time.Sleep(500 * time.Microsecond) 2097 if err := sh.Open(); err != nil { 2098 errC <- err 2099 return 2100 } 2101 } 2102 } 2103 } 2104 }() 2105 2106 // Attempt to get tag keys from the shards. 2107 go func() { 2108 for { 2109 select { 2110 case <-done: 2111 errC <- nil 2112 return 2113 default: 2114 keys, err := s.TagKeys(context.Background(), nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, nil) 2115 if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed { 2116 continue // These errors are expected 2117 } 2118 2119 if err != nil { 2120 errC <- err 2121 return 2122 } 2123 2124 if got, exp := keys[0].Keys, []string{"host"}; !reflect.DeepEqual(got, exp) { 2125 errC <- fmt.Errorf("got keys %v, expected %v", got, exp) 2126 return 2127 } 2128 2129 if got, exp := keys[1].Keys, []string{"region"}; !reflect.DeepEqual(got, exp) { 2130 errC <- fmt.Errorf("got keys %v, expected %v", got, exp) 2131 return 2132 } 2133 } 2134 } 2135 }() 2136 2137 // Run for 500ms 2138 time.Sleep(500 * time.Millisecond) 2139 2140 close(done) 2141 2142 // Check for errors 2143 if err := <-errC; err != nil { 2144 t.Fatal(err) 2145 } 2146 if err := <-errC; err != nil { 2147 t.Fatal(err) 2148 } 2149 } 2150} 2151 2152func TestStore_TagValues_ConcurrentDropShard(t *testing.T) { 2153 for _, index := range tsdb.RegisteredIndexes() { 2154 s := MustOpenStore(index) 2155 defer s.Close() 2156 2157 shardN := 10 2158 for i := 0; i < shardN; i++ { 2159 // Create new shards with some data 2160 s.MustCreateShardWithData("db0", "rp0", i, 2161 `cpu,host=serverA value=1 30`, 2162 `mem,region=west value=2 40`, // skip: wrong source 2163 `cpu,host=serverC value=3 60`, 2164 ) 2165 } 2166 2167 done := make(chan struct{}) 2168 errC := make(chan error, 2) 2169 2170 // Randomly close and open the shards. 2171 go func() { 2172 for { 2173 select { 2174 case <-done: 2175 errC <- nil 2176 return 2177 default: 2178 i := uint64(rand.Intn(int(shardN))) 2179 if sh := s.Shard(i); sh == nil { 2180 errC <- errors.New("shard should not be nil") 2181 return 2182 } else { 2183 if err := sh.Close(); err != nil { 2184 errC <- err 2185 return 2186 } 2187 time.Sleep(500 * time.Microsecond) 2188 if err := sh.Open(); err != nil { 2189 errC <- err 2190 return 2191 } 2192 } 2193 } 2194 } 2195 }() 2196 2197 // Attempt to get tag keys from the shards. 2198 go func() { 2199 for { 2200 select { 2201 case <-done: 2202 errC <- nil 2203 return 2204 default: 2205 stmt, err := influxql.ParseStatement(`SHOW TAG VALUES WITH KEY = "host"`) 2206 if err != nil { 2207 errC <- err 2208 } 2209 rewrite, err := query.RewriteStatement(stmt) 2210 if err != nil { 2211 errC <- err 2212 } 2213 2214 cond := rewrite.(*influxql.ShowTagValuesStatement).Condition 2215 values, err := s.TagValues(context.Background(), nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, cond) 2216 if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed { 2217 continue // These errors are expected 2218 } 2219 2220 if err != nil { 2221 errC <- err 2222 return 2223 } 2224 2225 exp := tsdb.TagValues{ 2226 Measurement: "cpu", 2227 Values: []tsdb.KeyValue{ 2228 {Key: "host", Value: "serverA"}, 2229 {Key: "host", Value: "serverC"}, 2230 }, 2231 } 2232 2233 if got := values[0]; !reflect.DeepEqual(got, exp) { 2234 errC <- fmt.Errorf("got keys %v, expected %v", got, exp) 2235 return 2236 } 2237 } 2238 } 2239 }() 2240 2241 // Run for 500ms 2242 time.Sleep(500 * time.Millisecond) 2243 2244 close(done) 2245 2246 // Check for errors 2247 if err := <-errC; err != nil { 2248 t.Fatal(err) 2249 } 2250 if err := <-errC; err != nil { 2251 t.Fatal(err) 2252 } 2253 } 2254} 2255 2256func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) { 2257 for _, index := range tsdb.RegisteredIndexes() { 2258 store := NewStore(index) 2259 if err := store.Open(); err != nil { 2260 panic(err) 2261 } 2262 2263 // Write a point to n shards. 2264 for shardID := 0; shardID < 100; shardID++ { 2265 if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil { 2266 b.Fatalf("create shard: %s", err) 2267 } 2268 2269 err := store.WriteToShard(tsdb.WriteContext{}, uint64(shardID), []models.Point{models.MustNewPoint("cpu", nil, map[string]interface{}{"value": 1.0}, time.Now())}) 2270 if err != nil { 2271 b.Fatalf("write: %s", err) 2272 } 2273 } 2274 2275 b.Run(store.EngineOptions.IndexVersion, func(b *testing.B) { 2276 for i := 0; i < b.N; i++ { 2277 _, _ = store.SeriesCardinality(context.Background(), "db") 2278 } 2279 }) 2280 store.Close() 2281 } 2282} 2283 2284func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen(b, 64, 5, 5, 1, 100) } 2285 2286func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) { 2287 var store *Store 2288 setup := func(index string) error { 2289 store := MustOpenStore(index) 2290 2291 // Generate test series (measurements + unique tag sets). 2292 series := genTestSeries(mCnt, tkCnt, tvCnt) 2293 2294 // Generate point data to write to the shards. 2295 points := []models.Point{} 2296 for _, s := range series { 2297 for val := 0.0; val < float64(pntCnt); val++ { 2298 p := models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": val}, time.Now()) 2299 points = append(points, p) 2300 } 2301 } 2302 2303 // Create requested number of shards in the store & write points. 2304 for shardID := 0; shardID < shardCnt; shardID++ { 2305 if err := store.CreateShard("mydb", "myrp", uint64(shardID), true); err != nil { 2306 return fmt.Errorf("create shard: %s", err) 2307 } 2308 if err := store.BatchWrite(shardID, points); err != nil { 2309 return fmt.Errorf("batch write: %s", err) 2310 } 2311 } 2312 return nil 2313 } 2314 2315 for _, index := range tsdb.RegisteredIndexes() { 2316 if err := setup(index); err != nil { 2317 b.Fatal(err) 2318 } 2319 b.Run(store.EngineOptions.IndexVersion, func(b *testing.B) { 2320 for n := 0; n < b.N; n++ { 2321 store := tsdb.NewStore(store.Path()) 2322 if err := store.Open(); err != nil { 2323 b.Fatalf("open store error: %s", err) 2324 } 2325 2326 b.StopTimer() 2327 store.Close() 2328 b.StartTimer() 2329 } 2330 }) 2331 os.RemoveAll(store.Path()) 2332 } 2333} 2334 2335// To store result of benchmark (ensure allocated on heap). 2336var tvResult []tsdb.TagValues 2337 2338func BenchmarkStore_TagValues(b *testing.B) { 2339 benchmarks := []struct { 2340 name string 2341 shards int 2342 measurements int 2343 tagValues int 2344 }{ 2345 {name: "s=1_m=1_v=100", shards: 1, measurements: 1, tagValues: 100}, 2346 {name: "s=1_m=1_v=1000", shards: 1, measurements: 1, tagValues: 1000}, 2347 {name: "s=1_m=10_v=100", shards: 1, measurements: 10, tagValues: 100}, 2348 {name: "s=1_m=10_v=1000", shards: 1, measurements: 10, tagValues: 1000}, 2349 {name: "s=1_m=100_v=100", shards: 1, measurements: 100, tagValues: 100}, 2350 {name: "s=1_m=100_v=1000", shards: 1, measurements: 100, tagValues: 1000}, 2351 {name: "s=10_m=1_v=100", shards: 10, measurements: 1, tagValues: 100}, 2352 {name: "s=10_m=1_v=1000", shards: 10, measurements: 1, tagValues: 1000}, 2353 {name: "s=10_m=10_v=100", shards: 10, measurements: 10, tagValues: 100}, 2354 {name: "s=10_m=10_v=1000", shards: 10, measurements: 10, tagValues: 1000}, 2355 {name: "s=10_m=100_v=100", shards: 10, measurements: 100, tagValues: 100}, 2356 {name: "s=10_m=100_v=1000", shards: 10, measurements: 100, tagValues: 1000}, 2357 } 2358 2359 setup := func(shards, measurements, tagValues int, index string, useRandom bool) (*Store, []uint64) { // returns shard ids 2360 s := NewStore(index) 2361 if err := s.Open(); err != nil { 2362 panic(err) 2363 } 2364 2365 fmtStr := `cpu%[1]d,host=tv%[2]d,shard=s%[3]d,z1=s%[1]d%[2]d,z2=%[4]s value=1 %[5]d` 2366 // genPoints generates some point data. If ran is true then random tag 2367 // key values will be generated, meaning more work sorting and merging. 2368 // If ran is false, then the same set of points will be produced for the 2369 // same set of parameters, meaning more de-duplication of points will be 2370 // needed. 2371 genPoints := func(sid int, ran bool) []string { 2372 var v, ts int 2373 var half string 2374 points := make([]string, 0, measurements*tagValues) 2375 for m := 0; m < measurements; m++ { 2376 for tagvid := 0; tagvid < tagValues; tagvid++ { 2377 v = tagvid 2378 if ran { 2379 v = rand.Intn(100000) 2380 } 2381 half = fmt.Sprint(rand.Intn(2) == 0) 2382 points = append(points, fmt.Sprintf(fmtStr, m, v, sid, half, ts)) 2383 ts++ 2384 } 2385 } 2386 return points 2387 } 2388 2389 // Create data across chosen number of shards. 2390 var shardIDs []uint64 2391 for i := 0; i < shards; i++ { 2392 shardIDs = append(shardIDs, uint64(i)) 2393 s.MustCreateShardWithData("db0", "rp0", i, genPoints(i, useRandom)...) 2394 } 2395 return s, shardIDs 2396 } 2397 2398 // SHOW TAG VALUES WITH KEY IN ("host", "shard") 2399 cond1 := &influxql.ParenExpr{ 2400 Expr: &influxql.BinaryExpr{ 2401 Op: influxql.OR, 2402 LHS: &influxql.BinaryExpr{ 2403 Op: influxql.EQ, 2404 LHS: &influxql.VarRef{Val: "_tagKey"}, 2405 RHS: &influxql.StringLiteral{Val: "host"}, 2406 }, 2407 RHS: &influxql.BinaryExpr{ 2408 Op: influxql.EQ, 2409 LHS: &influxql.VarRef{Val: "_tagKey"}, 2410 RHS: &influxql.StringLiteral{Val: "shard"}, 2411 }, 2412 }, 2413 } 2414 2415 cond2 := &influxql.ParenExpr{ 2416 Expr: &influxql.BinaryExpr{ 2417 Op: influxql.AND, 2418 LHS: &influxql.ParenExpr{ 2419 Expr: &influxql.BinaryExpr{ 2420 Op: influxql.EQ, 2421 LHS: &influxql.VarRef{Val: "z2"}, 2422 RHS: &influxql.StringLiteral{Val: "true"}, 2423 }, 2424 }, 2425 RHS: cond1, 2426 }, 2427 } 2428 2429 var err error 2430 for _, index := range tsdb.RegisteredIndexes() { 2431 for useRand := 0; useRand < 2; useRand++ { 2432 for c, condition := range []influxql.Expr{cond1, cond2} { 2433 for _, bm := range benchmarks { 2434 s, shardIDs := setup(bm.shards, bm.measurements, bm.tagValues, index, useRand == 1) 2435 teardown := func() { 2436 if err := s.Close(); err != nil { 2437 b.Fatal(err) 2438 } 2439 } 2440 2441 cnd := "Unfiltered" 2442 if c == 0 { 2443 cnd = "Filtered" 2444 } 2445 b.Run("random_values="+fmt.Sprint(useRand == 1)+"_index="+index+"_"+cnd+"_"+bm.name, func(b *testing.B) { 2446 for i := 0; i < b.N; i++ { 2447 if tvResult, err = s.TagValues(context.Background(), nil, shardIDs, condition); err != nil { 2448 b.Fatal(err) 2449 } 2450 } 2451 }) 2452 teardown() 2453 } 2454 } 2455 } 2456 } 2457} 2458 2459// Store is a test wrapper for tsdb.Store. 2460type Store struct { 2461 *tsdb.Store 2462 index string 2463} 2464 2465// NewStore returns a new instance of Store with a temporary path. 2466func NewStore(index string) *Store { 2467 path, err := ioutil.TempDir("", "influxdb-tsdb-") 2468 if err != nil { 2469 panic(err) 2470 } 2471 2472 s := &Store{Store: tsdb.NewStore(path), index: index} 2473 s.EngineOptions.IndexVersion = index 2474 s.EngineOptions.Config.WALDir = filepath.Join(path, "wal") 2475 s.EngineOptions.Config.TraceLoggingEnabled = true 2476 2477 if testing.Verbose() { 2478 s.WithLogger(logger.New(os.Stdout)) 2479 } 2480 2481 return s 2482} 2483 2484// MustOpenStore returns a new, open Store using the specified index, 2485// at a temporary path. 2486func MustOpenStore(index string) *Store { 2487 s := NewStore(index) 2488 2489 if err := s.Open(); err != nil { 2490 panic(err) 2491 } 2492 return s 2493} 2494 2495// Reopen closes and reopens the store as a new store. 2496func (s *Store) Reopen() error { 2497 if err := s.Store.Close(); err != nil { 2498 return err 2499 } 2500 2501 s.Store = tsdb.NewStore(s.Path()) 2502 s.EngineOptions.IndexVersion = s.index 2503 s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal") 2504 s.EngineOptions.Config.TraceLoggingEnabled = true 2505 2506 if testing.Verbose() { 2507 s.WithLogger(logger.New(os.Stdout)) 2508 } 2509 return s.Store.Open() 2510} 2511 2512// Close closes the store and removes the underlying data. 2513func (s *Store) Close() error { 2514 defer os.RemoveAll(s.Path()) 2515 return s.Store.Close() 2516} 2517 2518// MustCreateShardWithData creates a shard and writes line protocol data to it. 2519func (s *Store) MustCreateShardWithData(db, rp string, shardID int, data ...string) { 2520 if err := s.CreateShard(db, rp, uint64(shardID), true); err != nil { 2521 panic(err) 2522 } 2523 s.MustWriteToShardString(shardID, data...) 2524} 2525 2526// MustWriteToShardString parses the line protocol (with second precision) and 2527// inserts the resulting points into a shard. Panic on error. 2528func (s *Store) MustWriteToShardString(shardID int, data ...string) { 2529 var points []models.Point 2530 for i := range data { 2531 a, err := models.ParsePointsWithPrecision([]byte(strings.TrimSpace(data[i])), time.Time{}, "s") 2532 if err != nil { 2533 panic(err) 2534 } 2535 points = append(points, a...) 2536 } 2537 2538 if err := s.WriteToShard(tsdb.WriteContext{}, uint64(shardID), points); err != nil { 2539 panic(err) 2540 } 2541} 2542 2543// BatchWrite writes points to a shard in chunks. 2544func (s *Store) BatchWrite(shardID int, points []models.Point) error { 2545 nPts := len(points) 2546 chunkSz := 10000 2547 start := 0 2548 end := chunkSz 2549 2550 for { 2551 if end > nPts { 2552 end = nPts 2553 } 2554 if end-start == 0 { 2555 break 2556 } 2557 2558 if err := s.WriteToShard(tsdb.WriteContext{}, uint64(shardID), points[start:end]); err != nil { 2559 return err 2560 } 2561 start = end 2562 end += chunkSz 2563 } 2564 return nil 2565} 2566 2567// ParseTags returns an instance of Tags for a comma-delimited list of key/values. 2568func ParseTags(s string) query.Tags { 2569 m := make(map[string]string) 2570 for _, kv := range strings.Split(s, ",") { 2571 a := strings.Split(kv, "=") 2572 m[a[0]] = a[1] 2573 } 2574 return query.NewTags(m) 2575} 2576 2577func dirExists(path string) bool { 2578 var err error 2579 if _, err = os.Stat(path); err == nil { 2580 return true 2581 } 2582 return !os.IsNotExist(err) 2583} 2584