1package inmem 2 3import ( 4 "fmt" 5 "math/rand" 6 "strings" 7 "sync" 8 "testing" 9 "time" 10 11 "github.com/influxdata/influxdb/models" 12 "github.com/influxdata/influxdb/query" 13 "github.com/influxdata/influxdb/tsdb" 14 "github.com/influxdata/influxql" 15) 16 17// Test comparing SeriesIDs for equality. 18func TestSeriesIDs_Equals(t *testing.T) { 19 ids1 := seriesIDs([]uint64{1, 2, 3}) 20 ids2 := seriesIDs([]uint64{1, 2, 3}) 21 ids3 := seriesIDs([]uint64{4, 5, 6}) 22 23 if !ids1.Equals(ids2) { 24 t.Fatal("expected ids1 == ids2") 25 } else if ids1.Equals(ids3) { 26 t.Fatal("expected ids1 != ids3") 27 } 28} 29 30// Test intersecting sets of SeriesIDs. 31func TestSeriesIDs_Intersect(t *testing.T) { 32 // Test swapping l & r, all branches of if-else, and exit loop when 'j < len(r)' 33 ids1 := seriesIDs([]uint64{1, 3, 4, 5, 6}) 34 ids2 := seriesIDs([]uint64{1, 2, 3, 7}) 35 exp := seriesIDs([]uint64{1, 3}) 36 got := ids1.Intersect(ids2) 37 38 if !exp.Equals(got) { 39 t.Fatalf("exp=%v, got=%v", exp, got) 40 } 41 42 // Test exit for loop when 'i < len(l)' 43 ids1 = seriesIDs([]uint64{1}) 44 ids2 = seriesIDs([]uint64{1, 2}) 45 exp = seriesIDs([]uint64{1}) 46 got = ids1.Intersect(ids2) 47 48 if !exp.Equals(got) { 49 t.Fatalf("exp=%v, got=%v", exp, got) 50 } 51} 52 53// Test union sets of SeriesIDs. 54func TestSeriesIDs_Union(t *testing.T) { 55 // Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left. 56 ids1 := seriesIDs([]uint64{1, 2, 3, 7}) 57 ids2 := seriesIDs([]uint64{1, 3, 4, 5, 6}) 58 exp := seriesIDs([]uint64{1, 2, 3, 4, 5, 6, 7}) 59 got := ids1.Union(ids2) 60 61 if !exp.Equals(got) { 62 t.Fatalf("exp=%v, got=%v", exp, got) 63 } 64 65 // Test exit because of 'i < len(l)' and append remainder from right. 66 ids1 = seriesIDs([]uint64{1}) 67 ids2 = seriesIDs([]uint64{1, 2}) 68 exp = seriesIDs([]uint64{1, 2}) 69 got = ids1.Union(ids2) 70 71 if !exp.Equals(got) { 72 t.Fatalf("exp=%v, got=%v", exp, got) 73 } 74} 75 76// Test removing one set of SeriesIDs from another. 77func TestSeriesIDs_Reject(t *testing.T) { 78 // Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left. 79 ids1 := seriesIDs([]uint64{1, 2, 3, 7}) 80 ids2 := seriesIDs([]uint64{1, 3, 4, 5, 6}) 81 exp := seriesIDs([]uint64{2, 7}) 82 got := ids1.Reject(ids2) 83 84 if !exp.Equals(got) { 85 t.Fatalf("exp=%v, got=%v", exp, got) 86 } 87 88 // Test exit because of 'i < len(l)'. 89 ids1 = seriesIDs([]uint64{1}) 90 ids2 = seriesIDs([]uint64{1, 2}) 91 exp = seriesIDs{} 92 got = ids1.Reject(ids2) 93 94 if !exp.Equals(got) { 95 t.Fatalf("exp=%v, got=%v", exp, got) 96 } 97} 98 99func TestMeasurement_AddSeries_Nil(t *testing.T) { 100 m := newMeasurement("foo", "cpu") 101 if m.AddSeries(nil) { 102 t.Fatalf("AddSeries mismatch: exp false, got true") 103 } 104} 105 106func TestMeasurement_AppendSeriesKeysByID_Missing(t *testing.T) { 107 m := newMeasurement("foo", "cpu") 108 var dst []string 109 dst = m.AppendSeriesKeysByID(dst, []uint64{1}) 110 if exp, got := 0, len(dst); exp != got { 111 t.Fatalf("series len mismatch: exp %v, got %v", exp, got) 112 } 113} 114 115func TestMeasurement_AppendSeriesKeysByID_Exists(t *testing.T) { 116 m := newMeasurement("foo", "cpu") 117 s := newSeries(1, m, "cpu,host=foo", models.Tags{models.NewTag([]byte("host"), []byte("foo"))}) 118 m.AddSeries(s) 119 120 var dst []string 121 dst = m.AppendSeriesKeysByID(dst, []uint64{1}) 122 if exp, got := 1, len(dst); exp != got { 123 t.Fatalf("series len mismatch: exp %v, got %v", exp, got) 124 } 125 126 if exp, got := "cpu,host=foo", dst[0]; exp != got { 127 t.Fatalf("series mismatch: exp %v, got %v", exp, got) 128 } 129} 130 131func TestMeasurement_TagsSet_Deadlock(t *testing.T) { 132 m := newMeasurement("foo", "cpu") 133 s1 := newSeries(1, m, "cpu,host=foo", models.Tags{models.NewTag([]byte("host"), []byte("foo"))}) 134 m.AddSeries(s1) 135 136 s2 := newSeries(2, m, "cpu,host=bar", models.Tags{models.NewTag([]byte("host"), []byte("bar"))}) 137 m.AddSeries(s2) 138 139 m.DropSeries(s1) 140 141 // This was deadlocking 142 s := tsdb.NewSeriesIDSet() 143 s.Add(1) 144 m.TagSets(s, query.IteratorOptions{}) 145 if got, exp := len(m.SeriesIDs()), 1; got != exp { 146 t.Fatalf("series count mismatch: got %v, exp %v", got, exp) 147 } 148} 149 150// Ensures the tagKeyValue API contains no deadlocks or sync issues. 151func TestTagKeyValue_Concurrent(t *testing.T) { 152 var wg sync.WaitGroup 153 done := make(chan struct{}) 154 time.AfterFunc(2*time.Second, func() { close(done) }) 155 156 v := newTagKeyValue() 157 for i := 0; i < 4; i++ { 158 wg.Add(1) 159 go func(i int) { 160 defer wg.Done() 161 162 rand := rand.New(rand.NewSource(int64(i))) 163 for { 164 // Continue running until time limit. 165 select { 166 case <-done: 167 return 168 default: 169 } 170 171 // Randomly choose next API. 172 switch rand.Intn(7) { 173 case 0: 174 v.bytes() 175 case 1: 176 v.Cardinality() 177 case 2: 178 v.Contains(string([]rune{rune(rand.Intn(52) + 65)})) 179 case 3: 180 v.InsertSeriesIDByte([]byte(string([]rune{rune(rand.Intn(52) + 65)})), rand.Uint64()%1000) 181 case 4: 182 v.Load(string([]rune{rune(rand.Intn(52) + 65)})) 183 case 5: 184 v.Range(func(tagValue string, a seriesIDs) bool { 185 return rand.Intn(10) == 0 186 }) 187 case 6: 188 v.RangeAll(func(k string, a seriesIDs) {}) 189 } 190 } 191 }(i) 192 } 193 wg.Wait() 194} 195 196func BenchmarkMeasurement_SeriesIDForExp_EQRegex(b *testing.B) { 197 m := newMeasurement("foo", "cpu") 198 for i := 0; i < 100000; i++ { 199 s := newSeries(uint64(i), m, "cpu", models.Tags{models.NewTag( 200 []byte("host"), 201 []byte(fmt.Sprintf("host%d", i)))}) 202 m.AddSeries(s) 203 } 204 205 if exp, got := 100000, len(m.SeriesKeys()); exp != got { 206 b.Fatalf("series count mismatch: exp %v got %v", exp, got) 207 } 208 209 stmt, err := influxql.NewParser(strings.NewReader(`SELECT * FROM cpu WHERE host =~ /host\d+/`)).ParseStatement() 210 if err != nil { 211 b.Fatalf("invalid statement: %s", err) 212 } 213 214 selectStmt := stmt.(*influxql.SelectStatement) 215 216 b.ResetTimer() 217 for i := 0; i < b.N; i++ { 218 ids := m.IDsForExpr(selectStmt.Condition.(*influxql.BinaryExpr)) 219 if exp, got := 100000, len(ids); exp != got { 220 b.Fatalf("series count mismatch: exp %v got %v", exp, got) 221 } 222 223 } 224} 225 226func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) { 227 m := newMeasurement("foo", "cpu") 228 for i := 0; i < 100000; i++ { 229 s := newSeries(uint64(i), m, "cpu", models.Tags{models.Tag{ 230 Key: []byte("host"), 231 Value: []byte(fmt.Sprintf("host%d", i))}}) 232 m.AddSeries(s) 233 } 234 235 if exp, got := 100000, len(m.SeriesKeys()); exp != got { 236 b.Fatalf("series count mismatch: exp %v got %v", exp, got) 237 } 238 239 stmt, err := influxql.NewParser(strings.NewReader(`SELECT * FROM cpu WHERE host !~ /foo\d+/`)).ParseStatement() 240 if err != nil { 241 b.Fatalf("invalid statement: %s", err) 242 } 243 244 selectStmt := stmt.(*influxql.SelectStatement) 245 246 b.ResetTimer() 247 for i := 0; i < b.N; i++ { 248 ids := m.IDsForExpr(selectStmt.Condition.(*influxql.BinaryExpr)) 249 if exp, got := 100000, len(ids); exp != got { 250 b.Fatalf("series count mismatch: exp %v got %v", exp, got) 251 } 252 253 } 254 255} 256 257func benchmarkTagSets(b *testing.B, n int, opt query.IteratorOptions) { 258 m := newMeasurement("foo", "m") 259 ss := tsdb.NewSeriesIDSet() 260 261 for i := 0; i < n; i++ { 262 tags := map[string]string{"tag1": "value1", "tag2": "value2"} 263 s := newSeries(uint64(i), m, "m,tag1=value1,tag2=value2", models.NewTags(tags)) 264 ss.Add(uint64(i)) 265 m.AddSeries(s) 266 } 267 268 // warm caches 269 m.TagSets(ss, opt) 270 271 b.ReportAllocs() 272 b.ResetTimer() 273 for i := 0; i < b.N; i++ { 274 m.TagSets(ss, opt) 275 } 276} 277 278func BenchmarkMeasurement_TagSetsNoDimensions_1000(b *testing.B) { 279 benchmarkTagSets(b, 1000, query.IteratorOptions{}) 280} 281 282func BenchmarkMeasurement_TagSetsDimensions_1000(b *testing.B) { 283 benchmarkTagSets(b, 1000, query.IteratorOptions{Dimensions: []string{"tag1", "tag2"}}) 284} 285 286func BenchmarkMeasurement_TagSetsNoDimensions_100000(b *testing.B) { 287 benchmarkTagSets(b, 100000, query.IteratorOptions{}) 288} 289 290func BenchmarkMeasurement_TagSetsDimensions_100000(b *testing.B) { 291 benchmarkTagSets(b, 100000, query.IteratorOptions{Dimensions: []string{"tag1", "tag2"}}) 292} 293