1package tsi1_test 2 3import ( 4 "bytes" 5 "fmt" 6 "io/ioutil" 7 "math/rand" 8 "os" 9 "path/filepath" 10 "reflect" 11 "regexp" 12 "runtime/pprof" 13 "sort" 14 "testing" 15 "time" 16 17 "github.com/influxdata/influxdb/pkg/slices" 18 19 "github.com/influxdata/influxdb/models" 20 "github.com/influxdata/influxdb/pkg/bloom" 21 "github.com/influxdata/influxdb/tsdb" 22 "github.com/influxdata/influxdb/tsdb/index/tsi1" 23) 24 25// Ensure log file can append series. 26func TestLogFile_AddSeriesList(t *testing.T) { 27 sfile := MustOpenSeriesFile() 28 defer sfile.Close() 29 30 f := MustOpenLogFile(sfile.SeriesFile) 31 defer f.Close() 32 seriesSet := tsdb.NewSeriesIDSet() 33 34 // Add test data. 35 ids, err := f.AddSeriesList(seriesSet, 36 slices.StringsToBytes("cpu", "mem"), 37 []models.Tags{ 38 models.NewTags(map[string]string{"region": "us-east"}), 39 models.NewTags(map[string]string{"host": "serverA"}), 40 }, 41 ) 42 43 if err != nil { 44 t.Fatal(err) 45 } 46 47 // Returned series ids should match those in the seriesSet. 48 other := tsdb.NewSeriesIDSet(ids...) 49 if !other.Equals(seriesSet) { 50 t.Fatalf("got series ids %s, expected %s", other, seriesSet) 51 } 52 53 // Add the same series again with a new one. 54 ids, err = f.AddSeriesList(seriesSet, 55 slices.StringsToBytes("cpu", "mem"), 56 []models.Tags{ 57 models.NewTags(map[string]string{"region": "us-west"}), 58 models.NewTags(map[string]string{"host": "serverA"}), 59 }, 60 ) 61 62 if err != nil { 63 t.Fatal(err) 64 } 65 66 if got, exp := len(ids), 2; got != exp { 67 t.Fatalf("got %d series ids, expected %d", got, exp) 68 } else if got := ids[0]; got == 0 { 69 t.Error("series id was 0, expected it not to be") 70 } else if got := ids[1]; got != 0 { 71 t.Errorf("got series id %d, expected 0", got) 72 } 73 74 // Add only the same series IDs. 
75 ids, err = f.AddSeriesList(seriesSet, 76 slices.StringsToBytes("cpu", "mem"), 77 []models.Tags{ 78 models.NewTags(map[string]string{"region": "us-west"}), 79 models.NewTags(map[string]string{"host": "serverA"}), 80 }, 81 ) 82 83 if err != nil { 84 t.Fatal(err) 85 } 86 87 if got, exp := ids, make([]uint64, 2); !reflect.DeepEqual(got, exp) { 88 t.Fatalf("got ids %v, expected %v", got, exp) 89 } 90 91 // Verify data. 92 itr := f.MeasurementIterator() 93 if e := itr.Next(); e == nil || string(e.Name()) != "cpu" { 94 t.Fatalf("unexpected measurement: %#v", e) 95 } else if e := itr.Next(); e == nil || string(e.Name()) != "mem" { 96 t.Fatalf("unexpected measurement: %#v", e) 97 } else if e := itr.Next(); e != nil { 98 t.Fatalf("expected eof, got: %#v", e) 99 } 100 101 // Reopen file and re-verify. 102 if err := f.Reopen(); err != nil { 103 t.Fatal(err) 104 } 105 106 // Verify data. 107 itr = f.MeasurementIterator() 108 if e := itr.Next(); e == nil || string(e.Name()) != "cpu" { 109 t.Fatalf("unexpected measurement: %#v", e) 110 } else if e := itr.Next(); e == nil || string(e.Name()) != "mem" { 111 t.Fatalf("unexpected measurement: %#v", e) 112 } else if e := itr.Next(); e != nil { 113 t.Fatalf("expected eof, got: %#v", e) 114 } 115} 116 117func TestLogFile_SeriesStoredInOrder(t *testing.T) { 118 sfile := MustOpenSeriesFile() 119 defer sfile.Close() 120 121 f := MustOpenLogFile(sfile.SeriesFile) 122 defer f.Close() 123 seriesSet := tsdb.NewSeriesIDSet() 124 125 // Generate and add test data 126 tvm := make(map[string]struct{}) 127 rand.Seed(time.Now().Unix()) 128 for i := 0; i < 100; i++ { 129 tv := fmt.Sprintf("server-%d", rand.Intn(50)) // Encourage adding duplicate series. 
130 tvm[tv] = struct{}{} 131 132 if _, err := f.AddSeriesList(seriesSet, [][]byte{ 133 []byte("mem"), 134 []byte("cpu"), 135 }, []models.Tags{ 136 {models.NewTag([]byte("host"), []byte(tv))}, 137 {models.NewTag([]byte("host"), []byte(tv))}, 138 }); err != nil { 139 t.Fatal(err) 140 } 141 } 142 143 // Sort the tag values so we know what order to expect. 144 tvs := make([]string, 0, len(tvm)) 145 for tv := range tvm { 146 tvs = append(tvs, tv) 147 } 148 sort.Strings(tvs) 149 150 // Double the series values since we're adding them twice (two measurements) 151 tvs = append(tvs, tvs...) 152 153 // When we pull the series out via an iterator they should be in order. 154 itr := f.SeriesIDIterator() 155 if itr == nil { 156 t.Fatal("nil iterator") 157 } 158 159 var prevSeriesID uint64 160 for i := 0; i < len(tvs); i++ { 161 elem, err := itr.Next() 162 if err != nil { 163 t.Fatal(err) 164 } else if elem.SeriesID == 0 { 165 t.Fatal("got nil series") 166 } else if elem.SeriesID < prevSeriesID { 167 t.Fatalf("series out of order: %d !< %d ", elem.SeriesID, prevSeriesID) 168 } 169 prevSeriesID = elem.SeriesID 170 } 171} 172 173// Ensure log file can delete an existing measurement. 174func TestLogFile_DeleteMeasurement(t *testing.T) { 175 sfile := MustOpenSeriesFile() 176 defer sfile.Close() 177 178 f := MustOpenLogFile(sfile.SeriesFile) 179 defer f.Close() 180 seriesSet := tsdb.NewSeriesIDSet() 181 182 // Add test data. 183 if _, err := f.AddSeriesList(seriesSet, [][]byte{ 184 []byte("mem"), 185 []byte("cpu"), 186 []byte("cpu"), 187 }, []models.Tags{ 188 {{Key: []byte("host"), Value: []byte("serverA")}}, 189 {{Key: []byte("region"), Value: []byte("us-east")}}, 190 {{Key: []byte("region"), Value: []byte("us-west")}}, 191 }); err != nil { 192 t.Fatal(err) 193 } 194 195 // Remove measurement. 196 if err := f.DeleteMeasurement([]byte("cpu")); err != nil { 197 t.Fatal(err) 198 } 199 200 // Verify data. 
201 itr := f.MeasurementIterator() 202 if e := itr.Next(); string(e.Name()) != "cpu" || !e.Deleted() { 203 t.Fatalf("unexpected measurement: %s/%v", e.Name(), e.Deleted()) 204 } else if e := itr.Next(); string(e.Name()) != "mem" || e.Deleted() { 205 t.Fatalf("unexpected measurement: %s/%v", e.Name(), e.Deleted()) 206 } else if e := itr.Next(); e != nil { 207 t.Fatalf("expected eof, got: %#v", e) 208 } 209} 210 211// Ensure log file can recover correctly. 212func TestLogFile_Open(t *testing.T) { 213 t.Run("Truncate", func(t *testing.T) { 214 sfile := MustOpenSeriesFile() 215 defer sfile.Close() 216 seriesSet := tsdb.NewSeriesIDSet() 217 218 f := MustOpenLogFile(sfile.SeriesFile) 219 defer f.Close() 220 221 // Add test data & close. 222 if _, err := f.AddSeriesList(seriesSet, [][]byte{[]byte("cpu"), []byte("mem")}, []models.Tags{{{}}, {{}}}); err != nil { 223 t.Fatal(err) 224 } else if err := f.LogFile.Close(); err != nil { 225 t.Fatal(err) 226 } 227 228 // Truncate data & reopen. 229 if fi, err := os.Stat(f.LogFile.Path()); err != nil { 230 t.Fatal(err) 231 } else if err := os.Truncate(f.LogFile.Path(), fi.Size()-1); err != nil { 232 t.Fatal(err) 233 } else if err := f.LogFile.Open(); err != nil { 234 t.Fatal(err) 235 } 236 237 // Verify data. 238 itr := f.SeriesIDIterator() 239 if elem, err := itr.Next(); err != nil { 240 t.Fatal(err) 241 } else if name, tags := sfile.Series(elem.SeriesID); string(name) != `cpu` { 242 t.Fatalf("unexpected series: %s,%s", name, tags.String()) 243 } else if elem, err := itr.Next(); err != nil { 244 t.Fatal(err) 245 } else if elem.SeriesID != 0 { 246 t.Fatalf("expected eof, got: %#v", elem) 247 } 248 249 // Add more data & reopen. 250 if _, err := f.AddSeriesList(seriesSet, [][]byte{[]byte("disk")}, []models.Tags{{{}}}); err != nil { 251 t.Fatal(err) 252 } else if err := f.Reopen(); err != nil { 253 t.Fatal(err) 254 } 255 256 // Verify new data. 
257 itr = f.SeriesIDIterator() 258 if elem, err := itr.Next(); err != nil { 259 t.Fatal(err) 260 } else if name, tags := sfile.Series(elem.SeriesID); string(name) != `cpu` { 261 t.Fatalf("unexpected series: %s,%s", name, tags.String()) 262 } else if elem, err := itr.Next(); err != nil { 263 t.Fatal(err) 264 } else if name, tags := sfile.Series(elem.SeriesID); string(name) != `disk` { 265 t.Fatalf("unexpected series: %s,%s", name, tags.String()) 266 } else if elem, err := itr.Next(); err != nil { 267 t.Fatal(err) 268 } else if elem.SeriesID != 0 { 269 t.Fatalf("expected eof, got: %#v", elem) 270 } 271 }) 272 273 t.Run("ChecksumMismatch", func(t *testing.T) { 274 sfile := MustOpenSeriesFile() 275 defer sfile.Close() 276 seriesSet := tsdb.NewSeriesIDSet() 277 278 f := MustOpenLogFile(sfile.SeriesFile) 279 defer f.Close() 280 281 // Add test data & close. 282 if _, err := f.AddSeriesList(seriesSet, [][]byte{[]byte("cpu"), []byte("mem")}, []models.Tags{{{}}, {{}}}); err != nil { 283 t.Fatal(err) 284 } else if err := f.LogFile.Close(); err != nil { 285 t.Fatal(err) 286 } 287 288 // Corrupt last entry. 289 buf, err := ioutil.ReadFile(f.LogFile.Path()) 290 if err != nil { 291 t.Fatal(err) 292 } 293 buf[len(buf)-1] = 0 294 295 // Overwrite file with corrupt entry and reopen. 296 if err := ioutil.WriteFile(f.LogFile.Path(), buf, 0666); err != nil { 297 t.Fatal(err) 298 } else if err := f.LogFile.Open(); err != nil { 299 t.Fatal(err) 300 } 301 302 // Verify data. 303 itr := f.SeriesIDIterator() 304 if elem, err := itr.Next(); err != nil { 305 t.Fatal(err) 306 } else if name, tags := sfile.Series(elem.SeriesID); string(name) != `cpu` { 307 t.Fatalf("unexpected series: %s,%s", name, tags.String()) 308 } else if elem, err := itr.Next(); err != nil { 309 t.Fatal(err) 310 } else if elem.SeriesID != 0 { 311 t.Fatalf("expected eof, got: %#v", elem) 312 } 313 }) 314} 315 316// LogFile is a test wrapper for tsi1.LogFile. 
317type LogFile struct { 318 *tsi1.LogFile 319} 320 321// NewLogFile returns a new instance of LogFile with a temporary file path. 322func NewLogFile(sfile *tsdb.SeriesFile) *LogFile { 323 file, err := ioutil.TempFile("", "tsi1-log-file-") 324 if err != nil { 325 panic(err) 326 } 327 file.Close() 328 329 return &LogFile{LogFile: tsi1.NewLogFile(sfile, file.Name())} 330} 331 332// MustOpenLogFile returns a new, open instance of LogFile. Panic on error. 333func MustOpenLogFile(sfile *tsdb.SeriesFile) *LogFile { 334 f := NewLogFile(sfile) 335 if err := f.Open(); err != nil { 336 panic(err) 337 } 338 return f 339} 340 341// Close closes the log file and removes it from disk. 342func (f *LogFile) Close() error { 343 defer os.Remove(f.Path()) 344 return f.LogFile.Close() 345} 346 347// Reopen closes and reopens the file. 348func (f *LogFile) Reopen() error { 349 if err := f.LogFile.Close(); err != nil { 350 return err 351 } 352 if err := f.LogFile.Open(); err != nil { 353 return err 354 } 355 return nil 356} 357 358// CreateLogFile creates a new temporary log file and adds a list of series. 359func CreateLogFile(sfile *tsdb.SeriesFile, series []Series) (*LogFile, error) { 360 f := MustOpenLogFile(sfile) 361 seriesSet := tsdb.NewSeriesIDSet() 362 for _, serie := range series { 363 if _, err := f.AddSeriesList(seriesSet, [][]byte{serie.Name}, []models.Tags{serie.Tags}); err != nil { 364 return nil, err 365 } 366 } 367 return f, nil 368} 369 370// GenerateLogFile generates a log file from a set of series based on the count arguments. 371// Total series returned will equal measurementN * tagN * valueN. 372func GenerateLogFile(sfile *tsdb.SeriesFile, measurementN, tagN, valueN int) (*LogFile, error) { 373 tagValueN := pow(valueN, tagN) 374 375 f := MustOpenLogFile(sfile) 376 seriesSet := tsdb.NewSeriesIDSet() 377 for i := 0; i < measurementN; i++ { 378 name := []byte(fmt.Sprintf("measurement%d", i)) 379 380 // Generate tag sets. 
381 for j := 0; j < tagValueN; j++ { 382 var tags models.Tags 383 for k := 0; k < tagN; k++ { 384 key := []byte(fmt.Sprintf("key%d", k)) 385 value := []byte(fmt.Sprintf("value%d", (j / pow(valueN, k) % valueN))) 386 tags = append(tags, models.NewTag(key, value)) 387 } 388 if _, err := f.AddSeriesList(seriesSet, [][]byte{name}, []models.Tags{tags}); err != nil { 389 return nil, err 390 } 391 } 392 } 393 return f, nil 394} 395 396func benchmarkLogFile_AddSeries(b *testing.B, measurementN, seriesKeyN, seriesValueN int) { 397 sfile := MustOpenSeriesFile() 398 defer sfile.Close() 399 400 b.StopTimer() 401 f := MustOpenLogFile(sfile.SeriesFile) 402 seriesSet := tsdb.NewSeriesIDSet() 403 404 type Datum struct { 405 Name []byte 406 Tags models.Tags 407 } 408 409 // Pre-generate everything. 410 var ( 411 data []Datum 412 series int 413 ) 414 415 tagValueN := pow(seriesValueN, seriesKeyN) 416 417 for i := 0; i < measurementN; i++ { 418 name := []byte(fmt.Sprintf("measurement%d", i)) 419 for j := 0; j < tagValueN; j++ { 420 var tags models.Tags 421 for k := 0; k < seriesKeyN; k++ { 422 key := []byte(fmt.Sprintf("key%d", k)) 423 value := []byte(fmt.Sprintf("value%d", (j / pow(seriesValueN, k) % seriesValueN))) 424 tags = append(tags, models.NewTag(key, value)) 425 } 426 data = append(data, Datum{Name: name, Tags: tags}) 427 series += len(tags) 428 } 429 } 430 b.StartTimer() 431 b.ResetTimer() 432 433 for i := 0; i < b.N; i++ { 434 for _, d := range data { 435 if _, err := f.AddSeriesList(seriesSet, [][]byte{d.Name}, []models.Tags{d.Tags}); err != nil { 436 b.Fatal(err) 437 } 438 } 439 } 440} 441 442func BenchmarkLogFile_AddSeries_100_1_1(b *testing.B) { benchmarkLogFile_AddSeries(b, 100, 1, 1) } // 100 series 443func BenchmarkLogFile_AddSeries_1000_1_1(b *testing.B) { benchmarkLogFile_AddSeries(b, 1000, 1, 1) } // 1000 series 444func BenchmarkLogFile_AddSeries_10000_1_1(b *testing.B) { benchmarkLogFile_AddSeries(b, 10000, 1, 1) } // 10000 series 445func 
BenchmarkLogFile_AddSeries_100_2_10(b *testing.B) { benchmarkLogFile_AddSeries(b, 100, 2, 10) } // ~20K series 446func BenchmarkLogFile_AddSeries_100000_1_1(b *testing.B) { benchmarkLogFile_AddSeries(b, 100000, 1, 1) } // ~100K series 447func BenchmarkLogFile_AddSeries_100_3_7(b *testing.B) { benchmarkLogFile_AddSeries(b, 100, 3, 7) } // ~100K series 448func BenchmarkLogFile_AddSeries_200_3_7(b *testing.B) { benchmarkLogFile_AddSeries(b, 200, 3, 7) } // ~200K series 449func BenchmarkLogFile_AddSeries_200_4_7(b *testing.B) { benchmarkLogFile_AddSeries(b, 200, 4, 7) } // ~1.9M series 450 451func BenchmarkLogFile_WriteTo(b *testing.B) { 452 for _, seriesN := range []int{1000, 10000, 100000, 1000000} { 453 name := fmt.Sprintf("series=%d", seriesN) 454 b.Run(name, func(b *testing.B) { 455 sfile := MustOpenSeriesFile() 456 defer sfile.Close() 457 458 f := MustOpenLogFile(sfile.SeriesFile) 459 defer f.Close() 460 seriesSet := tsdb.NewSeriesIDSet() 461 462 // Estimate bloom filter size. 463 m, k := bloom.Estimate(uint64(seriesN), 0.02) 464 465 // Initialize log file with series data. 466 for i := 0; i < seriesN; i++ { 467 if _, err := f.AddSeriesList( 468 seriesSet, 469 [][]byte{[]byte("cpu")}, 470 []models.Tags{{ 471 {Key: []byte("host"), Value: []byte(fmt.Sprintf("server-%d", i))}, 472 {Key: []byte("location"), Value: []byte("us-west")}, 473 }}, 474 ); err != nil { 475 b.Fatal(err) 476 } 477 } 478 b.ResetTimer() 479 480 // Create cpu profile for each subtest. 481 MustStartCPUProfile(name) 482 defer pprof.StopCPUProfile() 483 484 // Compact log file. 485 for i := 0; i < b.N; i++ { 486 buf := bytes.NewBuffer(make([]byte, 0, 150*seriesN)) 487 if _, err := f.CompactTo(buf, m, k, nil); err != nil { 488 b.Fatal(err) 489 } 490 b.Logf("sz=%db", buf.Len()) 491 } 492 }) 493 } 494} 495 496// MustStartCPUProfile starts a cpu profile in a temporary path based on name. 
func MustStartCPUProfile(name string) {
	// Sanitize the name so it is safe to use in a filename.
	name = regexp.MustCompile(`\W+`).ReplaceAllString(name, "-")

	// Open file and start pprof. Use os.TempDir() instead of a hard-coded
	// "/tmp" so the helper also works on platforms without that directory
	// (and honors TMPDIR). Panics on any error; callers must stop the
	// profile themselves via pprof.StopCPUProfile.
	f, err := os.Create(filepath.Join(os.TempDir(), fmt.Sprintf("cpu-%s.pprof", name)))
	if err != nil {
		panic(err)
	}
	if err := pprof.StartCPUProfile(f); err != nil {
		panic(err)
	}
}