1package tsdb
2
3import (
4	"fmt"
5	"io/ioutil"
6	"os"
7	"path"
8	"path/filepath"
9	"regexp"
10	"sort"
11	"strings"
12	"testing"
13	"time"
14
15	"github.com/google/go-cmp/cmp"
16	"github.com/google/go-cmp/cmp/cmpopts"
17	"github.com/influxdata/influxdb/logger"
18	"github.com/influxdata/influxdb/models"
19	"github.com/influxdata/influxql"
20)
21
22func TestShard_MapType(t *testing.T) {
23	var sh *TempShard
24
25	setup := func(index string) {
26		sh = NewTempShard(index)
27
28		if err := sh.Open(); err != nil {
29			t.Fatal(err)
30		}
31
32		sh.MustWritePointsString(`
33cpu,host=serverA,region=uswest value=100 0
34cpu,host=serverA,region=uswest value=50,val2=5  10
35cpu,host=serverB,region=uswest value=25  0
36mem,host=serverA value=25i 0
37mem,host=serverB value=50i,val3=t 10
38_reserved,region=uswest value="foo" 0
39`)
40	}
41
42	for _, index := range RegisteredIndexes() {
43		setup(index)
44		for _, tt := range []struct {
45			measurement string
46			field       string
47			typ         influxql.DataType
48		}{
49			{
50				measurement: "cpu",
51				field:       "value",
52				typ:         influxql.Float,
53			},
54			{
55				measurement: "cpu",
56				field:       "host",
57				typ:         influxql.Tag,
58			},
59			{
60				measurement: "cpu",
61				field:       "region",
62				typ:         influxql.Tag,
63			},
64			{
65				measurement: "cpu",
66				field:       "val2",
67				typ:         influxql.Float,
68			},
69			{
70				measurement: "cpu",
71				field:       "unknown",
72				typ:         influxql.Unknown,
73			},
74			{
75				measurement: "mem",
76				field:       "value",
77				typ:         influxql.Integer,
78			},
79			{
80				measurement: "mem",
81				field:       "val3",
82				typ:         influxql.Boolean,
83			},
84			{
85				measurement: "mem",
86				field:       "host",
87				typ:         influxql.Tag,
88			},
89			{
90				measurement: "unknown",
91				field:       "unknown",
92				typ:         influxql.Unknown,
93			},
94			{
95				measurement: "_fieldKeys",
96				field:       "fieldKey",
97				typ:         influxql.String,
98			},
99			{
100				measurement: "_fieldKeys",
101				field:       "fieldType",
102				typ:         influxql.String,
103			},
104			{
105				measurement: "_fieldKeys",
106				field:       "unknown",
107				typ:         influxql.Unknown,
108			},
109			{
110				measurement: "_series",
111				field:       "key",
112				typ:         influxql.String,
113			},
114			{
115				measurement: "_series",
116				field:       "unknown",
117				typ:         influxql.Unknown,
118			},
119			{
120				measurement: "_tagKeys",
121				field:       "tagKey",
122				typ:         influxql.String,
123			},
124			{
125				measurement: "_tagKeys",
126				field:       "unknown",
127				typ:         influxql.Unknown,
128			},
129			{
130				measurement: "_reserved",
131				field:       "value",
132				typ:         influxql.String,
133			},
134			{
135				measurement: "_reserved",
136				field:       "region",
137				typ:         influxql.Tag,
138			},
139		} {
140			name := fmt.Sprintf("%s_%s_%s", index, tt.measurement, tt.field)
141			t.Run(name, func(t *testing.T) {
142				typ, err := sh.mapType(tt.measurement, tt.field)
143				if err != nil {
144					t.Fatal(err)
145				}
146
147				if have, want := typ, tt.typ; have != want {
148					t.Errorf("unexpected data type: have=%#v want=%#v", have, want)
149				}
150			})
151		}
152		sh.Close()
153	}
154}
155
156func TestShard_MeasurementsByRegex(t *testing.T) {
157	var sh *TempShard
158	setup := func(index string) {
159		sh = NewTempShard(index)
160		if err := sh.Open(); err != nil {
161			t.Fatal(err)
162		}
163
164		sh.MustWritePointsString(`
165cpu,host=serverA,region=uswest value=100 0
166cpu,host=serverA,region=uswest value=50,val2=5  10
167cpu,host=serverB,region=uswest value=25  0
168mem,host=serverA value=25i 0
169mem,host=serverB value=50i,val3=t 10
170`)
171	}
172
173	for _, index := range RegisteredIndexes() {
174		setup(index)
175		for _, tt := range []struct {
176			regex        string
177			measurements []string
178		}{
179			{regex: `cpu`, measurements: []string{"cpu"}},
180			{regex: `mem`, measurements: []string{"mem"}},
181			{regex: `cpu|mem`, measurements: []string{"cpu", "mem"}},
182			{regex: `gpu`, measurements: []string{}},
183			{regex: `pu`, measurements: []string{"cpu"}},
184			{regex: `p|m`, measurements: []string{"cpu", "mem"}},
185		} {
186			t.Run(index+"_"+tt.regex, func(t *testing.T) {
187				re := regexp.MustCompile(tt.regex)
188				measurements, err := sh.MeasurementNamesByRegex(re)
189				if err != nil {
190					t.Fatal(err)
191				}
192
193				mstrings := make([]string, 0, len(measurements))
194				for _, name := range measurements {
195					mstrings = append(mstrings, string(name))
196				}
197				sort.Strings(mstrings)
198				if diff := cmp.Diff(tt.measurements, mstrings, cmpopts.EquateEmpty()); diff != "" {
199					t.Errorf("unexpected measurements:\n%s", diff)
200				}
201			})
202		}
203		sh.Close()
204	}
205}
206
207// TempShard represents a test wrapper for Shard that uses temporary
208// filesystem paths.
209type TempShard struct {
210	*Shard
211	path  string
212	sfile *SeriesFile
213}
214
215// NewTempShard returns a new instance of TempShard with temp paths.
216func NewTempShard(index string) *TempShard {
217	// Create temporary path for data and WAL.
218	dir, err := ioutil.TempDir("", "influxdb-tsdb-")
219	if err != nil {
220		panic(err)
221	}
222
223	// Create series file.
224	sfile := NewSeriesFile(filepath.Join(dir, "db0", SeriesFileDirectory))
225	sfile.Logger = logger.New(os.Stdout)
226	if err := sfile.Open(); err != nil {
227		panic(err)
228	}
229
230	// Build engine options.
231	opt := NewEngineOptions()
232	opt.IndexVersion = index
233	opt.Config.WALDir = filepath.Join(dir, "wal")
234	if index == InmemIndexName {
235		opt.InmemIndex, _ = NewInmemIndex(path.Base(dir), sfile)
236	}
237
238	return &TempShard{
239		Shard: NewShard(0,
240			filepath.Join(dir, "data", "db0", "rp0", "1"),
241			filepath.Join(dir, "wal", "db0", "rp0", "1"),
242			sfile,
243			opt,
244		),
245		sfile: sfile,
246		path:  dir,
247	}
248}
249
250// Close closes the shard and removes all underlying data.
251func (sh *TempShard) Close() error {
252	defer os.RemoveAll(sh.path)
253	sh.sfile.Close()
254	return sh.Shard.Close()
255}
256
257// MustWritePointsString parses the line protocol (with second precision) and
258// inserts the resulting points into the shard. Panic on error.
259func (sh *TempShard) MustWritePointsString(s string) {
260	a, err := models.ParsePointsWithPrecision([]byte(strings.TrimSpace(s)), time.Time{}, "s")
261	if err != nil {
262		panic(err)
263	}
264
265	if err := sh.WritePoints(a); err != nil {
266		panic(err)
267	}
268}
269