1package tsdb_test
2
3import (
4	"bytes"
5	"context"
6	"errors"
7	"fmt"
8	"io/ioutil"
9	"math"
10	"math/rand"
11	"os"
12	"path/filepath"
13	"reflect"
14	"regexp"
15	"sort"
16	"strings"
17	"sync"
18	"testing"
19	"time"
20
21	"github.com/davecgh/go-spew/spew"
22	"github.com/influxdata/influxdb/internal"
23	"github.com/influxdata/influxdb/logger"
24	"github.com/influxdata/influxdb/models"
25	"github.com/influxdata/influxdb/pkg/deep"
26	"github.com/influxdata/influxdb/pkg/slices"
27	"github.com/influxdata/influxdb/query"
28	"github.com/influxdata/influxdb/tsdb"
29	"github.com/influxdata/influxdb/tsdb/index/inmem"
30	"github.com/influxdata/influxql"
31)
32
33// Ensure the store can delete a retention policy and all shards under
34// it.
35func TestStore_DeleteRetentionPolicy(t *testing.T) {
36	t.Parallel()
37
38	test := func(index string) {
39		s := MustOpenStore(index)
40		defer s.Close()
41
42		// Create a new shard and verify that it exists.
43		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
44			t.Fatal(err)
45		} else if sh := s.Shard(1); sh == nil {
46			t.Fatalf("expected shard")
47		}
48
49		// Create a new shard under the same retention policy,  and verify
50		// that it exists.
51		if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
52			t.Fatal(err)
53		} else if sh := s.Shard(2); sh == nil {
54			t.Fatalf("expected shard")
55		}
56
57		// Create a new shard under a different retention policy, and
58		// verify that it exists.
59		if err := s.CreateShard("db0", "rp1", 3, true); err != nil {
60			t.Fatal(err)
61		} else if sh := s.Shard(3); sh == nil {
62			t.Fatalf("expected shard")
63		}
64
65		// Deleting the rp0 retention policy does not return an error.
66		if err := s.DeleteRetentionPolicy("db0", "rp0"); err != nil {
67			t.Fatal(err)
68		}
69
70		// It deletes the shards under that retention policy.
71		if sh := s.Shard(1); sh != nil {
72			t.Errorf("shard 1 was not deleted")
73		}
74
75		if sh := s.Shard(2); sh != nil {
76			t.Errorf("shard 2 was not deleted")
77		}
78
79		// It deletes the retention policy directory.
80		if got, exp := dirExists(filepath.Join(s.Path(), "db0", "rp0")), false; got != exp {
81			t.Error("directory exists, but should have been removed")
82		}
83
84		// It deletes the WAL retention policy directory.
85		if got, exp := dirExists(filepath.Join(s.EngineOptions.Config.WALDir, "db0", "rp0")), false; got != exp {
86			t.Error("directory exists, but should have been removed")
87		}
88
89		// Reopen other shard and check it still exists.
90		if err := s.Reopen(); err != nil {
91			t.Error(err)
92		} else if sh := s.Shard(3); sh == nil {
93			t.Errorf("shard 3 does not exist")
94		}
95
96		// It does not delete other retention policy directories.
97		if got, exp := dirExists(filepath.Join(s.Path(), "db0", "rp1")), true; got != exp {
98			t.Error("directory does not exist, but should")
99		}
100		if got, exp := dirExists(filepath.Join(s.EngineOptions.Config.WALDir, "db0", "rp1")), true; got != exp {
101			t.Error("directory does not exist, but should")
102		}
103	}
104
105	for _, index := range tsdb.RegisteredIndexes() {
106		t.Run(index, func(t *testing.T) { test(index) })
107	}
108}
109
110// Ensure the store can create a new shard.
111func TestStore_CreateShard(t *testing.T) {
112	t.Parallel()
113
114	test := func(index string) {
115		s := MustOpenStore(index)
116		defer s.Close()
117
118		// Create a new shard and verify that it exists.
119		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
120			t.Fatal(err)
121		} else if sh := s.Shard(1); sh == nil {
122			t.Fatalf("expected shard")
123		}
124
125		// Create another shard and verify that it exists.
126		if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
127			t.Fatal(err)
128		} else if sh := s.Shard(2); sh == nil {
129			t.Fatalf("expected shard")
130		}
131
132		// Reopen shard and recheck.
133		if err := s.Reopen(); err != nil {
134			t.Fatal(err)
135		} else if sh := s.Shard(1); sh == nil {
136			t.Fatalf("expected shard(1)")
137		} else if sh = s.Shard(2); sh == nil {
138			t.Fatalf("expected shard(2)")
139		}
140	}
141
142	for _, index := range tsdb.RegisteredIndexes() {
143		t.Run(index, func(t *testing.T) { test(index) })
144	}
145}
146
147func TestStore_CreateMixedShards(t *testing.T) {
148	t.Parallel()
149
150	test := func(index1 string, index2 string) {
151		s := MustOpenStore(index1)
152		defer s.Close()
153
154		// Create a new shard and verify that it exists.
155		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
156			t.Fatal(err)
157		} else if sh := s.Shard(1); sh == nil {
158			t.Fatalf("expected shard")
159		}
160
161		s.EngineOptions.IndexVersion = index2
162		s.index = index2
163		if err := s.Reopen(); err != nil {
164			t.Fatal(err)
165		}
166
167		// Create another shard and verify that it exists.
168		if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
169			t.Fatal(err)
170		} else if sh := s.Shard(2); sh == nil {
171			t.Fatalf("expected shard")
172		}
173
174		// Reopen shard and recheck.
175		if err := s.Reopen(); err != nil {
176			t.Fatal(err)
177		} else if sh := s.Shard(1); sh == nil {
178			t.Fatalf("expected shard(1)")
179		} else if sh = s.Shard(2); sh == nil {
180			t.Fatalf("expected shard(2)")
181		}
182
183		sh := s.Shard(1)
184		if sh.IndexType() != index1 {
185			t.Fatalf("got index %v, expected %v", sh.IndexType(), index1)
186		}
187
188		sh = s.Shard(2)
189		if sh.IndexType() != index2 {
190			t.Fatalf("got index %v, expected %v", sh.IndexType(), index2)
191		}
192	}
193
194	indexes := tsdb.RegisteredIndexes()
195	for i := range indexes {
196		j := (i + 1) % len(indexes)
197		index1 := indexes[i]
198		index2 := indexes[j]
199		t.Run(fmt.Sprintf("%s-%s", index1, index2), func(t *testing.T) { test(index1, index2) })
200	}
201}
202
203func TestStore_DropMeasurementMixedShards(t *testing.T) {
204	t.Parallel()
205
206	test := func(index1 string, index2 string) {
207		s := MustOpenStore(index1)
208		defer s.Close()
209
210		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
211			t.Fatal(err)
212		}
213
214		s.MustWriteToShardString(1, "mem,server=a v=1 10")
215
216		s.EngineOptions.IndexVersion = index2
217		s.index = index2
218		if err := s.Reopen(); err != nil {
219			t.Fatal(err)
220		}
221
222		if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
223			t.Fatal(err)
224		}
225
226		s.MustWriteToShardString(2, "mem,server=b v=1 20")
227
228		s.MustWriteToShardString(1, "cpu,server=a v=1 10")
229		s.MustWriteToShardString(2, "cpu,server=b v=1 20")
230
231		err := s.DeleteMeasurement("db0", "cpu")
232		if err != tsdb.ErrMultipleIndexTypes {
233			t.Fatal(err)
234		} else if err == nil {
235			t.Fatal("expect failure deleting measurement on multiple index types")
236		}
237	}
238
239	indexes := tsdb.RegisteredIndexes()
240	for i := range indexes {
241		j := (i + 1) % len(indexes)
242		index1 := indexes[i]
243		index2 := indexes[j]
244		t.Run(fmt.Sprintf("%s-%s", index1, index2), func(t *testing.T) { test(index1, index2) })
245	}
246}
247
248func TestStore_DropConcurrentWriteMultipleShards(t *testing.T) {
249	t.Parallel()
250
251	test := func(index string) {
252		s := MustOpenStore(index)
253		defer s.Close()
254
255		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
256			t.Fatal(err)
257		}
258
259		s.MustWriteToShardString(1, "mem,server=a v=1 10")
260
261		if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
262			t.Fatal(err)
263		}
264
265		s.MustWriteToShardString(2, "mem,server=b v=1 20")
266
267		errCh := make(chan error)
268		go func() {
269			for i := 0; i < 50; i++ {
270				s.MustWriteToShardString(1, "cpu,server=a v=1 10")
271				s.MustWriteToShardString(2, "cpu,server=b v=1 20")
272			}
273			errCh <- nil
274		}()
275
276		go func() {
277			for i := 0; i < 50; i++ {
278				if err := s.DeleteMeasurement("db0", "cpu"); err != nil {
279					errCh <- err
280					return
281				}
282			}
283			errCh <- nil
284		}()
285
286		for i := 0; i < 2; i++ {
287			if err := <-errCh; err != nil {
288				t.Fatal(err)
289			}
290		}
291		err := s.DeleteMeasurement("db0", "cpu")
292		if err != nil {
293			t.Fatal(err)
294		}
295
296		measurements, err := s.MeasurementNames(context.Background(), query.OpenAuthorizer, "db0", nil)
297		if err != nil {
298			t.Fatal(err)
299		}
300
301		exp := [][]byte{[]byte("mem")}
302		if got, exp := measurements, exp; !reflect.DeepEqual(got, exp) {
303			t.Fatal(fmt.Errorf("got measurements %v, expected %v", got, exp))
304		}
305	}
306
307	for _, index := range tsdb.RegisteredIndexes() {
308		t.Run(index, func(t *testing.T) { test(index) })
309	}
310}
311
312func TestStore_WriteMixedShards(t *testing.T) {
313	t.Parallel()
314
315	test := func(index1 string, index2 string) {
316		s := MustOpenStore(index1)
317		defer s.Close()
318
319		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
320			t.Fatal(err)
321		}
322
323		s.MustWriteToShardString(1, "mem,server=a v=1 10")
324
325		s.EngineOptions.IndexVersion = index2
326		s.index = index2
327		if err := s.Reopen(); err != nil {
328			t.Fatal(err)
329		}
330
331		if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
332			t.Fatal(err)
333		}
334
335		s.MustWriteToShardString(2, "mem,server=b v=1 20")
336
337		var wg sync.WaitGroup
338		wg.Add(2)
339
340		go func() {
341			defer wg.Done()
342			for i := 0; i < 50; i++ {
343				s.MustWriteToShardString(1, fmt.Sprintf("cpu,server=a,f%0.2d=a v=1", i*2))
344			}
345		}()
346
347		go func() {
348			defer wg.Done()
349			for i := 0; i < 50; i++ {
350				s.MustWriteToShardString(2, fmt.Sprintf("cpu,server=b,f%0.2d=b v=1 20", i*2+1))
351			}
352		}()
353
354		wg.Wait()
355
356		keys, err := s.TagKeys(context.Background(), nil, []uint64{1, 2}, nil)
357		if err != nil {
358			t.Fatal(err)
359		}
360
361		cpuKeys := make([]string, 101)
362		for i := 0; i < 100; i++ {
363			cpuKeys[i] = fmt.Sprintf("f%0.2d", i)
364		}
365		cpuKeys[100] = "server"
366		expKeys := []tsdb.TagKeys{
367			{Measurement: "cpu", Keys: cpuKeys},
368			{Measurement: "mem", Keys: []string{"server"}},
369		}
370		if got, exp := keys, expKeys; !reflect.DeepEqual(got, exp) {
371			t.Fatalf("got keys %v, expected %v", got, exp)
372		}
373	}
374
375	indexes := tsdb.RegisteredIndexes()
376	for i := range indexes {
377		j := (i + 1) % len(indexes)
378		index1 := indexes[i]
379		index2 := indexes[j]
380		t.Run(fmt.Sprintf("%s-%s", index1, index2), func(t *testing.T) { test(index1, index2) })
381	}
382}
383
384// Ensure the store does not return an error when delete from a non-existent db.
385func TestStore_DeleteSeries_NonExistentDB(t *testing.T) {
386	t.Parallel()
387
388	test := func(index string) {
389		s := MustOpenStore(index)
390		defer s.Close()
391
392		if err := s.DeleteSeries("db0", nil, nil); err != nil {
393			t.Fatal(err.Error())
394		}
395	}
396
397	for _, index := range tsdb.RegisteredIndexes() {
398		t.Run(index, func(t *testing.T) { test(index) })
399	}
400}
401
402// Ensure the store can delete an existing shard.
403func TestStore_DeleteShard(t *testing.T) {
404	t.Parallel()
405
406	test := func(index string) error {
407		s := MustOpenStore(index)
408		defer s.Close()
409
410		// Create a new shard and verify that it exists.
411		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
412			return err
413		} else if sh := s.Shard(1); sh == nil {
414			return fmt.Errorf("expected shard")
415		}
416
417		// Create another shard.
418		if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
419			return err
420		} else if sh := s.Shard(2); sh == nil {
421			return fmt.Errorf("expected shard")
422		}
423
424		// and another, but in a different db.
425		if err := s.CreateShard("db1", "rp0", 3, true); err != nil {
426			return err
427		} else if sh := s.Shard(3); sh == nil {
428			return fmt.Errorf("expected shard")
429		}
430
431		// Write series data to the db0 shards.
432		s.MustWriteToShardString(1, "cpu,servera=a v=1", "cpu,serverb=b v=1", "mem,serverc=a v=1")
433		s.MustWriteToShardString(2, "cpu,servera=a v=1", "mem,serverc=a v=1")
434
435		// Write similar data to db1 database
436		s.MustWriteToShardString(3, "cpu,serverb=b v=1")
437
438		// Reopen the store and check all shards still exist
439		if err := s.Reopen(); err != nil {
440			return err
441		}
442		for i := uint64(1); i <= 3; i++ {
443			if sh := s.Shard(i); sh == nil {
444				return fmt.Errorf("shard %d missing", i)
445			}
446		}
447
448		// Remove the first shard from the store.
449		if err := s.DeleteShard(1); err != nil {
450			return err
451		}
452
453		// cpu,serverb=b should be removed from the series file for db0 because
454		// shard 1 was the only owner of that series.
455		// Verify by getting  all tag keys.
456		keys, err := s.TagKeys(context.Background(), nil, []uint64{2}, nil)
457		if err != nil {
458			return err
459		}
460
461		expKeys := []tsdb.TagKeys{
462			{Measurement: "cpu", Keys: []string{"servera"}},
463			{Measurement: "mem", Keys: []string{"serverc"}},
464		}
465		if got, exp := keys, expKeys; !reflect.DeepEqual(got, exp) {
466			return fmt.Errorf("got keys %v, expected %v", got, exp)
467		}
468
469		// Verify that the same series was not removed from other databases'
470		// series files.
471		if keys, err = s.TagKeys(context.Background(), nil, []uint64{3}, nil); err != nil {
472			return err
473		}
474
475		expKeys = []tsdb.TagKeys{{Measurement: "cpu", Keys: []string{"serverb"}}}
476		if got, exp := keys, expKeys; !reflect.DeepEqual(got, exp) {
477			return fmt.Errorf("got keys %v, expected %v", got, exp)
478		}
479		return nil
480	}
481
482	for _, index := range tsdb.RegisteredIndexes() {
483		t.Run(index, func(t *testing.T) {
484			if err := test(index); err != nil {
485				t.Error(err)
486			}
487		})
488	}
489}
490
491// Ensure the store can create a snapshot to a shard.
492func TestStore_CreateShardSnapShot(t *testing.T) {
493	t.Parallel()
494
495	test := func(index string) {
496		s := MustOpenStore(index)
497		defer s.Close()
498
499		// Create a new shard and verify that it exists.
500		if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
501			t.Fatal(err)
502		} else if sh := s.Shard(1); sh == nil {
503			t.Fatalf("expected shard")
504		}
505
506		dir, e := s.CreateShardSnapshot(1, false)
507		if e != nil {
508			t.Fatal(e)
509		}
510		if dir == "" {
511			t.Fatal("empty directory name")
512		}
513	}
514
515	for _, index := range tsdb.RegisteredIndexes() {
516		t.Run(index, func(t *testing.T) { test(index) })
517	}
518}
519
520func TestStore_Open(t *testing.T) {
521	t.Parallel()
522
523	test := func(index string) {
524		s := NewStore(index)
525		defer s.Close()
526
527		if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0", "2"), 0777); err != nil {
528			t.Fatal(err)
529		}
530
531		if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp2", "4"), 0777); err != nil {
532			t.Fatal(err)
533		}
534
535		if err := os.MkdirAll(filepath.Join(s.Path(), "db1", "rp0", "1"), 0777); err != nil {
536			t.Fatal(err)
537		}
538
539		// Store should ignore shard since it does not have a numeric name.
540		if err := s.Open(); err != nil {
541			t.Fatal(err)
542		} else if n := len(s.Databases()); n != 2 {
543			t.Fatalf("unexpected database index count: %d", n)
544		} else if n := s.ShardN(); n != 3 {
545			t.Fatalf("unexpected shard count: %d", n)
546		}
547
548		expDatabases := []string{"db0", "db1"}
549		gotDatabases := s.Databases()
550		sort.Strings(gotDatabases)
551
552		if got, exp := gotDatabases, expDatabases; !reflect.DeepEqual(got, exp) {
553			t.Fatalf("got %#v, expected %#v", got, exp)
554		}
555	}
556
557	for _, index := range tsdb.RegisteredIndexes() {
558		t.Run(index, func(t *testing.T) { test(index) })
559	}
560}
561
562// Ensure the store reports an error when it can't open a database directory.
563func TestStore_Open_InvalidDatabaseFile(t *testing.T) {
564	t.Parallel()
565
566	test := func(index string) {
567		s := NewStore(index)
568		defer s.Close()
569
570		// Create a file instead of a directory for a database.
571		if _, err := os.Create(filepath.Join(s.Path(), "db0")); err != nil {
572			t.Fatal(err)
573		}
574
575		// Store should ignore database since it's a file.
576		if err := s.Open(); err != nil {
577			t.Fatal(err)
578		} else if n := len(s.Databases()); n != 0 {
579			t.Fatalf("unexpected database index count: %d", n)
580		}
581	}
582
583	for _, index := range tsdb.RegisteredIndexes() {
584		t.Run(index, func(t *testing.T) { test(index) })
585	}
586}
587
588// Ensure the store reports an error when it can't open a retention policy.
589func TestStore_Open_InvalidRetentionPolicy(t *testing.T) {
590	t.Parallel()
591
592	test := func(index string) {
593		s := NewStore(index)
594		defer s.Close()
595
596		// Create an RP file instead of a directory.
597		if err := os.MkdirAll(filepath.Join(s.Path(), "db0"), 0777); err != nil {
598			t.Fatal(err)
599		} else if _, err := os.Create(filepath.Join(s.Path(), "db0", "rp0")); err != nil {
600			t.Fatal(err)
601		}
602
603		// Store should ignore retention policy since it's a file, and there should
604		// be no indices created.
605		if err := s.Open(); err != nil {
606			t.Fatal(err)
607		} else if n := len(s.Databases()); n != 0 {
608			t.Log(s.Databases())
609			t.Fatalf("unexpected database index count: %d", n)
610		}
611	}
612
613	for _, index := range tsdb.RegisteredIndexes() {
614		t.Run(index, func(t *testing.T) { test(index) })
615	}
616}
617
618// Ensure the store reports an error when it can't open a retention policy.
619func TestStore_Open_InvalidShard(t *testing.T) {
620	t.Parallel()
621
622	test := func(index string) {
623		s := NewStore(index)
624		defer s.Close()
625
626		// Create a non-numeric shard file.
627		if err := os.MkdirAll(filepath.Join(s.Path(), "db0", "rp0"), 0777); err != nil {
628			t.Fatal(err)
629		} else if _, err := os.Create(filepath.Join(s.Path(), "db0", "rp0", "bad_shard")); err != nil {
630			t.Fatal(err)
631		}
632
633		// Store should ignore shard since it does not have a numeric name.
634		if err := s.Open(); err != nil {
635			t.Fatal(err)
636		} else if n := len(s.Databases()); n != 0 {
637			t.Fatalf("unexpected database index count: %d", n)
638		} else if n := s.ShardN(); n != 0 {
639			t.Fatalf("unexpected shard count: %d", n)
640		}
641	}
642
643	for _, index := range tsdb.RegisteredIndexes() {
644		t.Run(index, func(t *testing.T) { test(index) })
645	}
646}
647
648// Ensure shards can create iterators.
649func TestShards_CreateIterator(t *testing.T) {
650	t.Parallel()
651
652	test := func(index string) {
653		s := MustOpenStore(index)
654		defer s.Close()
655
656		// Create shard #0 with data.
657		s.MustCreateShardWithData("db0", "rp0", 0,
658			`cpu,host=serverA value=1  0`,
659			`cpu,host=serverA value=2 10`,
660			`cpu,host=serverB value=3 20`,
661		)
662
663		// Create shard #1 with data.
664		s.MustCreateShardWithData("db0", "rp0", 1,
665			`cpu,host=serverA value=1 30`,
666			`mem,host=serverA value=2 40`, // skip: wrong source
667			`cpu,host=serverC value=3 60`,
668		)
669
670		// Retrieve shard group.
671		shards := s.ShardGroup([]uint64{0, 1})
672
673		// Create iterator.
674		m := &influxql.Measurement{Name: "cpu"}
675		itr, err := shards.CreateIterator(context.Background(), m, query.IteratorOptions{
676			Expr:       influxql.MustParseExpr(`value`),
677			Dimensions: []string{"host"},
678			Ascending:  true,
679			StartTime:  influxql.MinTime,
680			EndTime:    influxql.MaxTime,
681		})
682		if err != nil {
683			t.Fatal(err)
684		}
685		defer itr.Close()
686		fitr := itr.(query.FloatIterator)
687
688		// Read values from iterator. The host=serverA points should come first.
689		if p, err := fitr.Next(); err != nil {
690			t.Fatalf("unexpected error(0): %s", err)
691		} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(0, 0).UnixNano(), Value: 1}) {
692			t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
693		}
694		if p, err := fitr.Next(); err != nil {
695			t.Fatalf("unexpected error(1): %s", err)
696		} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(10, 0).UnixNano(), Value: 2}) {
697			t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
698		}
699		if p, err := fitr.Next(); err != nil {
700			t.Fatalf("unexpected error(2): %s", err)
701		} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverA"), Time: time.Unix(30, 0).UnixNano(), Value: 1}) {
702			t.Fatalf("unexpected point(2): %s", spew.Sdump(p))
703		}
704
705		// Next the host=serverB point.
706		if p, err := fitr.Next(); err != nil {
707			t.Fatalf("unexpected error(3): %s", err)
708		} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverB"), Time: time.Unix(20, 0).UnixNano(), Value: 3}) {
709			t.Fatalf("unexpected point(3): %s", spew.Sdump(p))
710		}
711
712		// And finally the host=serverC point.
713		if p, err := fitr.Next(); err != nil {
714			t.Fatalf("unexpected error(4): %s", err)
715		} else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=serverC"), Time: time.Unix(60, 0).UnixNano(), Value: 3}) {
716			t.Fatalf("unexpected point(4): %s", spew.Sdump(p))
717		}
718
719		// Then an EOF should occur.
720		if p, err := fitr.Next(); err != nil {
721			t.Fatalf("expected eof, got error: %s", err)
722		} else if p != nil {
723			t.Fatalf("expected eof, got: %s", spew.Sdump(p))
724		}
725	}
726
727	for _, index := range tsdb.RegisteredIndexes() {
728		t.Run(index, func(t *testing.T) { test(index) })
729	}
730}
731
732// Ensure the store can backup a shard and another store can restore it.
733func TestStore_BackupRestoreShard(t *testing.T) {
734	test := func(index string) {
735		s0, s1 := MustOpenStore(index), MustOpenStore(index)
736		defer s0.Close()
737		defer s1.Close()
738
739		// Create shard with data.
740		s0.MustCreateShardWithData("db0", "rp0", 100,
741			`cpu value=1 0`,
742			`cpu value=2 10`,
743			`cpu value=3 20`,
744		)
745
746		if err := s0.Reopen(); err != nil {
747			t.Fatal(err)
748		}
749
750		// Backup shard to a buffer.
751		var buf bytes.Buffer
752		if err := s0.BackupShard(100, time.Time{}, &buf); err != nil {
753			t.Fatal(err)
754		}
755
756		// Create the shard on the other store and restore from buffer.
757		if err := s1.CreateShard("db0", "rp0", 100, true); err != nil {
758			t.Fatal(err)
759		}
760		if err := s1.RestoreShard(100, &buf); err != nil {
761			t.Fatal(err)
762		}
763
764		// Read data from
765		m := &influxql.Measurement{Name: "cpu"}
766		itr, err := s0.Shard(100).CreateIterator(context.Background(), m, query.IteratorOptions{
767			Expr:      influxql.MustParseExpr(`value`),
768			Ascending: true,
769			StartTime: influxql.MinTime,
770			EndTime:   influxql.MaxTime,
771		})
772		if err != nil {
773			t.Fatal(err)
774		}
775		defer itr.Close()
776		fitr := itr.(query.FloatIterator)
777
778		// Read values from iterator. The host=serverA points should come first.
779		p, e := fitr.Next()
780		if e != nil {
781			t.Fatal(e)
782		}
783		if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(0, 0).UnixNano(), Value: 1}) {
784			t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
785		}
786		p, e = fitr.Next()
787		if e != nil {
788			t.Fatal(e)
789		}
790		if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(10, 0).UnixNano(), Value: 2}) {
791			t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
792		}
793		p, e = fitr.Next()
794		if e != nil {
795			t.Fatal(e)
796		}
797		if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Time: time.Unix(20, 0).UnixNano(), Value: 3}) {
798			t.Fatalf("unexpected point(2): %s", spew.Sdump(p))
799		}
800	}
801
802	for _, index := range tsdb.RegisteredIndexes() {
803		if index == tsdb.TSI1IndexName {
804			t.Skip("Skipping failing test for tsi1")
805		}
806
807		t.Run(index, func(t *testing.T) {
808			test(index)
809		})
810	}
811}
812func TestStore_Shard_SeriesN(t *testing.T) {
813	t.Parallel()
814
815	test := func(index string) error {
816		s := MustOpenStore(index)
817		defer s.Close()
818
819		// Create shard with data.
820		s.MustCreateShardWithData("db0", "rp0", 1,
821			`cpu value=1 0`,
822			`cpu,host=serverA value=2 10`,
823		)
824
825		// Create 2nd shard w/ same measurements.
826		s.MustCreateShardWithData("db0", "rp0", 2,
827			`cpu value=1 0`,
828			`cpu value=2 10`,
829		)
830
831		if got, exp := s.Shard(1).SeriesN(), int64(2); got != exp {
832			return fmt.Errorf("[shard %d] got series count of %d, but expected %d", 1, got, exp)
833		} else if got, exp := s.Shard(2).SeriesN(), int64(1); got != exp {
834			return fmt.Errorf("[shard %d] got series count of %d, but expected %d", 2, got, exp)
835		}
836		return nil
837	}
838
839	for _, index := range tsdb.RegisteredIndexes() {
840		t.Run(index, func(t *testing.T) {
841			if err := test(index); err != nil {
842				t.Error(err)
843			}
844		})
845	}
846}
847
848func TestStore_MeasurementNames_Deduplicate(t *testing.T) {
849	t.Parallel()
850
851	test := func(index string) {
852		s := MustOpenStore(index)
853		defer s.Close()
854
855		// Create shard with data.
856		s.MustCreateShardWithData("db0", "rp0", 1,
857			`cpu value=1 0`,
858			`cpu value=2 10`,
859			`cpu value=3 20`,
860		)
861
862		// Create 2nd shard w/ same measurements.
863		s.MustCreateShardWithData("db0", "rp0", 2,
864			`cpu value=1 0`,
865			`cpu value=2 10`,
866			`cpu value=3 20`,
867		)
868
869		meas, err := s.MeasurementNames(context.Background(), query.OpenAuthorizer, "db0", nil)
870		if err != nil {
871			t.Fatalf("unexpected error with MeasurementNames: %v", err)
872		}
873
874		if exp, got := 1, len(meas); exp != got {
875			t.Fatalf("measurement len mismatch: exp %v, got %v", exp, got)
876		}
877
878		if exp, got := "cpu", string(meas[0]); exp != got {
879			t.Fatalf("measurement name mismatch: exp %v, got %v", exp, got)
880		}
881	}
882
883	for _, index := range tsdb.RegisteredIndexes() {
884		t.Run(index, func(t *testing.T) { test(index) })
885	}
886}
887
888func testStoreCardinalityTombstoning(t *testing.T, store *Store) {
889	// Generate point data to write to the shards.
890	series := genTestSeries(10, 2, 4) // 160 series
891
892	points := make([]models.Point, 0, len(series))
893	for _, s := range series {
894		points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
895	}
896
897	// Create requested number of shards in the store & write points across
898	// shards such that we never write the same series to multiple shards.
899	for shardID := 0; shardID < 4; shardID++ {
900		if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
901			t.Errorf("create shard: %s", err)
902		}
903
904		if err := store.BatchWrite(shardID, points[shardID*40:(shardID+1)*40]); err != nil {
905			t.Errorf("batch write: %s", err)
906		}
907	}
908
909	// Delete all the series for each measurement.
910	mnames, err := store.MeasurementNames(context.Background(), nil, "db", nil)
911	if err != nil {
912		t.Fatal(err)
913	}
914
915	for _, name := range mnames {
916		if err := store.DeleteSeries("db", []influxql.Source{&influxql.Measurement{Name: string(name)}}, nil); err != nil {
917			t.Fatal(err)
918		}
919	}
920
921	// Estimate the series cardinality...
922	cardinality, err := store.Store.SeriesCardinality(context.Background(), "db")
923	if err != nil {
924		t.Fatal(err)
925	}
926
927	// Estimated cardinality should be well within 10 of the actual cardinality.
928	if got, exp := int(cardinality), 10; got > exp {
929		t.Errorf("series cardinality was %v (expected within %v), expected was: %d", got, exp, 0)
930	}
931
932	// Since all the series have been deleted, all the measurements should have
933	// been removed from the index too.
934	if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil {
935		t.Fatal(err)
936	}
937
938	// Estimated cardinality should be well within 2 of the actual cardinality.
939	// TODO(edd): this is totally arbitrary. How can I make it better?
940	if got, exp := int(cardinality), 2; got > exp {
941		t.Errorf("measurement cardinality was %v (expected within %v), expected was: %d", got, exp, 0)
942	}
943}
944
945func TestStore_Cardinality_Tombstoning(t *testing.T) {
946	t.Parallel()
947
948	if testing.Short() || os.Getenv("GORACE") != "" || os.Getenv("APPVEYOR") != "" {
949		t.Skip("Skipping test in short, race and appveyor mode.")
950	}
951
952	test := func(index string) {
953		store := NewStore(index)
954		if err := store.Open(); err != nil {
955			panic(err)
956		}
957		defer store.Close()
958		testStoreCardinalityTombstoning(t, store)
959	}
960
961	for _, index := range tsdb.RegisteredIndexes() {
962		t.Run(index, func(t *testing.T) { test(index) })
963	}
964}
965
966func testStoreCardinalityUnique(t *testing.T, store *Store) {
967	// Generate point data to write to the shards.
968	series := genTestSeries(64, 5, 5) // 200,000 series
969	expCardinality := len(series)
970
971	points := make([]models.Point, 0, len(series))
972	for _, s := range series {
973		points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
974	}
975
976	// Create requested number of shards in the store & write points across
977	// shards such that we never write the same series to multiple shards.
978	for shardID := 0; shardID < 10; shardID++ {
979		if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
980			t.Fatalf("create shard: %s", err)
981		}
982		if err := store.BatchWrite(shardID, points[shardID*20000:(shardID+1)*20000]); err != nil {
983			t.Fatalf("batch write: %s", err)
984		}
985	}
986
987	// Estimate the series cardinality...
988	cardinality, err := store.Store.SeriesCardinality(context.Background(), "db")
989	if err != nil {
990		t.Fatal(err)
991	}
992
993	// Estimated cardinality should be well within 1.5% of the actual cardinality.
994	if got, exp := math.Abs(float64(cardinality)-float64(expCardinality))/float64(expCardinality), 0.015; got > exp {
995		t.Errorf("got epsilon of %v for series cardinality %v (expected %v), which is larger than expected %v", got, cardinality, expCardinality, exp)
996	}
997
998	// Estimate the measurement cardinality...
999	if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil {
1000		t.Fatal(err)
1001	}
1002
1003	// Estimated cardinality should be well within 2 of the actual cardinality. (arbitrary...)
1004	expCardinality = 64
1005	if got, exp := math.Abs(float64(cardinality)-float64(expCardinality)), 2.0; got > exp {
1006		t.Errorf("got measurmement cardinality %v, expected upto %v; difference is larger than expected %v", cardinality, expCardinality, exp)
1007	}
1008}
1009
1010func TestStore_Cardinality_Unique(t *testing.T) {
1011	t.Parallel()
1012
1013	if testing.Short() || os.Getenv("GORACE") != "" || os.Getenv("APPVEYOR") != "" {
1014		t.Skip("Skipping test in short, race and appveyor mode.")
1015	}
1016
1017	test := func(index string) {
1018		store := NewStore(index)
1019		store.EngineOptions.Config.MaxSeriesPerDatabase = 0
1020		if err := store.Open(); err != nil {
1021			panic(err)
1022		}
1023		defer store.Close()
1024		testStoreCardinalityUnique(t, store)
1025	}
1026
1027	for _, index := range tsdb.RegisteredIndexes() {
1028		t.Run(index, func(t *testing.T) { test(index) })
1029	}
1030}
1031
1032// This test tests cardinality estimation when series data is duplicated across
1033// multiple shards.
1034func testStoreCardinalityDuplicates(t *testing.T, store *Store) {
1035	// Generate point data to write to the shards.
1036	series := genTestSeries(64, 5, 5) // 200,000 series.
1037	expCardinality := len(series)
1038
1039	points := make([]models.Point, 0, len(series))
1040	for _, s := range series {
1041		points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
1042	}
1043
1044	// Create requested number of shards in the store & write points.
1045	for shardID := 0; shardID < 10; shardID++ {
1046		if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
1047			t.Fatalf("create shard: %s", err)
1048		}
1049
1050		var from, to int
1051		if shardID == 0 {
1052			// if it's the first shard then write all of the points.
1053			from, to = 0, len(points)-1
1054		} else {
1055			// For other shards we write a random sub-section of all the points.
1056			// which will duplicate the series and shouldn't increase the
1057			// cardinality.
1058			from, to = rand.Intn(len(points)), rand.Intn(len(points))
1059			if from > to {
1060				from, to = to, from
1061			}
1062		}
1063
1064		if err := store.BatchWrite(shardID, points[from:to]); err != nil {
1065			t.Fatalf("batch write: %s", err)
1066		}
1067	}
1068
1069	// Estimate the series cardinality...
1070	cardinality, err := store.Store.SeriesCardinality(context.Background(), "db")
1071	if err != nil {
1072		t.Fatal(err)
1073	}
1074
1075	// Estimated cardinality should be well within 1.5% of the actual cardinality.
1076	if got, exp := math.Abs(float64(cardinality)-float64(expCardinality))/float64(expCardinality), 0.015; got > exp {
1077		t.Errorf("got epsilon of %v for series cardinality %d (expected %d), which is larger than expected %v", got, cardinality, expCardinality, exp)
1078	}
1079
1080	// Estimate the measurement cardinality...
1081	if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil {
1082		t.Fatal(err)
1083	}
1084
1085	// Estimated cardinality should be well within 2 of the actual cardinality. (Arbitrary...)
1086	expCardinality = 64
1087	if got, exp := math.Abs(float64(cardinality)-float64(expCardinality)), 2.0; got > exp {
1088		t.Errorf("got measurement cardinality %v, expected upto %v; difference is larger than expected %v", cardinality, expCardinality, exp)
1089	}
1090}
1091
1092func TestStore_Cardinality_Duplicates(t *testing.T) {
1093	t.Parallel()
1094
1095	if testing.Short() || os.Getenv("GORACE") != "" || os.Getenv("APPVEYOR") != "" {
1096		t.Skip("Skipping test in short, race and appveyor mode.")
1097	}
1098
1099	test := func(index string) {
1100		store := NewStore(index)
1101		store.EngineOptions.Config.MaxSeriesPerDatabase = 0
1102		if err := store.Open(); err != nil {
1103			panic(err)
1104		}
1105		defer store.Close()
1106		testStoreCardinalityDuplicates(t, store)
1107	}
1108
1109	for _, index := range tsdb.RegisteredIndexes() {
1110		t.Run(index, func(t *testing.T) { test(index) })
1111	}
1112}
1113
1114func TestStore_MetaQuery_Timeout(t *testing.T) {
1115	if testing.Short() || os.Getenv("APPVEYOR") != "" {
1116		t.Skip("Skipping test in short and appveyor mode.")
1117	}
1118
1119	test := func(index string) {
1120		store := NewStore(index)
1121		store.EngineOptions.Config.MaxSeriesPerDatabase = 0
1122		if err := store.Open(); err != nil {
1123			panic(err)
1124		}
1125		defer store.Close()
1126		testStoreMetaQueryTimeout(t, store, index)
1127	}
1128
1129	for _, index := range tsdb.RegisteredIndexes() {
1130		test(index)
1131	}
1132}
1133
1134func testStoreMetaQueryTimeout(t *testing.T, store *Store, index string) {
1135	shards := testStoreMetaQuerySetup(t, store)
1136
1137	testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) {
1138		const funcName = "SeriesCardinality"
1139		_, err := store.Store.SeriesCardinality(ctx, "db")
1140		return funcName, err
1141	}, index)(t)
1142
1143	testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) {
1144		const funcName = "MeasurementsCardinality"
1145		_, err := store.Store.MeasurementsCardinality(ctx, "db")
1146		return funcName, err
1147	}, index)(t)
1148
1149	keyCondition, allCondition := testStoreMetaQueryCondition()
1150
1151	testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) {
1152		const funcName = "TagValues"
1153		_, err := store.Store.TagValues(ctx, nil, shards, allCondition)
1154		return funcName, err
1155	}, index)(t)
1156
1157	testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) {
1158		const funcName = "TagKeys"
1159		_, err := store.Store.TagKeys(ctx, nil, shards, keyCondition)
1160		return funcName, err
1161	}, index)(t)
1162
1163	testStoreMakeTimedFuncs(func(ctx context.Context) (string, error) {
1164		const funcName = "MeasurementNames"
1165		_, err := store.Store.MeasurementNames(ctx, nil, "db", nil)
1166		return funcName, err
1167	}, index)(t)
1168}
1169
1170func testStoreMetaQueryCondition() (influxql.Expr, influxql.Expr) {
1171	keyCondition := &influxql.ParenExpr{
1172		Expr: &influxql.BinaryExpr{
1173			Op: influxql.OR,
1174			LHS: &influxql.BinaryExpr{
1175				Op:  influxql.EQ,
1176				LHS: &influxql.VarRef{Val: "_tagKey"},
1177				RHS: &influxql.StringLiteral{Val: "tagKey4"},
1178			},
1179			RHS: &influxql.BinaryExpr{
1180				Op:  influxql.EQ,
1181				LHS: &influxql.VarRef{Val: "_tagKey"},
1182				RHS: &influxql.StringLiteral{Val: "tagKey5"},
1183			},
1184		},
1185	}
1186
1187	whereCondition := &influxql.ParenExpr{
1188		Expr: &influxql.BinaryExpr{
1189			Op: influxql.AND,
1190			LHS: &influxql.ParenExpr{
1191				Expr: &influxql.BinaryExpr{
1192					Op:  influxql.EQ,
1193					LHS: &influxql.VarRef{Val: "tagKey1"},
1194					RHS: &influxql.StringLiteral{Val: "tagValue2"},
1195				},
1196			},
1197			RHS: keyCondition,
1198		},
1199	}
1200
1201	allCondition := &influxql.BinaryExpr{
1202		Op: influxql.AND,
1203		LHS: &influxql.ParenExpr{
1204			Expr: &influxql.BinaryExpr{
1205				Op:  influxql.EQREGEX,
1206				LHS: &influxql.VarRef{Val: "tagKey3"},
1207				RHS: &influxql.RegexLiteral{Val: regexp.MustCompile(`tagValue\d`)},
1208			},
1209		},
1210		RHS: whereCondition,
1211	}
1212	return keyCondition, allCondition
1213}
1214
1215func testStoreMetaQuerySetup(t *testing.T, store *Store) []uint64 {
1216	const measurementCnt = 64
1217	const tagCnt = 5
1218	const valueCnt = 5
1219	const pointsPerShard = 20000
1220
1221	// Generate point data to write to the shards.
1222	series := genTestSeries(measurementCnt, tagCnt, valueCnt)
1223
1224	points := make([]models.Point, 0, len(series))
1225	for _, s := range series {
1226		points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
1227	}
1228	// Create requested number of shards in the store & write points across
1229	// shards such that we never write the same series to multiple shards.
1230	shards := make([]uint64, len(points)/pointsPerShard)
1231	for shardID := 0; shardID < len(points)/pointsPerShard; shardID++ {
1232		if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
1233			t.Fatalf("create shard: %s", err)
1234		}
1235		if err := store.BatchWrite(shardID, points[shardID*pointsPerShard:(shardID+1)*pointsPerShard]); err != nil {
1236			t.Fatalf("batch write: %s", err)
1237		}
1238		shards[shardID] = uint64(shardID)
1239	}
1240	return shards
1241}
1242
1243func testStoreMakeTimedFuncs(tested func(context.Context) (string, error), index string) func(*testing.T) {
1244	cancelTested := func(t *testing.T) {
1245		ctx, cancel := context.WithTimeout(context.Background(), time.Duration(0))
1246		defer cancel()
1247
1248		funcName, err := tested(ctx)
1249		if err == nil {
1250			t.Fatalf("%v: failed to time out with index type %v", funcName, index)
1251		} else if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) {
1252			t.Fatalf("%v: failed with %v instead of %v with index type %v", funcName, err, context.DeadlineExceeded, index)
1253		}
1254	}
1255	return cancelTested
1256}
1257
1258// Creates a large number of series in multiple shards, which will force
1259// compactions to occur.
1260func testStoreCardinalityCompactions(store *Store) error {
1261
1262	// Generate point data to write to the shards.
1263	series := genTestSeries(300, 5, 5) // 937,500 series
1264	expCardinality := len(series)
1265
1266	points := make([]models.Point, 0, len(series))
1267	for _, s := range series {
1268		points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
1269	}
1270
1271	// Create requested number of shards in the store & write points across
1272	// shards such that we never write the same series to multiple shards.
1273	for shardID := 0; shardID < 2; shardID++ {
1274		if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
1275			return fmt.Errorf("create shard: %s", err)
1276		}
1277		if err := store.BatchWrite(shardID, points[shardID*468750:(shardID+1)*468750]); err != nil {
1278			return fmt.Errorf("batch write: %s", err)
1279		}
1280	}
1281
1282	// Estimate the series cardinality...
1283	cardinality, err := store.Store.SeriesCardinality(context.Background(), "db")
1284	if err != nil {
1285		return err
1286	}
1287
1288	// Estimated cardinality should be well within 1.5% of the actual cardinality.
1289	if got, exp := math.Abs(float64(cardinality)-float64(expCardinality))/float64(expCardinality), 0.015; got > exp {
1290		return fmt.Errorf("got epsilon of %v for series cardinality %v (expected %v), which is larger than expected %v", got, cardinality, expCardinality, exp)
1291	}
1292
1293	// Estimate the measurement cardinality...
1294	if cardinality, err = store.Store.MeasurementsCardinality(context.Background(), "db"); err != nil {
1295		return err
1296	}
1297
1298	// Estimated cardinality should be well within 2 of the actual cardinality. (Arbitrary...)
1299	expCardinality = 300
1300	if got, exp := math.Abs(float64(cardinality)-float64(expCardinality)), 2.0; got > exp {
1301		return fmt.Errorf("got measurement cardinality %v, expected upto %v; difference is larger than expected %v", cardinality, expCardinality, exp)
1302	}
1303	return nil
1304}
1305
1306func TestStore_Cardinality_Compactions(t *testing.T) {
1307	if testing.Short() || os.Getenv("GORACE") != "" || os.Getenv("APPVEYOR") != "" {
1308		t.Skip("Skipping test in short, race and appveyor mode.")
1309	}
1310
1311	test := func(index string) error {
1312		store := NewStore(index)
1313		store.EngineOptions.Config.MaxSeriesPerDatabase = 0
1314		if err := store.Open(); err != nil {
1315			panic(err)
1316		}
1317		defer store.Close()
1318		return testStoreCardinalityCompactions(store)
1319	}
1320
1321	for _, index := range tsdb.RegisteredIndexes() {
1322		t.Run(index, func(t *testing.T) {
1323			if err := test(index); err != nil {
1324				t.Fatal(err)
1325			}
1326		})
1327	}
1328}
1329
1330func TestStore_Cardinality_Limit_On_InMem_Index(t *testing.T) {
1331	t.Parallel()
1332
1333	if testing.Short() || os.Getenv("GORACE") != "" || os.Getenv("APPVEYOR") != "" {
1334		t.Skip("Skipping test in short, race and appveyor mode.")
1335	}
1336
1337	store := NewStore("inmem")
1338	store.EngineOptions.Config.MaxSeriesPerDatabase = 100000
1339	if err := store.Open(); err != nil {
1340		panic(err)
1341	}
1342	defer store.Close()
1343
1344	// Generate 200,000 series to write.
1345	series := genTestSeries(64, 5, 5)
1346
1347	// Add 1 point to each series.
1348	points := make([]models.Point, 0, len(series))
1349	for _, s := range series {
1350		points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
1351	}
1352
1353	// Create shards to write points into.
1354	numShards := 10
1355	for shardID := 0; shardID < numShards; shardID++ {
1356		if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
1357			t.Fatalf("create shard: %s", err)
1358		}
1359	}
1360
1361	// Write series / points to the shards.
1362	pointsPerShard := len(points) / numShards
1363
1364	for shardID := 0; shardID < numShards; shardID++ {
1365		from := shardID * pointsPerShard
1366		to := from + pointsPerShard
1367
1368		if err := store.Store.WriteToShard(tsdb.WriteContext{}, uint64(shardID), points[from:to]); err != nil {
1369			if !strings.Contains(err.Error(), "partial write: max-series-per-database limit exceeded:") {
1370				t.Fatal(err)
1371			}
1372		}
1373	}
1374
1375	// Get updated series cardinality from store after writing data.
1376	cardinality, err := store.Store.SeriesCardinality(context.Background(), "db")
1377	if err != nil {
1378		t.Fatal(err)
1379	}
1380	expCardinality := store.EngineOptions.Config.MaxSeriesPerDatabase
1381
1382	// Estimated cardinality should be well within 1.5% of the actual cardinality.
1383	got := math.Abs(float64(cardinality)-float64(expCardinality)) / float64(expCardinality)
1384	exp := 0.015
1385	if got > exp {
1386		t.Errorf("got epsilon of %v for series cardinality %d (expected %d), which is larger than expected %v", got, cardinality, expCardinality, exp)
1387	}
1388}
1389
1390func TestStore_Sketches(t *testing.T) {
1391	t.Parallel()
1392
1393	checkCardinalities := func(store *tsdb.Store, series, tseries, measurements, tmeasurements int) error {
1394		// Get sketches and check cardinality...
1395		sketch, tsketch, err := store.SeriesSketches(context.Background(), "db")
1396		if err != nil {
1397			return err
1398		}
1399
1400		// delta calculates a rough 10% delta. If i is small then a minimum value
1401		// of 2 is used.
1402		delta := func(i int) int {
1403			v := i / 10
1404			if v == 0 {
1405				v = 2
1406			}
1407			return v
1408		}
1409
1410		// series cardinality should be well within 10%.
1411		if got, exp := int(sketch.Count()), series; got-exp < -delta(series) || got-exp > delta(series) {
1412			return fmt.Errorf("got series cardinality %d, expected ~%d", got, exp)
1413		}
1414
1415		// check series tombstones
1416		if got, exp := int(tsketch.Count()), tseries; got-exp < -delta(tseries) || got-exp > delta(tseries) {
1417			return fmt.Errorf("got series tombstone cardinality %d, expected ~%d", got, exp)
1418		}
1419
1420		// Check measurement cardinality.
1421		if sketch, tsketch, err = store.MeasurementsSketches(context.Background(), "db"); err != nil {
1422			return err
1423		}
1424
1425		if got, exp := int(sketch.Count()), measurements; got-exp < -delta(measurements) || got-exp > delta(measurements) {
1426			return fmt.Errorf("got measurement cardinality %d, expected ~%d", got, exp)
1427		}
1428
1429		if got, exp := int(tsketch.Count()), tmeasurements; got-exp < -delta(tmeasurements) || got-exp > delta(tmeasurements) {
1430			return fmt.Errorf("got measurement tombstone cardinality %d, expected ~%d", got, exp)
1431		}
1432		return nil
1433	}
1434
1435	test := func(index string) error {
1436		store := MustOpenStore(index)
1437		defer store.Close()
1438
1439		// Generate point data to write to the shards.
1440		series := genTestSeries(10, 2, 4) // 160 series
1441
1442		points := make([]models.Point, 0, len(series))
1443		for _, s := range series {
1444			points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
1445		}
1446
1447		// Create requested number of shards in the store & write points across
1448		// shards such that we never write the same series to multiple shards.
1449		for shardID := 0; shardID < 4; shardID++ {
1450			if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
1451				return fmt.Errorf("create shard: %s", err)
1452			}
1453
1454			if err := store.BatchWrite(shardID, points[shardID*40:(shardID+1)*40]); err != nil {
1455				return fmt.Errorf("batch write: %s", err)
1456			}
1457		}
1458
1459		// Check cardinalities
1460		if err := checkCardinalities(store.Store, 160, 0, 10, 0); err != nil {
1461			return fmt.Errorf("[initial] %v", err)
1462		}
1463
1464		// Reopen the store.
1465		if err := store.Reopen(); err != nil {
1466			return err
1467		}
1468
1469		// Check cardinalities
1470		if err := checkCardinalities(store.Store, 160, 0, 10, 0); err != nil {
1471			return fmt.Errorf("[initial|re-open] %v", err)
1472		}
1473
1474		// Delete half the the measurements data
1475		mnames, err := store.MeasurementNames(context.Background(), nil, "db", nil)
1476		if err != nil {
1477			return err
1478		}
1479
1480		for _, name := range mnames[:len(mnames)/2] {
1481			if err := store.DeleteSeries("db", []influxql.Source{&influxql.Measurement{Name: string(name)}}, nil); err != nil {
1482				return err
1483			}
1484		}
1485
1486		// Check cardinalities. In this case, the indexes behave differently.
1487		expS, expTS, expM, expTM := 160, 80, 10, 5
1488		if index == inmem.IndexName {
1489			expS, expTS, expM, expTM = 160, 80, 10, 5
1490		}
1491
1492		// Check cardinalities - tombstones should be in
1493		if err := checkCardinalities(store.Store, expS, expTS, expM, expTM); err != nil {
1494			return fmt.Errorf("[initial|re-open|delete] %v", err)
1495		}
1496
1497		// Reopen the store.
1498		if err := store.Reopen(); err != nil {
1499			return err
1500		}
1501
1502		// Check cardinalities. In this case, the indexes behave differently.
1503		expS, expTS, expM, expTM = 80, 80, 5, 5
1504		if index == inmem.IndexName {
1505			expS, expTS, expM, expTM = 80, 0, 5, 0
1506		}
1507
1508		if err := checkCardinalities(store.Store, expS, expTS, expM, expTM); err != nil {
1509			return fmt.Errorf("[initial|re-open|delete|re-open] %v", err)
1510		}
1511		return nil
1512	}
1513
1514	for _, index := range tsdb.RegisteredIndexes() {
1515		t.Run(index, func(t *testing.T) {
1516			if err := test(index); err != nil {
1517				t.Fatal(err)
1518			}
1519		})
1520	}
1521}
1522
1523func TestStore_TagValues(t *testing.T) {
1524	t.Parallel()
1525
1526	// No WHERE - just get for keys host and shard
1527	RHSAll := &influxql.ParenExpr{
1528		Expr: &influxql.BinaryExpr{
1529			Op: influxql.OR,
1530			LHS: &influxql.BinaryExpr{
1531				Op:  influxql.EQ,
1532				LHS: &influxql.VarRef{Val: "_tagKey"},
1533				RHS: &influxql.StringLiteral{Val: "host"},
1534			},
1535			RHS: &influxql.BinaryExpr{
1536				Op:  influxql.EQ,
1537				LHS: &influxql.VarRef{Val: "_tagKey"},
1538				RHS: &influxql.StringLiteral{Val: "shard"},
1539			},
1540		},
1541	}
1542
1543	// Get for host and shard, but also WHERE on foo = a
1544	RHSWhere := &influxql.ParenExpr{
1545		Expr: &influxql.BinaryExpr{
1546			Op: influxql.AND,
1547			LHS: &influxql.ParenExpr{
1548				Expr: &influxql.BinaryExpr{
1549					Op:  influxql.EQ,
1550					LHS: &influxql.VarRef{Val: "foo"},
1551					RHS: &influxql.StringLiteral{Val: "a"},
1552				},
1553			},
1554			RHS: RHSAll,
1555		},
1556	}
1557
1558	// SHOW TAG VALUES FROM /cpu\d/ WITH KEY IN ("host", "shard")
1559	//
1560	// Switching out RHS for RHSWhere would make the query:
1561	//    SHOW TAG VALUES FROM /cpu\d/ WITH KEY IN ("host", "shard") WHERE foo = 'a'
1562	base := influxql.BinaryExpr{
1563		Op: influxql.AND,
1564		LHS: &influxql.ParenExpr{
1565			Expr: &influxql.BinaryExpr{
1566				Op:  influxql.EQREGEX,
1567				LHS: &influxql.VarRef{Val: "_name"},
1568				RHS: &influxql.RegexLiteral{Val: regexp.MustCompile(`cpu\d`)},
1569			},
1570		},
1571		RHS: RHSAll,
1572	}
1573
1574	var baseWhere *influxql.BinaryExpr = influxql.CloneExpr(&base).(*influxql.BinaryExpr)
1575	baseWhere.RHS = RHSWhere
1576
1577	examples := []struct {
1578		Name string
1579		Expr influxql.Expr
1580		Exp  []tsdb.TagValues
1581	}{
1582		{
1583			Name: "No WHERE clause",
1584			Expr: &base,
1585			Exp: []tsdb.TagValues{
1586				createTagValues("cpu0", map[string][]string{"shard": {"s0"}}),
1587				createTagValues("cpu1", map[string][]string{"shard": {"s1"}}),
1588				createTagValues("cpu10", map[string][]string{"host": {"nofoo", "tv0", "tv1", "tv2", "tv3"}, "shard": {"s0", "s1", "s2"}}),
1589				createTagValues("cpu11", map[string][]string{"host": {"nofoo", "tv0", "tv1", "tv2", "tv3"}, "shard": {"s0", "s1", "s2"}}),
1590				createTagValues("cpu12", map[string][]string{"host": {"nofoo", "tv0", "tv1", "tv2", "tv3"}, "shard": {"s0", "s1", "s2"}}),
1591				createTagValues("cpu2", map[string][]string{"shard": {"s2"}}),
1592			},
1593		},
1594		{
1595			Name: "With WHERE clause",
1596			Expr: baseWhere,
1597			Exp: []tsdb.TagValues{
1598				createTagValues("cpu0", map[string][]string{"shard": {"s0"}}),
1599				createTagValues("cpu1", map[string][]string{"shard": {"s1"}}),
1600				createTagValues("cpu10", map[string][]string{"host": {"tv0", "tv1", "tv2", "tv3"}, "shard": {"s0", "s1", "s2"}}),
1601				createTagValues("cpu11", map[string][]string{"host": {"tv0", "tv1", "tv2", "tv3"}, "shard": {"s0", "s1", "s2"}}),
1602				createTagValues("cpu12", map[string][]string{"host": {"tv0", "tv1", "tv2", "tv3"}, "shard": {"s0", "s1", "s2"}}),
1603				createTagValues("cpu2", map[string][]string{"shard": {"s2"}}),
1604			},
1605		},
1606	}
1607
1608	setup := func(index string) (*Store, []uint64) { // returns shard ids
1609		s := MustOpenStore(index)
1610
1611		fmtStr := `cpu1%[1]d,foo=a,ignoreme=nope,host=tv%[2]d,shard=s%[3]d value=1 %[4]d
1612	cpu1%[1]d,host=nofoo value=1 %[4]d
1613	mem,host=nothanks value=1 %[4]d
1614	cpu%[3]d,shard=s%[3]d,foo=a value=2 %[4]d
1615	`
1616		genPoints := func(sid int) []string {
1617			var ts int
1618			points := make([]string, 0, 3*4)
1619			for m := 0; m < 3; m++ {
1620				for tagvid := 0; tagvid < 4; tagvid++ {
1621					points = append(points, fmt.Sprintf(fmtStr, m, tagvid, sid, ts))
1622					ts++
1623				}
1624			}
1625			return points
1626		}
1627
1628		// Create data across 3 shards.
1629		var ids []uint64
1630		for i := 0; i < 3; i++ {
1631			ids = append(ids, uint64(i))
1632			s.MustCreateShardWithData("db0", "rp0", i, genPoints(i)...)
1633		}
1634		return s, ids
1635	}
1636
1637	for _, example := range examples {
1638		for _, index := range tsdb.RegisteredIndexes() {
1639			s, shardIDs := setup(index)
1640			t.Run(example.Name+"_"+index, func(t *testing.T) {
1641				got, err := s.TagValues(context.Background(), nil, shardIDs, example.Expr)
1642				if err != nil {
1643					t.Fatal(err)
1644				}
1645				exp := example.Exp
1646
1647				if !reflect.DeepEqual(got, exp) {
1648					t.Fatalf("got:\n%#v\n\nexp:\n%#v", got, exp)
1649				}
1650			})
1651			s.Close()
1652		}
1653	}
1654}
1655
1656func TestStore_Measurements_Auth(t *testing.T) {
1657	t.Parallel()
1658
1659	test := func(index string) error {
1660		s := MustOpenStore(index)
1661		defer s.Close()
1662
1663		// Create shard #0 with data.
1664		s.MustCreateShardWithData("db0", "rp0", 0,
1665			`cpu,host=serverA value=1  0`,
1666			`cpu,host=serverA value=2 10`,
1667			`cpu,region=west value=3 20`,
1668			`cpu,secret=foo value=5 30`, // cpu still readable because it has other series that can be read.
1669			`mem,secret=foo value=1 30`,
1670			`disk value=4 30`,
1671		)
1672
1673		authorizer := &internal.AuthorizerMock{
1674			AuthorizeSeriesReadFn: func(database string, measurement []byte, tags models.Tags) bool {
1675				if database == "" || tags.GetString("secret") != "" {
1676					t.Logf("Rejecting series db=%s, m=%s, tags=%v", database, measurement, tags)
1677					return false
1678				}
1679				return true
1680			},
1681		}
1682
1683		names, err := s.MeasurementNames(context.Background(), authorizer, "db0", nil)
1684		if err != nil {
1685			return err
1686		}
1687
1688		// names should not contain any measurements where none of the associated
1689		// series are authorised for reads.
1690		expNames := 2
1691		var gotNames int
1692		for _, name := range names {
1693			if string(name) == "mem" {
1694				return fmt.Errorf("got measurement %q but it should be filtered.", name)
1695			}
1696			gotNames++
1697		}
1698
1699		if gotNames != expNames {
1700			return fmt.Errorf("got %d measurements, but expected %d", gotNames, expNames)
1701		}
1702
1703		// Now delete all of the cpu series.
1704		cond, err := influxql.ParseExpr("host = 'serverA' OR region = 'west'")
1705		if err != nil {
1706			return err
1707		}
1708
1709		if err := s.DeleteSeries("db0", nil, cond); err != nil {
1710			return err
1711		}
1712
1713		if names, err = s.MeasurementNames(context.Background(), authorizer, "db0", nil); err != nil {
1714			return err
1715		}
1716
1717		// names should not contain any measurements where none of the associated
1718		// series are authorised for reads.
1719		expNames = 1
1720		gotNames = 0
1721		for _, name := range names {
1722			if string(name) == "mem" || string(name) == "cpu" {
1723				return fmt.Errorf("after delete got measurement %q but it should be filtered.", name)
1724			}
1725			gotNames++
1726		}
1727
1728		if gotNames != expNames {
1729			return fmt.Errorf("after delete got %d measurements, but expected %d", gotNames, expNames)
1730		}
1731
1732		return nil
1733	}
1734
1735	for _, index := range tsdb.RegisteredIndexes() {
1736		t.Run(index, func(t *testing.T) {
1737			if err := test(index); err != nil {
1738				t.Fatal(err)
1739			}
1740		})
1741	}
1742
1743}
1744
1745func TestStore_TagKeys_Auth(t *testing.T) {
1746	t.Parallel()
1747
1748	test := func(index string) error {
1749		s := MustOpenStore(index)
1750		defer s.Close()
1751
1752		// Create shard #0 with data.
1753		s.MustCreateShardWithData("db0", "rp0", 0,
1754			`cpu,host=serverA value=1  0`,
1755			`cpu,host=serverA,debug=true value=2 10`,
1756			`cpu,region=west value=3 20`,
1757			`cpu,secret=foo,machine=a value=1 20`,
1758		)
1759
1760		authorizer := &internal.AuthorizerMock{
1761			AuthorizeSeriesReadFn: func(database string, measurement []byte, tags models.Tags) bool {
1762				if database == "" || !bytes.Equal(measurement, []byte("cpu")) || tags.GetString("secret") != "" {
1763					t.Logf("Rejecting series db=%s, m=%s, tags=%v", database, measurement, tags)
1764					return false
1765				}
1766				return true
1767			},
1768		}
1769
1770		keys, err := s.TagKeys(context.Background(), authorizer, []uint64{0}, nil)
1771		if err != nil {
1772			return err
1773		}
1774
1775		// keys should not contain any tag keys associated with a series containing
1776		// a secret tag.
1777		expKeys := 3
1778		var gotKeys int
1779		for _, tk := range keys {
1780			if got, exp := tk.Measurement, "cpu"; got != exp {
1781				return fmt.Errorf("got measurement %q, expected %q", got, exp)
1782			}
1783
1784			for _, key := range tk.Keys {
1785				if key == "secret" || key == "machine" {
1786					return fmt.Errorf("got tag key %q but it should be filtered.", key)
1787				}
1788				gotKeys++
1789			}
1790		}
1791
1792		if gotKeys != expKeys {
1793			return fmt.Errorf("got %d keys, but expected %d", gotKeys, expKeys)
1794		}
1795
1796		// Delete the series with region = west
1797		cond, err := influxql.ParseExpr("region = 'west'")
1798		if err != nil {
1799			return err
1800		}
1801		if err := s.DeleteSeries("db0", nil, cond); err != nil {
1802			return err
1803		}
1804
1805		if keys, err = s.TagKeys(context.Background(), authorizer, []uint64{0}, nil); err != nil {
1806			return err
1807		}
1808
1809		// keys should not contain any tag keys associated with a series containing
1810		// a secret tag or the deleted series
1811		expKeys = 2
1812		gotKeys = 0
1813		for _, tk := range keys {
1814			if got, exp := tk.Measurement, "cpu"; got != exp {
1815				return fmt.Errorf("got measurement %q, expected %q", got, exp)
1816			}
1817
1818			for _, key := range tk.Keys {
1819				if key == "secret" || key == "machine" || key == "region" {
1820					return fmt.Errorf("got tag key %q but it should be filtered.", key)
1821				}
1822				gotKeys++
1823			}
1824		}
1825
1826		if gotKeys != expKeys {
1827			return fmt.Errorf("got %d keys, but expected %d", gotKeys, expKeys)
1828		}
1829
1830		return nil
1831	}
1832
1833	for _, index := range tsdb.RegisteredIndexes() {
1834		t.Run(index, func(t *testing.T) {
1835			if err := test(index); err != nil {
1836				t.Fatal(err)
1837			}
1838		})
1839	}
1840
1841}
1842
1843func TestStore_TagValues_Auth(t *testing.T) {
1844	t.Parallel()
1845
1846	test := func(index string) error {
1847		s := MustOpenStore(index)
1848		defer s.Close()
1849
1850		// Create shard #0 with data.
1851		s.MustCreateShardWithData("db0", "rp0", 0,
1852			`cpu,host=serverA value=1  0`,
1853			`cpu,host=serverA value=2 10`,
1854			`cpu,host=serverB value=3 20`,
1855			`cpu,secret=foo,host=serverD value=1 20`,
1856		)
1857
1858		authorizer := &internal.AuthorizerMock{
1859			AuthorizeSeriesReadFn: func(database string, measurement []byte, tags models.Tags) bool {
1860				if database == "" || !bytes.Equal(measurement, []byte("cpu")) || tags.GetString("secret") != "" {
1861					t.Logf("Rejecting series db=%s, m=%s, tags=%v", database, measurement, tags)
1862					return false
1863				}
1864				return true
1865			},
1866		}
1867
1868		values, err := s.TagValues(context.Background(), authorizer, []uint64{0}, &influxql.BinaryExpr{
1869			Op:  influxql.EQ,
1870			LHS: &influxql.VarRef{Val: "_tagKey"},
1871			RHS: &influxql.StringLiteral{Val: "host"},
1872		})
1873
1874		if err != nil {
1875			return err
1876		}
1877
1878		// values should not contain any tag values associated with a series containing
1879		// a secret tag.
1880		expValues := 2
1881		var gotValues int
1882		for _, tv := range values {
1883			if got, exp := tv.Measurement, "cpu"; got != exp {
1884				return fmt.Errorf("got measurement %q, expected %q", got, exp)
1885			}
1886
1887			for _, v := range tv.Values {
1888				if got, exp := v.Value, "serverD"; got == exp {
1889					return fmt.Errorf("got tag value %q but it should be filtered.", got)
1890				}
1891				gotValues++
1892			}
1893		}
1894
1895		if gotValues != expValues {
1896			return fmt.Errorf("got %d tags, but expected %d", gotValues, expValues)
1897		}
1898
1899		// Delete the series with values serverA
1900		cond, err := influxql.ParseExpr("host = 'serverA'")
1901		if err != nil {
1902			return err
1903		}
1904		if err := s.DeleteSeries("db0", nil, cond); err != nil {
1905			return err
1906		}
1907
1908		values, err = s.TagValues(context.Background(), authorizer, []uint64{0}, &influxql.BinaryExpr{
1909			Op:  influxql.EQ,
1910			LHS: &influxql.VarRef{Val: "_tagKey"},
1911			RHS: &influxql.StringLiteral{Val: "host"},
1912		})
1913
1914		if err != nil {
1915			return err
1916		}
1917
1918		// values should not contain any tag values associated with a series containing
1919		// a secret tag.
1920		expValues = 1
1921		gotValues = 0
1922		for _, tv := range values {
1923			if got, exp := tv.Measurement, "cpu"; got != exp {
1924				return fmt.Errorf("got measurement %q, expected %q", got, exp)
1925			}
1926
1927			for _, v := range tv.Values {
1928				if got, exp := v.Value, "serverD"; got == exp {
1929					return fmt.Errorf("got tag value %q but it should be filtered.", got)
1930				} else if got, exp := v.Value, "serverA"; got == exp {
1931					return fmt.Errorf("got tag value %q but it should be filtered.", got)
1932				}
1933				gotValues++
1934			}
1935		}
1936
1937		if gotValues != expValues {
1938			return fmt.Errorf("got %d values, but expected %d", gotValues, expValues)
1939		}
1940		return nil
1941	}
1942
1943	for _, index := range tsdb.RegisteredIndexes() {
1944		t.Run(index, func(t *testing.T) {
1945			if err := test(index); err != nil {
1946				t.Fatal(err)
1947			}
1948		})
1949	}
1950}
1951
1952// Helper to create some tag values
1953func createTagValues(mname string, kvs map[string][]string) tsdb.TagValues {
1954	var sz int
1955	for _, v := range kvs {
1956		sz += len(v)
1957	}
1958
1959	out := tsdb.TagValues{
1960		Measurement: mname,
1961		Values:      make([]tsdb.KeyValue, 0, sz),
1962	}
1963
1964	for tk, tvs := range kvs {
1965		for _, tv := range tvs {
1966			out.Values = append(out.Values, tsdb.KeyValue{Key: tk, Value: tv})
1967		}
1968		// We have to sort the KeyValues since that's how they're provided from
1969		// the tsdb.Store.
1970		sort.Sort(tsdb.KeyValues(out.Values))
1971	}
1972
1973	return out
1974}
1975
1976func TestStore_MeasurementNames_ConcurrentDropShard(t *testing.T) {
1977	for _, index := range tsdb.RegisteredIndexes() {
1978		s := MustOpenStore(index)
1979		defer s.Close()
1980
1981		shardN := 10
1982		for i := 0; i < shardN; i++ {
1983			// Create new shards with some data
1984			s.MustCreateShardWithData("db0", "rp0", i,
1985				`cpu,host=serverA value=1 30`,
1986				`mem,region=west value=2 40`, // skip: wrong source
1987				`cpu,host=serverC value=3 60`,
1988			)
1989		}
1990
1991		done := make(chan struct{})
1992		errC := make(chan error, 2)
1993
1994		// Randomly close and open the shards.
1995		go func() {
1996			for {
1997				select {
1998				case <-done:
1999					errC <- nil
2000					return
2001				default:
2002					i := uint64(rand.Intn(int(shardN)))
2003					if sh := s.Shard(i); sh == nil {
2004						errC <- errors.New("shard should not be nil")
2005						return
2006					} else {
2007						if err := sh.Close(); err != nil {
2008							errC <- err
2009							return
2010						}
2011						time.Sleep(500 * time.Microsecond)
2012						if err := sh.Open(); err != nil {
2013							errC <- err
2014							return
2015						}
2016					}
2017				}
2018			}
2019		}()
2020
2021		// Attempt to get tag keys from the shards.
2022		go func() {
2023			for {
2024				select {
2025				case <-done:
2026					errC <- nil
2027					return
2028				default:
2029					names, err := s.MeasurementNames(context.Background(), nil, "db0", nil)
2030					if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed {
2031						continue // These errors are expected
2032					}
2033
2034					if err != nil {
2035						errC <- err
2036						return
2037					}
2038
2039					if got, exp := names, slices.StringsToBytes("cpu", "mem"); !reflect.DeepEqual(got, exp) {
2040						errC <- fmt.Errorf("got keys %v, expected %v", got, exp)
2041						return
2042					}
2043				}
2044			}
2045		}()
2046
2047		// Run for 500ms
2048		time.Sleep(500 * time.Millisecond)
2049		close(done)
2050
2051		// Check for errors.
2052		if err := <-errC; err != nil {
2053			t.Fatal(err)
2054		}
2055		if err := <-errC; err != nil {
2056			t.Fatal(err)
2057		}
2058	}
2059}
2060
2061func TestStore_TagKeys_ConcurrentDropShard(t *testing.T) {
2062	for _, index := range tsdb.RegisteredIndexes() {
2063		s := MustOpenStore(index)
2064		defer s.Close()
2065
2066		shardN := 10
2067		for i := 0; i < shardN; i++ {
2068			// Create new shards with some data
2069			s.MustCreateShardWithData("db0", "rp0", i,
2070				`cpu,host=serverA value=1 30`,
2071				`mem,region=west value=2 40`, // skip: wrong source
2072				`cpu,host=serverC value=3 60`,
2073			)
2074		}
2075
2076		done := make(chan struct{})
2077		errC := make(chan error, 2)
2078
2079		// Randomly close and open the shards.
2080		go func() {
2081			for {
2082				select {
2083				case <-done:
2084					errC <- nil
2085					return
2086				default:
2087					i := uint64(rand.Intn(int(shardN)))
2088					if sh := s.Shard(i); sh == nil {
2089						errC <- errors.New("shard should not be nil")
2090						return
2091					} else {
2092						if err := sh.Close(); err != nil {
2093							errC <- err
2094							return
2095						}
2096						time.Sleep(500 * time.Microsecond)
2097						if err := sh.Open(); err != nil {
2098							errC <- err
2099							return
2100						}
2101					}
2102				}
2103			}
2104		}()
2105
2106		// Attempt to get tag keys from the shards.
2107		go func() {
2108			for {
2109				select {
2110				case <-done:
2111					errC <- nil
2112					return
2113				default:
2114					keys, err := s.TagKeys(context.Background(), nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, nil)
2115					if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed {
2116						continue // These errors are expected
2117					}
2118
2119					if err != nil {
2120						errC <- err
2121						return
2122					}
2123
2124					if got, exp := keys[0].Keys, []string{"host"}; !reflect.DeepEqual(got, exp) {
2125						errC <- fmt.Errorf("got keys %v, expected %v", got, exp)
2126						return
2127					}
2128
2129					if got, exp := keys[1].Keys, []string{"region"}; !reflect.DeepEqual(got, exp) {
2130						errC <- fmt.Errorf("got keys %v, expected %v", got, exp)
2131						return
2132					}
2133				}
2134			}
2135		}()
2136
2137		// Run for 500ms
2138		time.Sleep(500 * time.Millisecond)
2139
2140		close(done)
2141
2142		// Check for errors
2143		if err := <-errC; err != nil {
2144			t.Fatal(err)
2145		}
2146		if err := <-errC; err != nil {
2147			t.Fatal(err)
2148		}
2149	}
2150}
2151
2152func TestStore_TagValues_ConcurrentDropShard(t *testing.T) {
2153	for _, index := range tsdb.RegisteredIndexes() {
2154		s := MustOpenStore(index)
2155		defer s.Close()
2156
2157		shardN := 10
2158		for i := 0; i < shardN; i++ {
2159			// Create new shards with some data
2160			s.MustCreateShardWithData("db0", "rp0", i,
2161				`cpu,host=serverA value=1 30`,
2162				`mem,region=west value=2 40`, // skip: wrong source
2163				`cpu,host=serverC value=3 60`,
2164			)
2165		}
2166
2167		done := make(chan struct{})
2168		errC := make(chan error, 2)
2169
2170		// Randomly close and open the shards.
2171		go func() {
2172			for {
2173				select {
2174				case <-done:
2175					errC <- nil
2176					return
2177				default:
2178					i := uint64(rand.Intn(int(shardN)))
2179					if sh := s.Shard(i); sh == nil {
2180						errC <- errors.New("shard should not be nil")
2181						return
2182					} else {
2183						if err := sh.Close(); err != nil {
2184							errC <- err
2185							return
2186						}
2187						time.Sleep(500 * time.Microsecond)
2188						if err := sh.Open(); err != nil {
2189							errC <- err
2190							return
2191						}
2192					}
2193				}
2194			}
2195		}()
2196
2197		// Attempt to get tag keys from the shards.
2198		go func() {
2199			for {
2200				select {
2201				case <-done:
2202					errC <- nil
2203					return
2204				default:
2205					stmt, err := influxql.ParseStatement(`SHOW TAG VALUES WITH KEY = "host"`)
2206					if err != nil {
2207						errC <- err
2208					}
2209					rewrite, err := query.RewriteStatement(stmt)
2210					if err != nil {
2211						errC <- err
2212					}
2213
2214					cond := rewrite.(*influxql.ShowTagValuesStatement).Condition
2215					values, err := s.TagValues(context.Background(), nil, []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, cond)
2216					if err == tsdb.ErrIndexClosing || err == tsdb.ErrEngineClosed {
2217						continue // These errors are expected
2218					}
2219
2220					if err != nil {
2221						errC <- err
2222						return
2223					}
2224
2225					exp := tsdb.TagValues{
2226						Measurement: "cpu",
2227						Values: []tsdb.KeyValue{
2228							{Key: "host", Value: "serverA"},
2229							{Key: "host", Value: "serverC"},
2230						},
2231					}
2232
2233					if got := values[0]; !reflect.DeepEqual(got, exp) {
2234						errC <- fmt.Errorf("got keys %v, expected %v", got, exp)
2235						return
2236					}
2237				}
2238			}
2239		}()
2240
2241		// Run for 500ms
2242		time.Sleep(500 * time.Millisecond)
2243
2244		close(done)
2245
2246		// Check for errors
2247		if err := <-errC; err != nil {
2248			t.Fatal(err)
2249		}
2250		if err := <-errC; err != nil {
2251			t.Fatal(err)
2252		}
2253	}
2254}
2255
2256func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) {
2257	for _, index := range tsdb.RegisteredIndexes() {
2258		store := NewStore(index)
2259		if err := store.Open(); err != nil {
2260			panic(err)
2261		}
2262
2263		// Write a point to n shards.
2264		for shardID := 0; shardID < 100; shardID++ {
2265			if err := store.CreateShard("db", "rp", uint64(shardID), true); err != nil {
2266				b.Fatalf("create shard: %s", err)
2267			}
2268
2269			err := store.WriteToShard(tsdb.WriteContext{}, uint64(shardID), []models.Point{models.MustNewPoint("cpu", nil, map[string]interface{}{"value": 1.0}, time.Now())})
2270			if err != nil {
2271				b.Fatalf("write: %s", err)
2272			}
2273		}
2274
2275		b.Run(store.EngineOptions.IndexVersion, func(b *testing.B) {
2276			for i := 0; i < b.N; i++ {
2277				_, _ = store.SeriesCardinality(context.Background(), "db")
2278			}
2279		})
2280		store.Close()
2281	}
2282}
2283
2284func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen(b, 64, 5, 5, 1, 100) }
2285
2286func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) {
2287	var store *Store
2288	setup := func(index string) error {
2289		store := MustOpenStore(index)
2290
2291		// Generate test series (measurements + unique tag sets).
2292		series := genTestSeries(mCnt, tkCnt, tvCnt)
2293
2294		// Generate point data to write to the shards.
2295		points := []models.Point{}
2296		for _, s := range series {
2297			for val := 0.0; val < float64(pntCnt); val++ {
2298				p := models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": val}, time.Now())
2299				points = append(points, p)
2300			}
2301		}
2302
2303		// Create requested number of shards in the store & write points.
2304		for shardID := 0; shardID < shardCnt; shardID++ {
2305			if err := store.CreateShard("mydb", "myrp", uint64(shardID), true); err != nil {
2306				return fmt.Errorf("create shard: %s", err)
2307			}
2308			if err := store.BatchWrite(shardID, points); err != nil {
2309				return fmt.Errorf("batch write: %s", err)
2310			}
2311		}
2312		return nil
2313	}
2314
2315	for _, index := range tsdb.RegisteredIndexes() {
2316		if err := setup(index); err != nil {
2317			b.Fatal(err)
2318		}
2319		b.Run(store.EngineOptions.IndexVersion, func(b *testing.B) {
2320			for n := 0; n < b.N; n++ {
2321				store := tsdb.NewStore(store.Path())
2322				if err := store.Open(); err != nil {
2323					b.Fatalf("open store error: %s", err)
2324				}
2325
2326				b.StopTimer()
2327				store.Close()
2328				b.StartTimer()
2329			}
2330		})
2331		os.RemoveAll(store.Path())
2332	}
2333}
2334
2335// To store result of benchmark (ensure allocated on heap).
2336var tvResult []tsdb.TagValues
2337
2338func BenchmarkStore_TagValues(b *testing.B) {
2339	benchmarks := []struct {
2340		name         string
2341		shards       int
2342		measurements int
2343		tagValues    int
2344	}{
2345		{name: "s=1_m=1_v=100", shards: 1, measurements: 1, tagValues: 100},
2346		{name: "s=1_m=1_v=1000", shards: 1, measurements: 1, tagValues: 1000},
2347		{name: "s=1_m=10_v=100", shards: 1, measurements: 10, tagValues: 100},
2348		{name: "s=1_m=10_v=1000", shards: 1, measurements: 10, tagValues: 1000},
2349		{name: "s=1_m=100_v=100", shards: 1, measurements: 100, tagValues: 100},
2350		{name: "s=1_m=100_v=1000", shards: 1, measurements: 100, tagValues: 1000},
2351		{name: "s=10_m=1_v=100", shards: 10, measurements: 1, tagValues: 100},
2352		{name: "s=10_m=1_v=1000", shards: 10, measurements: 1, tagValues: 1000},
2353		{name: "s=10_m=10_v=100", shards: 10, measurements: 10, tagValues: 100},
2354		{name: "s=10_m=10_v=1000", shards: 10, measurements: 10, tagValues: 1000},
2355		{name: "s=10_m=100_v=100", shards: 10, measurements: 100, tagValues: 100},
2356		{name: "s=10_m=100_v=1000", shards: 10, measurements: 100, tagValues: 1000},
2357	}
2358
2359	setup := func(shards, measurements, tagValues int, index string, useRandom bool) (*Store, []uint64) { // returns shard ids
2360		s := NewStore(index)
2361		if err := s.Open(); err != nil {
2362			panic(err)
2363		}
2364
2365		fmtStr := `cpu%[1]d,host=tv%[2]d,shard=s%[3]d,z1=s%[1]d%[2]d,z2=%[4]s value=1 %[5]d`
2366		// genPoints generates some point data. If ran is true then random tag
2367		// key values will be generated, meaning more work sorting and merging.
2368		// If ran is false, then the same set of points will be produced for the
2369		// same set of parameters, meaning more de-duplication of points will be
2370		// needed.
2371		genPoints := func(sid int, ran bool) []string {
2372			var v, ts int
2373			var half string
2374			points := make([]string, 0, measurements*tagValues)
2375			for m := 0; m < measurements; m++ {
2376				for tagvid := 0; tagvid < tagValues; tagvid++ {
2377					v = tagvid
2378					if ran {
2379						v = rand.Intn(100000)
2380					}
2381					half = fmt.Sprint(rand.Intn(2) == 0)
2382					points = append(points, fmt.Sprintf(fmtStr, m, v, sid, half, ts))
2383					ts++
2384				}
2385			}
2386			return points
2387		}
2388
2389		// Create data across chosen number of shards.
2390		var shardIDs []uint64
2391		for i := 0; i < shards; i++ {
2392			shardIDs = append(shardIDs, uint64(i))
2393			s.MustCreateShardWithData("db0", "rp0", i, genPoints(i, useRandom)...)
2394		}
2395		return s, shardIDs
2396	}
2397
2398	// SHOW TAG VALUES WITH KEY IN ("host", "shard")
2399	cond1 := &influxql.ParenExpr{
2400		Expr: &influxql.BinaryExpr{
2401			Op: influxql.OR,
2402			LHS: &influxql.BinaryExpr{
2403				Op:  influxql.EQ,
2404				LHS: &influxql.VarRef{Val: "_tagKey"},
2405				RHS: &influxql.StringLiteral{Val: "host"},
2406			},
2407			RHS: &influxql.BinaryExpr{
2408				Op:  influxql.EQ,
2409				LHS: &influxql.VarRef{Val: "_tagKey"},
2410				RHS: &influxql.StringLiteral{Val: "shard"},
2411			},
2412		},
2413	}
2414
2415	cond2 := &influxql.ParenExpr{
2416		Expr: &influxql.BinaryExpr{
2417			Op: influxql.AND,
2418			LHS: &influxql.ParenExpr{
2419				Expr: &influxql.BinaryExpr{
2420					Op:  influxql.EQ,
2421					LHS: &influxql.VarRef{Val: "z2"},
2422					RHS: &influxql.StringLiteral{Val: "true"},
2423				},
2424			},
2425			RHS: cond1,
2426		},
2427	}
2428
2429	var err error
2430	for _, index := range tsdb.RegisteredIndexes() {
2431		for useRand := 0; useRand < 2; useRand++ {
2432			for c, condition := range []influxql.Expr{cond1, cond2} {
2433				for _, bm := range benchmarks {
2434					s, shardIDs := setup(bm.shards, bm.measurements, bm.tagValues, index, useRand == 1)
2435					teardown := func() {
2436						if err := s.Close(); err != nil {
2437							b.Fatal(err)
2438						}
2439					}
2440
2441					cnd := "Unfiltered"
2442					if c == 0 {
2443						cnd = "Filtered"
2444					}
2445					b.Run("random_values="+fmt.Sprint(useRand == 1)+"_index="+index+"_"+cnd+"_"+bm.name, func(b *testing.B) {
2446						for i := 0; i < b.N; i++ {
2447							if tvResult, err = s.TagValues(context.Background(), nil, shardIDs, condition); err != nil {
2448								b.Fatal(err)
2449							}
2450						}
2451					})
2452					teardown()
2453				}
2454			}
2455		}
2456	}
2457}
2458
2459// Store is a test wrapper for tsdb.Store.
2460type Store struct {
2461	*tsdb.Store
2462	index string
2463}
2464
2465// NewStore returns a new instance of Store with a temporary path.
2466func NewStore(index string) *Store {
2467	path, err := ioutil.TempDir("", "influxdb-tsdb-")
2468	if err != nil {
2469		panic(err)
2470	}
2471
2472	s := &Store{Store: tsdb.NewStore(path), index: index}
2473	s.EngineOptions.IndexVersion = index
2474	s.EngineOptions.Config.WALDir = filepath.Join(path, "wal")
2475	s.EngineOptions.Config.TraceLoggingEnabled = true
2476
2477	if testing.Verbose() {
2478		s.WithLogger(logger.New(os.Stdout))
2479	}
2480
2481	return s
2482}
2483
2484// MustOpenStore returns a new, open Store using the specified index,
2485// at a temporary path.
2486func MustOpenStore(index string) *Store {
2487	s := NewStore(index)
2488
2489	if err := s.Open(); err != nil {
2490		panic(err)
2491	}
2492	return s
2493}
2494
2495// Reopen closes and reopens the store as a new store.
2496func (s *Store) Reopen() error {
2497	if err := s.Store.Close(); err != nil {
2498		return err
2499	}
2500
2501	s.Store = tsdb.NewStore(s.Path())
2502	s.EngineOptions.IndexVersion = s.index
2503	s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal")
2504	s.EngineOptions.Config.TraceLoggingEnabled = true
2505
2506	if testing.Verbose() {
2507		s.WithLogger(logger.New(os.Stdout))
2508	}
2509	return s.Store.Open()
2510}
2511
2512// Close closes the store and removes the underlying data.
2513func (s *Store) Close() error {
2514	defer os.RemoveAll(s.Path())
2515	return s.Store.Close()
2516}
2517
2518// MustCreateShardWithData creates a shard and writes line protocol data to it.
2519func (s *Store) MustCreateShardWithData(db, rp string, shardID int, data ...string) {
2520	if err := s.CreateShard(db, rp, uint64(shardID), true); err != nil {
2521		panic(err)
2522	}
2523	s.MustWriteToShardString(shardID, data...)
2524}
2525
2526// MustWriteToShardString parses the line protocol (with second precision) and
2527// inserts the resulting points into a shard. Panic on error.
2528func (s *Store) MustWriteToShardString(shardID int, data ...string) {
2529	var points []models.Point
2530	for i := range data {
2531		a, err := models.ParsePointsWithPrecision([]byte(strings.TrimSpace(data[i])), time.Time{}, "s")
2532		if err != nil {
2533			panic(err)
2534		}
2535		points = append(points, a...)
2536	}
2537
2538	if err := s.WriteToShard(tsdb.WriteContext{}, uint64(shardID), points); err != nil {
2539		panic(err)
2540	}
2541}
2542
2543// BatchWrite writes points to a shard in chunks.
2544func (s *Store) BatchWrite(shardID int, points []models.Point) error {
2545	nPts := len(points)
2546	chunkSz := 10000
2547	start := 0
2548	end := chunkSz
2549
2550	for {
2551		if end > nPts {
2552			end = nPts
2553		}
2554		if end-start == 0 {
2555			break
2556		}
2557
2558		if err := s.WriteToShard(tsdb.WriteContext{}, uint64(shardID), points[start:end]); err != nil {
2559			return err
2560		}
2561		start = end
2562		end += chunkSz
2563	}
2564	return nil
2565}
2566
2567// ParseTags returns an instance of Tags for a comma-delimited list of key/values.
2568func ParseTags(s string) query.Tags {
2569	m := make(map[string]string)
2570	for _, kv := range strings.Split(s, ",") {
2571		a := strings.Split(kv, "=")
2572		m[a[0]] = a[1]
2573	}
2574	return query.NewTags(m)
2575}
2576
2577func dirExists(path string) bool {
2578	var err error
2579	if _, err = os.Stat(path); err == nil {
2580		return true
2581	}
2582	return !os.IsNotExist(err)
2583}
2584