1package tsdb 2 3import ( 4 "fmt" 5 "io/ioutil" 6 "os" 7 "path" 8 "path/filepath" 9 "regexp" 10 "sort" 11 "strings" 12 "testing" 13 "time" 14 15 "github.com/google/go-cmp/cmp" 16 "github.com/google/go-cmp/cmp/cmpopts" 17 "github.com/influxdata/influxdb/logger" 18 "github.com/influxdata/influxdb/models" 19 "github.com/influxdata/influxql" 20) 21 22func TestShard_MapType(t *testing.T) { 23 var sh *TempShard 24 25 setup := func(index string) { 26 sh = NewTempShard(index) 27 28 if err := sh.Open(); err != nil { 29 t.Fatal(err) 30 } 31 32 sh.MustWritePointsString(` 33cpu,host=serverA,region=uswest value=100 0 34cpu,host=serverA,region=uswest value=50,val2=5 10 35cpu,host=serverB,region=uswest value=25 0 36mem,host=serverA value=25i 0 37mem,host=serverB value=50i,val3=t 10 38_reserved,region=uswest value="foo" 0 39`) 40 } 41 42 for _, index := range RegisteredIndexes() { 43 setup(index) 44 for _, tt := range []struct { 45 measurement string 46 field string 47 typ influxql.DataType 48 }{ 49 { 50 measurement: "cpu", 51 field: "value", 52 typ: influxql.Float, 53 }, 54 { 55 measurement: "cpu", 56 field: "host", 57 typ: influxql.Tag, 58 }, 59 { 60 measurement: "cpu", 61 field: "region", 62 typ: influxql.Tag, 63 }, 64 { 65 measurement: "cpu", 66 field: "val2", 67 typ: influxql.Float, 68 }, 69 { 70 measurement: "cpu", 71 field: "unknown", 72 typ: influxql.Unknown, 73 }, 74 { 75 measurement: "mem", 76 field: "value", 77 typ: influxql.Integer, 78 }, 79 { 80 measurement: "mem", 81 field: "val3", 82 typ: influxql.Boolean, 83 }, 84 { 85 measurement: "mem", 86 field: "host", 87 typ: influxql.Tag, 88 }, 89 { 90 measurement: "unknown", 91 field: "unknown", 92 typ: influxql.Unknown, 93 }, 94 { 95 measurement: "_fieldKeys", 96 field: "fieldKey", 97 typ: influxql.String, 98 }, 99 { 100 measurement: "_fieldKeys", 101 field: "fieldType", 102 typ: influxql.String, 103 }, 104 { 105 measurement: "_fieldKeys", 106 field: "unknown", 107 typ: influxql.Unknown, 108 }, 109 { 110 measurement: "_series", 111 field: "key", 112 typ: influxql.String, 113 }, 114 { 115 measurement: "_series", 116 field: "unknown", 117 typ: influxql.Unknown, 118 }, 119 { 120 measurement: "_tagKeys", 121 field: "tagKey", 122 typ: influxql.String, 123 }, 124 { 125 measurement: "_tagKeys", 126 field: "unknown", 127 typ: influxql.Unknown, 128 }, 129 { 130 measurement: "_reserved", 131 field: "value", 132 typ: influxql.String, 133 }, 134 { 135 measurement: "_reserved", 136 field: "region", 137 typ: influxql.Tag, 138 }, 139 } { 140 name := fmt.Sprintf("%s_%s_%s", index, tt.measurement, tt.field) 141 t.Run(name, func(t *testing.T) { 142 typ, err := sh.mapType(tt.measurement, tt.field) 143 if err != nil { 144 t.Fatal(err) 145 } 146 147 if have, want := typ, tt.typ; have != want { 148 t.Errorf("unexpected data type: have=%#v want=%#v", have, want) 149 } 150 }) 151 } 152 sh.Close() 153 } 154} 155 156func TestShard_MeasurementsByRegex(t *testing.T) { 157 var sh *TempShard 158 setup := func(index string) { 159 sh = NewTempShard(index) 160 if err := sh.Open(); err != nil { 161 t.Fatal(err) 162 } 163 164 sh.MustWritePointsString(` 165cpu,host=serverA,region=uswest value=100 0 166cpu,host=serverA,region=uswest value=50,val2=5 10 167cpu,host=serverB,region=uswest value=25 0 168mem,host=serverA value=25i 0 169mem,host=serverB value=50i,val3=t 10 170`) 171 } 172 173 for _, index := range RegisteredIndexes() { 174 setup(index) 175 for _, tt := range []struct { 176 regex string 177 measurements []string 178 }{ 179 {regex: `cpu`, measurements: []string{"cpu"}}, 180 {regex: `mem`, measurements: []string{"mem"}}, 181 {regex: `cpu|mem`, measurements: []string{"cpu", "mem"}}, 182 {regex: `gpu`, measurements: []string{}}, 183 {regex: `pu`, measurements: []string{"cpu"}}, 184 {regex: `p|m`, measurements: []string{"cpu", "mem"}}, 185 } { 186 t.Run(index+"_"+tt.regex, func(t *testing.T) { 187 re := regexp.MustCompile(tt.regex) 188 measurements, err := sh.MeasurementNamesByRegex(re) 189 if err != nil { 190 t.Fatal(err) 191 } 192 193 mstrings := make([]string, 0, len(measurements)) 194 for _, name := range measurements { 195 mstrings = append(mstrings, string(name)) 196 } 197 sort.Strings(mstrings) 198 if diff := cmp.Diff(tt.measurements, mstrings, cmpopts.EquateEmpty()); diff != "" { 199 t.Errorf("unexpected measurements:\n%s", diff) 200 } 201 }) 202 } 203 sh.Close() 204 } 205} 206 207// TempShard represents a test wrapper for Shard that uses temporary 208// filesystem paths. 209type TempShard struct { 210 *Shard 211 path string 212 sfile *SeriesFile 213} 214 215// NewTempShard returns a new instance of TempShard with temp paths. 216func NewTempShard(index string) *TempShard { 217 // Create temporary path for data and WAL. 218 dir, err := ioutil.TempDir("", "influxdb-tsdb-") 219 if err != nil { 220 panic(err) 221 } 222 223 // Create series file. 224 sfile := NewSeriesFile(filepath.Join(dir, "db0", SeriesFileDirectory)) 225 sfile.Logger = logger.New(os.Stdout) 226 if err := sfile.Open(); err != nil { 227 panic(err) 228 } 229 230 // Build engine options. 231 opt := NewEngineOptions() 232 opt.IndexVersion = index 233 opt.Config.WALDir = filepath.Join(dir, "wal") 234 if index == InmemIndexName { 235 opt.InmemIndex, _ = NewInmemIndex(path.Base(dir), sfile) 236 } 237 238 return &TempShard{ 239 Shard: NewShard(0, 240 filepath.Join(dir, "data", "db0", "rp0", "1"), 241 filepath.Join(dir, "wal", "db0", "rp0", "1"), 242 sfile, 243 opt, 244 ), 245 sfile: sfile, 246 path: dir, 247 } 248} 249 250// Close closes the shard and removes all underlying data. 251func (sh *TempShard) Close() error { 252 defer os.RemoveAll(sh.path) 253 sh.sfile.Close() 254 return sh.Shard.Close() 255} 256 257// MustWritePointsString parses the line protocol (with second precision) and 258// inserts the resulting points into the shard. Panic on error. 259func (sh *TempShard) MustWritePointsString(s string) { 260 a, err := models.ParsePointsWithPrecision([]byte(strings.TrimSpace(s)), time.Time{}, "s") 261 if err != nil { 262 panic(err) 263 } 264 265 if err := sh.WritePoints(a); err != nil { 266 panic(err) 267 } 268} 269