1package tsm1_test 2 3import ( 4 "fmt" 5 "io" 6 "os" 7 "path/filepath" 8 "reflect" 9 "sort" 10 "testing" 11 12 "github.com/influxdata/influxdb/tsdb/engine/tsm1" 13) 14 15func TestDigest_None(t *testing.T) { 16 dir := MustTempDir() 17 dataDir := filepath.Join(dir, "data") 18 if err := os.Mkdir(dataDir, 0755); err != nil { 19 t.Fatalf("create data dir: %v", err) 20 } 21 22 df := MustTempFile(dir) 23 24 files := []string{} 25 if err := tsm1.Digest(dir, files, df); err != nil { 26 t.Fatalf("digest error: %v", err) 27 } 28 29 df, err := os.Open(df.Name()) 30 if err != nil { 31 t.Fatalf("open error: %v", err) 32 } 33 34 r, err := tsm1.NewDigestReader(df) 35 if err != nil { 36 t.Fatalf("NewDigestReader error: %v", err) 37 } 38 defer r.Close() 39 40 mfest, err := r.ReadManifest() 41 if err != nil { 42 t.Fatal(err) 43 } 44 45 if len(mfest.Entries) != 0 { 46 t.Fatalf("exp: 0, got: %d", len(mfest.Entries)) 47 } 48 49 var count int 50 for { 51 _, _, err := r.ReadTimeSpan() 52 if err == io.EOF { 53 break 54 } 55 56 count++ 57 } 58 59 if got, exp := count, 0; got != exp { 60 t.Fatalf("count mismatch: got %v, exp %v", got, exp) 61 } 62} 63 64func TestDigest_One(t *testing.T) { 65 dir := MustTempDir() 66 dataDir := filepath.Join(dir, "data") 67 if err := os.Mkdir(dataDir, 0755); err != nil { 68 t.Fatalf("create data dir: %v", err) 69 } 70 71 a1 := tsm1.NewValue(1, 1.1) 72 writes := map[string][]tsm1.Value{ 73 "cpu,host=A#!~#value": []tsm1.Value{a1}, 74 } 75 MustWriteTSM(dir, 1, writes) 76 77 files, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("*.%s", tsm1.TSMFileExtension))) 78 if err != nil { 79 t.Fatal(err) 80 } 81 82 df := MustTempFile(dir) 83 84 if err := tsm1.Digest(dir, files, df); err != nil { 85 t.Fatalf("digest error: %v", err) 86 } 87 88 df, err = os.Open(df.Name()) 89 if err != nil { 90 t.Fatalf("open error: %v", err) 91 } 92 93 r, err := tsm1.NewDigestReader(df) 94 if err != nil { 95 t.Fatalf("NewDigestReader error: %v", err) 96 } 97 defer r.Close() 98 99 mfest, err := r.ReadManifest() 100 if err != nil { 101 t.Fatal(err) 102 } 103 104 if len(mfest.Entries) != 1 { 105 t.Fatalf("exp: 1, got: %d", len(mfest.Entries)) 106 } 107 108 var count int 109 for { 110 key, _, err := r.ReadTimeSpan() 111 if err == io.EOF { 112 break 113 } 114 115 if got, exp := key, "cpu,host=A#!~#value"; got != exp { 116 t.Fatalf("key mismatch: got %v, exp %v", got, exp) 117 } 118 119 count++ 120 } 121 122 if got, exp := count, 1; got != exp { 123 t.Fatalf("count mismatch: got %v, exp %v", got, exp) 124 } 125} 126 127func TestDigest_TimeFilter(t *testing.T) { 128 dir := MustTempDir() 129 dataDir := filepath.Join(dir, "data") 130 if err := os.Mkdir(dataDir, 0755); err != nil { 131 t.Fatalf("create data dir: %v", err) 132 } 133 134 a1 := tsm1.NewValue(1, 1.1) 135 writes := map[string][]tsm1.Value{ 136 "cpu,host=A#!~#value": []tsm1.Value{a1}, 137 } 138 MustWriteTSM(dir, 1, writes) 139 140 a2 := tsm1.NewValue(2, 2.1) 141 writes = map[string][]tsm1.Value{ 142 "cpu,host=A#!~#value": []tsm1.Value{a2}, 143 } 144 MustWriteTSM(dir, 2, writes) 145 146 a3 := tsm1.NewValue(3, 3.1) 147 writes = map[string][]tsm1.Value{ 148 "cpu,host=A#!~#value": []tsm1.Value{a3}, 149 } 150 MustWriteTSM(dir, 3, writes) 151 152 files, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("*.%s", tsm1.TSMFileExtension))) 153 if err != nil { 154 t.Fatal(err) 155 } 156 157 df := MustTempFile(dir) 158 159 if err := tsm1.DigestWithOptions(dir, files, tsm1.DigestOptions{MinTime: 2, MaxTime: 2}, df); err != nil { 160 t.Fatalf("digest error: %v", err) 161 } 162 163 df, err = os.Open(df.Name()) 164 if err != nil { 165 t.Fatalf("open error: %v", err) 166 } 167 168 r, err := tsm1.NewDigestReader(df) 169 if err != nil { 170 t.Fatalf("NewDigestReader error: %v", err) 171 } 172 defer r.Close() 173 174 mfest, err := r.ReadManifest() 175 if err != nil { 176 t.Fatal(err) 177 } 178 179 if len(mfest.Entries) != 3 { 180 t.Fatalf("exp: 3, got: %d", len(mfest.Entries)) 181 } 182 183 var count int 184 for { 185 key, ts, err := r.ReadTimeSpan() 186 if err == io.EOF { 187 break 188 } 189 190 if got, exp := key, "cpu,host=A#!~#value"; got != exp { 191 t.Fatalf("key mismatch: got %v, exp %v", got, exp) 192 } 193 194 for _, tr := range ts.Ranges { 195 if got, exp := tr.Max, int64(2); got != exp { 196 t.Fatalf("min time not filtered: got %v, exp %v", got, exp) 197 } 198 } 199 200 count++ 201 } 202 203 if got, exp := count, 1; got != exp { 204 t.Fatalf("count mismatch: got %v, exp %v", got, exp) 205 } 206} 207 208func TestDigest_KeyFilter(t *testing.T) { 209 dir := MustTempDir() 210 dataDir := filepath.Join(dir, "data") 211 if err := os.Mkdir(dataDir, 0755); err != nil { 212 t.Fatalf("create data dir: %v", err) 213 } 214 215 a1 := tsm1.NewValue(1, 1.1) 216 writes := map[string][]tsm1.Value{ 217 "cpu,host=A#!~#value": []tsm1.Value{a1}, 218 } 219 MustWriteTSM(dir, 1, writes) 220 221 a2 := tsm1.NewValue(2, 2.1) 222 writes = map[string][]tsm1.Value{ 223 "cpu,host=B#!~#value": []tsm1.Value{a2}, 224 } 225 MustWriteTSM(dir, 2, writes) 226 227 a3 := tsm1.NewValue(3, 3.1) 228 writes = map[string][]tsm1.Value{ 229 "cpu,host=C#!~#value": []tsm1.Value{a3}, 230 } 231 MustWriteTSM(dir, 3, writes) 232 233 files, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("*.%s", tsm1.TSMFileExtension))) 234 if err != nil { 235 t.Fatal(err) 236 } 237 238 df := MustTempFile(dir) 239 240 if err := tsm1.DigestWithOptions(dir, files, tsm1.DigestOptions{ 241 MinKey: []byte("cpu,host=B#!~#value"), 242 MaxKey: []byte("cpu,host=B#!~#value")}, df); err != nil { 243 t.Fatalf("digest error: %v", err) 244 } 245 246 df, err = os.Open(df.Name()) 247 if err != nil { 248 t.Fatalf("open error: %v", err) 249 } 250 251 r, err := tsm1.NewDigestReader(df) 252 if err != nil { 253 t.Fatalf("NewDigestReader error: %v", err) 254 } 255 defer r.Close() 256 257 mfest, err := r.ReadManifest() 258 if err != nil { 259 t.Fatal(err) 260 } 261 262 if len(mfest.Entries) != 3 { 263 t.Fatalf("exp: 3, got: %d", len(mfest.Entries)) 264 } 265 266 var count int 267 for { 268 key, _, err := r.ReadTimeSpan() 269 if err == io.EOF { 270 break 271 } 272 273 if got, exp := key, "cpu,host=B#!~#value"; got != exp { 274 t.Fatalf("key mismatch: got %v, exp %v", got, exp) 275 } 276 277 count++ 278 } 279 280 if got, exp := count, 1; got != exp { 281 t.Fatalf("count mismatch: got %v, exp %v", got, exp) 282 } 283} 284 285func TestDigest_Manifest(t *testing.T) { 286 // Create temp directory to hold test files. 287 dir := MustTempDir() 288 defer os.RemoveAll(dir) 289 290 digestFile := filepath.Join(dir, tsm1.DigestFilename) 291 292 // Create a point to write to the tsm files. 293 a1 := tsm1.NewValue(1, 1.1) 294 writes := map[string][]tsm1.Value{ 295 "cpu,host=A#!~#value": []tsm1.Value{a1}, 296 } 297 298 // Write a few tsm files. 299 var files []string 300 gen := 1 301 for ; gen < 4; gen++ { 302 name := MustWriteTSM(dir, gen, writes) 303 files = append(files, name) 304 } 305 306 // Generate a manifest. 307 mfest, err := tsm1.NewDigestManifest(dir, files) 308 if err != nil { 309 t.Fatal(err) 310 } 311 312 // Make sure manifest contains only the expected files. 313 var got []string 314 for _, e := range mfest.Entries { 315 got = append(got, e.Filename) 316 } 317 318 sort.StringSlice(files).Sort() 319 sort.StringSlice(got).Sort() 320 321 if !reflect.DeepEqual(files, got) { 322 t.Fatalf("exp: %v, got: %v", files, got) 323 } 324 325 // Write a digest of the files. 326 df := MustCreate(digestFile) 327 if err := tsm1.Digest(dir, files, df); err != nil { 328 t.Fatalf("digest error: %v", err) 329 } 330 331 // Helper func to read manifest from a digest. 332 readManifest := func(name string) *tsm1.DigestManifest { 333 t.Helper() 334 335 df, err = os.Open(df.Name()) 336 if err != nil { 337 t.Fatal(err) 338 } 339 340 r, err := tsm1.NewDigestReader(df) 341 if err != nil { 342 t.Fatal(err) 343 } 344 345 mfest, err := r.ReadManifest() 346 if err != nil { 347 t.Fatal(err) 348 } 349 350 if err := r.Close(); err != nil { 351 t.Fatal(err) 352 } 353 354 return mfest 355 } 356 357 // Read the manifest from the digest. 358 mfest2 := readManifest(df.Name()) 359 360 // Make sure the manifest read from the digest on disk is correct. 361 if !reflect.DeepEqual(mfest, mfest2) { 362 t.Fatalf("invalid manifest:\nexp: %v\ngot: %v", mfest, mfest2) 363 } 364 365 // Write an extra tsm file that shouldn't be included in the manifest. 366 extra := MustWriteTSM(dir, gen, writes) 367 368 // Re-generate manifest. 369 mfest, err = tsm1.NewDigestManifest(dir, files) 370 if err != nil { 371 t.Fatal(err) 372 } 373 374 // Make sure manifest contains only the expected files. 375 got = got[:0] 376 for _, e := range mfest.Entries { 377 if e.Filename == extra { 378 t.Fatal("extra file in shard directory should not be in digest manifest") 379 } 380 got = append(got, e.Filename) 381 } 382 383 sort.StringSlice(got).Sort() 384 385 if !reflect.DeepEqual(files, got) { 386 t.Fatalf("exp: %v, got: %v", files, got) 387 } 388 389 // Re-generate digest and make sure it does not include the extra tsm file. 390 df = MustCreate(digestFile) 391 if err := tsm1.Digest(dir, files, df); err != nil { 392 t.Fatalf("digest error: %v", err) 393 } 394 395 // Read the manifest from the new digest. 396 mfest2 = readManifest(df.Name()) 397 398 // Make sure the manifest read from the digest on disk is correct. 399 if !reflect.DeepEqual(mfest, mfest2) { 400 t.Fatalf("invalid manifest:\nexp: %v\ngot: %v", mfest, mfest2) 401 } 402 403 // Make sure the digest is fresh. 404 digest, err := os.Stat(df.Name()) 405 if err != nil { 406 t.Fatal(err) 407 } 408 409 fresh, reason := tsm1.DigestFresh(dir, files, digest.ModTime()) 410 if !fresh { 411 t.Fatalf("digest is stale: reason=%s", reason) 412 } 413 414 // Test that digest is stale if shard time is newer than digest time. 415 fresh, _ = tsm1.DigestFresh(dir, files, digest.ModTime().Add(1)) 416 if fresh { 417 t.Fatalf("digest is fresh") 418 } 419 420 // Test that digest is stale if a new tsm file has been written by the engine. 421 allfiles := append(files, extra) 422 fresh, _ = tsm1.DigestFresh(dir, allfiles, digest.ModTime()) 423 if fresh { 424 t.Fatalf("digest is fresh") 425 } 426 427 // Open one of the tsm files and write data to it. 428 f, err := os.OpenFile(files[0], os.O_WRONLY|os.O_APPEND, 0666) 429 if err != nil { 430 t.Fatal(err) 431 } 432 433 if _, err := f.WriteString("some data"); err != nil { 434 t.Fatal(err) 435 } 436 437 if err := f.Close(); err != nil { 438 t.Fatal(err) 439 } 440 441 // Test that digest is stale if a tsm file is changed. 442 fresh, _ = tsm1.DigestFresh(dir, files, digest.ModTime()) 443 if fresh { 444 t.Fatalf("digest is fresh") 445 } 446 447 // Delete a tsm file. 448 if err := os.Remove(files[0]); err != nil { 449 t.Fatal(err) 450 } 451 452 // Test that digest is stale if a tsm file is missing on disk. 453 fresh, _ = tsm1.DigestFresh(dir, files, digest.ModTime()) 454 if fresh { 455 t.Fatalf("digest is fresh") 456 } 457 458 // Delete the entire shard directory 459 if err := os.RemoveAll(dir); err != nil { 460 t.Fatal(err) 461 } 462 463 // Test that digest is stale if the entire shard directory is missing. 464 fresh, _ = tsm1.DigestFresh(dir, files, digest.ModTime()) 465 if fresh { 466 t.Fatalf("digest is fresh") 467 } 468} 469 470func MustCreate(path string) *os.File { 471 f, err := os.Create(path) 472 if err != nil { 473 panic(err) 474 } 475 return f 476} 477