1/* 2Copyright 2014 The Perkeep Authors 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package blobpacked 18 19import ( 20 "archive/zip" 21 "bytes" 22 "context" 23 "encoding/json" 24 "errors" 25 "fmt" 26 "io" 27 "io/ioutil" 28 "math/rand" 29 "runtime" 30 "sort" 31 "strconv" 32 "strings" 33 "sync" 34 "testing" 35 "time" 36 37 "perkeep.org/internal/testhooks" 38 "perkeep.org/pkg/blob" 39 "perkeep.org/pkg/blobserver" 40 "perkeep.org/pkg/blobserver/storagetest" 41 "perkeep.org/pkg/constants" 42 "perkeep.org/pkg/schema" 43 "perkeep.org/pkg/sorted" 44 "perkeep.org/pkg/test" 45 46 "go4.org/syncutil" 47) 48 49func init() { 50 testhooks.SetUseSHA1(true) 51} 52 53const debug = false 54 55var ctxbg = context.Background() 56 57func TestStorage(t *testing.T) { 58 storagetest.Test(t, func(t *testing.T) (sto blobserver.Storage, cleanup func()) { 59 s := &storage{ 60 small: new(test.Fetcher), 61 large: new(test.Fetcher), 62 meta: sorted.NewMemoryKeyValue(), 63 log: test.NewLogger(t, "blobpacked: "), 64 } 65 s.init() 66 return s, func() {} 67 }) 68} 69 70func TestStorageNoSmallSubfetch(t *testing.T) { 71 storagetest.Test(t, func(t *testing.T) (sto blobserver.Storage, cleanup func()) { 72 s := &storage{ 73 // We need to hide SubFetcher, to test *storage's SubFetch, as it delegates 74 // to the underlying SubFetcher, if small implements that interface. 75 small: hideSubFetcher(new(test.Fetcher)), 76 large: new(test.Fetcher), 77 meta: sorted.NewMemoryKeyValue(), 78 log: test.NewLogger(t, "blobpacked: "), 79 } 80 s.init() 81 return s, func() {} 82 }) 83} 84 85func hideSubFetcher(sto blobserver.Storage) blobserver.Storage { 86 if _, ok := sto.(blob.SubFetcher); ok { 87 return struct{ blobserver.Storage }{sto} 88 } 89 return sto 90} 91 92func TestParseMetaRow(t *testing.T) { 93 cases := []struct { 94 in string 95 want meta 96 err bool 97 }{ 98 {in: "123 sx", err: true}, 99 {in: "-123 s", err: true}, 100 {in: "", err: true}, 101 {in: "1 ", err: true}, 102 {in: " ", err: true}, 103 {in: "123 x", err: true}, 104 {in: "123 l", err: true}, 105 {in: "123 sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15", err: true}, 106 {in: "123 notaref 12", err: true}, 107 {in: "123 sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15 42 extra", err: true}, 108 {in: "123 sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15 42 ", err: true}, 109 {in: "123 sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15 42", want: meta{ 110 exists: true, 111 size: 123, 112 largeRef: blob.MustParse("sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15"), 113 largeOff: 42, 114 }}, 115 } 116 for _, tt := range cases { 117 got, err := parseMetaRow([]byte(tt.in)) 118 if (err != nil) != tt.err { 119 t.Errorf("For %q error = %v; want-err? = %v", tt.in, err, tt.err) 120 continue 121 } 122 if tt.err { 123 continue 124 } 125 if got != tt.want { 126 t.Errorf("For %q, parseMetaRow = %+v; want %+v", tt.in, got, tt.want) 127 } 128 } 129} 130 131func wantNumLargeBlobs(want int) func(*packTest) { 132 return func(pt *packTest) { pt.wantLargeBlobs = want } 133} 134 135func wantNumSmallBlobs(want int) func(*packTest) { 136 return func(pt *packTest) { pt.wantSmallBlobs = want } 137} 138 139func okayWithoutMeta(refStr string) func(*packTest) { 140 return func(pt *packTest) { 141 if pt.okayNoMeta == nil { 142 pt.okayNoMeta = map[blob.Ref]bool{} 143 } 144 pt.okayNoMeta[blob.MustParse(refStr)] = true 145 } 146} 147 148func randBytesSrc(n int, src int64) []byte { 149 r := rand.New(rand.NewSource(src)) 150 s := make([]byte, n) 151 for i := range s { 152 s[i] = byte(r.Int63()) 153 } 154 return s 155} 156 157func randBytes(n int) []byte { 158 return randBytesSrc(n, 42) 159} 160 161func TestPackNormal(t *testing.T) { 162 const fileSize = 5 << 20 163 const fileName = "foo.dat" 164 fileContents := randBytes(fileSize) 165 166 hash := blob.NewHash() 167 hash.Write(fileContents) 168 wholeRef := blob.RefFromHash(hash) 169 170 pt := testPack(t, 171 func(sto blobserver.Storage) error { 172 _, err := schema.WriteFileFromReader(ctxbg, sto, fileName, bytes.NewReader(fileContents)) 173 return err 174 }, 175 wantNumLargeBlobs(1), 176 wantNumSmallBlobs(0), 177 ) 178 // And verify we can read it back out. 179 pt.testOpenWholeRef(t, wholeRef, fileSize) 180} 181 182func TestPackNoDelete(t *testing.T) { 183 const fileSize = 1 << 20 184 const fileName = "foo.dat" 185 fileContents := randBytes(fileSize) 186 testPack(t, 187 func(sto blobserver.Storage) error { 188 _, err := schema.WriteFileFromReader(ctxbg, sto, fileName, bytes.NewReader(fileContents)) 189 return err 190 }, 191 func(pt *packTest) { pt.sto.skipDelete = true }, 192 wantNumLargeBlobs(1), 193 wantNumSmallBlobs(14), // empirically 194 ) 195} 196 197func TestPackLarge(t *testing.T) { 198 if testing.Short() { 199 t.Skip("skipping in short mode") 200 } 201 const fileSize = 17 << 20 // more than 16 MB, so more than one zip 202 const fileName = "foo.dat" 203 fileContents := randBytes(fileSize) 204 205 hash := blob.NewHash() 206 hash.Write(fileContents) 207 wholeRef := blob.RefFromHash(hash) 208 209 pt := testPack(t, 210 func(sto blobserver.Storage) error { 211 _, err := schema.WriteFileFromReader(ctxbg, sto, fileName, bytes.NewReader(fileContents)) 212 return err 213 }, 214 wantNumLargeBlobs(2), 215 wantNumSmallBlobs(0), 216 ) 217 218 // Gather the "w:*" meta rows we wrote. 219 got := map[string]string{} 220 if err := sorted.Foreach(pt.sto.meta, func(key, value string) error { 221 if strings.HasPrefix(key, "b:") { 222 return nil 223 } 224 got[key] = value 225 return nil 226 }); err != nil { 227 t.Fatal(err) 228 } 229 230 // Verify the two zips are correctly described. 231 232 // There should be one row to say that we have two zip, and 233 // that the overall file is 17MB: 234 keyBase := "w:" + wholeRef.String() 235 if g, w := got[keyBase], "17825792 2"; g != w { 236 t.Fatalf("meta row for key %q = %q; want %q", keyBase, g, w) 237 } 238 239 // ... (and a little helper) ... 240 parseMeta := func(n int) (zipOff, dataOff, dataLen int64) { 241 key := keyBase + ":" + strconv.Itoa(n) 242 v := got[key] 243 f := strings.Fields(v) 244 if len(f) != 4 { 245 t.Fatalf("meta for key %q = %q; expected 4 space-separated fields", key, v) 246 } 247 i64 := func(n int) int64 { 248 i, err := strconv.ParseInt(f[n], 10, 64) 249 if err != nil { 250 t.Fatalf("error parsing int64 %q in field index %d of meta key %q (value %q): %v", f[n], n, key, v, err) 251 } 252 return i 253 } 254 zipOff, dataOff, dataLen = i64(1), i64(2), i64(3) 255 return 256 } 257 258 // And then verify if we have the two "w:<wholeref>:0" and 259 // "w:<wholeref>:1" rows and that they're consistent. 260 z0, d0, l0 := parseMeta(0) 261 z1, d1, l1 := parseMeta(1) 262 if z0 != z1 { 263 t.Errorf("expected zip offset in zip0 and zip1 to match. got %d and %d", z0, z0) 264 } 265 if d0 != 0 { 266 t.Errorf("zip0's data offset = %d; want 0", d0) 267 } 268 if d1 != l0 { 269 t.Errorf("zip1 data offset %d != zip0 data length %d", d1, l0) 270 } 271 if d1+l1 != fileSize { 272 t.Errorf("zip1's offset %d + length %d = %d; want %d (fileSize)", d1, l1, d1+l1, fileSize) 273 } 274 275 // And verify we can read it back out. 276 pt.testOpenWholeRef(t, wholeRef, fileSize) 277} 278 279func countSortedRows(t *testing.T, meta sorted.KeyValue) int { 280 rows := 0 281 if err := sorted.Foreach(meta, func(key, value string) error { 282 rows++ 283 return nil 284 }); err != nil { 285 t.Fatal(err) 286 } 287 return rows 288} 289 290func TestParseZipMetaRow(t *testing.T) { 291 tests := []struct { 292 zm zipMetaInfo 293 wholeRef blob.Ref 294 offset uint64 295 }{ 296 { 297 zm: zipMetaInfo{ 298 zipSize: 16738962, 299 wholeSize: 139639864, 300 dataSize: 16659276, 301 wholeRef: blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"), 302 }, 303 offset: 0, 304 }, 305 { 306 zm: zipMetaInfo{ 307 zipSize: 16739170, 308 wholeSize: 139639864, 309 dataSize: 16670204, 310 wholeRef: blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"), 311 }, 312 offset: 16659276, 313 }, 314 { 315 zm: zipMetaInfo{ 316 zipSize: 16744577, 317 wholeSize: 139639864, 318 dataSize: 16668625, 319 wholeRef: blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"), 320 }, 321 offset: 33329480, 322 }, 323 { 324 zm: zipMetaInfo{ 325 zipSize: 16628223, 326 wholeSize: 139639864, 327 dataSize: 16555478, 328 wholeRef: blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"), 329 }, 330 offset: 49998105, 331 }, 332 { 333 zm: zipMetaInfo{ 334 zipSize: 16735901, 335 wholeSize: 139639864, 336 dataSize: 16661990, 337 wholeRef: blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"), 338 }, 339 offset: 66553583, 340 }, 341 { 342 zm: zipMetaInfo{ 343 zipSize: 16628162, 344 wholeSize: 139639864, 345 dataSize: 16555638, 346 wholeRef: blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"), 347 }, 348 offset: 83215573, 349 }, 350 { 351 zm: zipMetaInfo{ 352 zipSize: 16638400, 353 wholeSize: 139639864, 354 dataSize: 16569680, 355 wholeRef: blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"), 356 }, 357 offset: 99771211, 358 }, 359 { 360 zm: zipMetaInfo{ 361 zipSize: 16731570, 362 wholeSize: 139639864, 363 dataSize: 16665343, 364 wholeRef: blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"), 365 }, 366 offset: 116340891, 367 }, 368 { 369 zm: zipMetaInfo{ 370 zipSize: 6656201, 371 wholeSize: 139639864, 372 dataSize: 6633630, 373 wholeRef: blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"), 374 }, 375 offset: 133006234, 376 }, 377 } 378 for k, tt := range tests { 379 rv := tt.zm.rowValue(tt.offset) 380 got, err := parseZipMetaRow([]byte(rv)) 381 if err != nil { 382 t.Fatal(err) 383 } 384 if tt.zm != got { 385 t.Errorf("for zip %d;\n got: %#v\n want: %#v\n", k, got, tt.zm) 386 } 387 } 388} 389 390func TestReindex(t *testing.T) { 391 if testing.Short() { 392 t.Skip("skipping in short mode") 393 } 394 395 type file struct { 396 size int64 397 name string 398 contents []byte 399 } 400 files := []file{ 401 {17 << 20, "foo.dat", randBytesSrc(17<<20, 42)}, 402 {10 << 20, "bar.dat", randBytesSrc(10<<20, 43)}, 403 {5 << 20, "baz.dat", randBytesSrc(5<<20, 44)}, 404 } 405 406 pt := testPack(t, 407 func(sto blobserver.Storage) error { 408 for _, f := range files { 409 if _, err := schema.WriteFileFromReader(ctxbg, sto, f.name, bytes.NewReader(f.contents)); err != nil { 410 return err 411 } 412 } 413 return nil 414 }, 415 wantNumLargeBlobs(4), 416 wantNumSmallBlobs(0), 417 ) 418 419 // backup the meta that is supposed to be lost/erased. 420 // pt.sto.reindex allocates a new pt.sto.meta, so meta != pt.sto.meta after it is called. 421 meta := pt.sto.meta 422 423 // and build new meta index 424 if err := pt.sto.reindex(context.TODO(), func() (sorted.KeyValue, error) { 425 return sorted.NewMemoryKeyValue(), nil 426 }); err != nil { 427 t.Fatal(err) 428 } 429 430 validBlobKey := func(key, value string) error { 431 if !strings.HasPrefix(key, "b:") { 432 return errors.New("not a blob meta key") 433 } 434 wantRef, ok := blob.Parse(key[2:]) 435 if !ok { 436 return errors.New("bogus blobref in key") 437 } 438 m, err := parseMetaRow([]byte(value)) 439 if err != nil { 440 return err 441 } 442 rc, err := pt.large.SubFetch(ctxbg, m.largeRef, int64(m.largeOff), int64(m.size)) 443 if err != nil { 444 return err 445 } 446 defer rc.Close() 447 h := wantRef.Hash() 448 n, err := io.Copy(h, rc) 449 if err != nil { 450 return err 451 } 452 453 if !wantRef.HashMatches(h) { 454 return errors.New("content doesn't match") 455 } 456 if n != int64(m.size) { 457 return errors.New("size doesn't match") 458 } 459 return nil 460 } 461 462 // check that new meta is identical to "lost" one 463 newRows := 0 464 if err := sorted.Foreach(pt.sto.meta, func(key, newValue string) error { 465 oldValue, err := meta.Get(key) 466 if err != nil { 467 t.Fatalf("Could not get value for %v in old meta: %v", key, err) 468 } 469 newRows++ 470 // Exact match is fine. 471 if oldValue == newValue { 472 return nil 473 } 474 // If it differs, it should at least be correct. (blob metadata 475 // can now point to different packed zips, depending on sorting) 476 err = validBlobKey(key, newValue) 477 if err == nil { 478 return nil 479 } 480 t.Errorf("Reindexing error: for key %v: %v\n got: %q\nwant: %q", key, err, newValue, oldValue) 481 return nil // keep enumerating, regardless of errors 482 }); err != nil { 483 t.Fatal(err) 484 } 485 486 // make sure they have the same number of entries too, to be sure that the reindexing 487 // did not miss entries that the old meta had. 488 oldRows := countSortedRows(t, meta) 489 if oldRows != newRows { 490 t.Fatalf("index number of entries mismatch: got %d entries in new index, wanted %d (as in index before reindexing)", newRows, oldRows) 491 } 492 493 // And verify we can read one of the files back out. 494 hash := blob.NewHash() 495 hash.Write(files[0].contents) 496 pt.testOpenWholeRef(t, blob.RefFromHash(hash), files[0].size) 497 498 // Specifically check the z: rows. 499 zrows := []string{ 500 "z:sha1-41e7665e4e3f491790121fb0440b4f685b3386cb | 16762318 sha1-f6bcda1d4111f45ca785499ae9b3bae019608f65 17825792 0 16709479", 501 "z:sha1-60e61eef95c38e15e8b6422cdaa8a95ad6c38a8b | 1120477 sha1-f6bcda1d4111f45ca785499ae9b3bae019608f65 17825792 16709479 1116313", 502 "z:sha1-9655da8b87e7ccfd804edf1c5967219e2e1ae556 | 5260755 sha1-28aa3334333bb57610ff397432dad6d2c41dc520 5242880 0 5242880", 503 "z:sha1-bc317462c29d9b70891538b7491ac420334d7ef8 | 10516226 sha1-87cdaac04cb9a37c0378970e8ab58f09f22a9907 10485760 0 10485760", 504 } 505 it := pt.sto.meta.Find(zipMetaPrefix, zipMetaPrefixLimit) 506 i := 0 507 for it.Next() { 508 got := it.Key() + " | " + it.Value() 509 if zrows[i] != got { 510 t.Errorf("for row %d;\n got: %v\n want: %v\n", i, got, zrows[i]) 511 } 512 i++ 513 } 514 it.Close() 515 516 recoMode, err := pt.sto.checkLargeIntegrity() 517 if err != nil { 518 t.Fatal(err) 519 } 520 if recoMode != NoRecovery { 521 t.Fatalf("recovery mode after integrity check: %v", recoMode) 522 } 523} 524 525func (pt *packTest) testOpenWholeRef(t *testing.T, wholeRef blob.Ref, wantSize int64) { 526 rc, gotSize, err := pt.sto.OpenWholeRef(wholeRef, 0) 527 if err != nil { 528 t.Errorf("OpenWholeRef = %v", err) 529 return 530 } 531 defer rc.Close() 532 if gotSize != wantSize { 533 t.Errorf("OpenWholeRef size = %v; want %v", gotSize, wantSize) 534 return 535 } 536 h := blob.NewHash() 537 n, err := io.Copy(h, rc) 538 if err != nil { 539 t.Errorf("OpenWholeRef read error: %v", err) 540 return 541 } 542 if n != wantSize { 543 t.Errorf("OpenWholeRef read %v bytes; want %v", n, wantSize) 544 return 545 } 546 gotRef := blob.RefFromHash(h) 547 if gotRef != wholeRef { 548 t.Errorf("OpenWholeRef read contents = %v; want %v", gotRef, wholeRef) 549 } 550} 551 552func TestPackTwoIdenticalfiles(t *testing.T) { 553 const fileSize = 1 << 20 554 fileContents := randBytes(fileSize) 555 testPack(t, 556 func(sto blobserver.Storage) (err error) { 557 if _, err = schema.WriteFileFromReader(ctxbg, sto, "a.txt", bytes.NewReader(fileContents)); err != nil { 558 return 559 } 560 if _, err = schema.WriteFileFromReader(ctxbg, sto, "b.txt", bytes.NewReader(fileContents)); err != nil { 561 return 562 } 563 return 564 }, 565 func(pt *packTest) { pt.sto.packGate = syncutil.NewGate(1) }, // one pack at a time 566 wantNumLargeBlobs(1), 567 wantNumSmallBlobs(1), // just the "b.txt" file schema blob 568 okayWithoutMeta("sha1-7912d1f93942e84cb7ebd6bd6c83b7c152dc102b"), 569 ) 570} 571 572// packTest is the state kept while running func testPack. 573type packTest struct { 574 sto *storage 575 logical, small, large *test.Fetcher 576 577 wantLargeBlobs interface{} // nil means disabled, else int 578 wantSmallBlobs interface{} // nil means disabled, else int 579 580 okayNoMeta map[blob.Ref]bool 581} 582 583func testPack(t *testing.T, 584 write func(sto blobserver.Storage) error, 585 checks ...func(*packTest), 586) *packTest { 587 ctx, cancel := context.WithCancel(context.TODO()) 588 defer cancel() 589 590 logical := new(test.Fetcher) 591 small, large := new(test.Fetcher), new(test.Fetcher) 592 pt := &packTest{ 593 logical: logical, 594 small: small, 595 large: large, 596 } 597 // Figure out the logical baseline blobs we'll later expect in the packed storage. 598 if err := write(logical); err != nil { 599 t.Fatal(err) 600 } 601 t.Logf("items in logical storage: %d", logical.NumBlobs()) 602 603 pt.sto = &storage{ 604 small: small, 605 large: large, 606 meta: sorted.NewMemoryKeyValue(), 607 log: test.NewLogger(t, "blobpacked: "), 608 } 609 pt.sto.init() 610 611 for _, setOpt := range checks { 612 setOpt(pt) 613 } 614 615 if err := write(pt.sto); err != nil { 616 t.Fatal(err) 617 } 618 619 t.Logf("items in small: %v", small.NumBlobs()) 620 t.Logf("items in large: %v", large.NumBlobs()) 621 622 if want, ok := pt.wantLargeBlobs.(int); ok && want != large.NumBlobs() { 623 t.Fatalf("num large blobs = %d; want %d", large.NumBlobs(), want) 624 } 625 if want, ok := pt.wantSmallBlobs.(int); ok && want != small.NumBlobs() { 626 t.Fatalf("num small blobs = %d; want %d", small.NumBlobs(), want) 627 } 628 629 var zipRefs []blob.Ref 630 var zipSeen = map[blob.Ref]bool{} 631 blobserver.EnumerateAll(ctx, large, func(sb blob.SizedRef) error { 632 zipRefs = append(zipRefs, sb.Ref) 633 zipSeen[sb.Ref] = true 634 return nil 635 }) 636 if len(zipRefs) != large.NumBlobs() { 637 t.Fatalf("Enumerated only %d zip files; expected %d", len(zipRefs), large.NumBlobs()) 638 } 639 640 bytesOfZip := map[blob.Ref][]byte{} 641 for _, zipRef := range zipRefs { 642 rc, _, err := large.Fetch(ctxbg, zipRef) 643 if err != nil { 644 t.Fatal(err) 645 } 646 zipBytes, err := ioutil.ReadAll(rc) 647 rc.Close() 648 if err != nil { 649 t.Fatalf("Error slurping %s: %v", zipRef, err) 650 } 651 if len(zipBytes) > constants.MaxBlobSize { 652 t.Fatalf("zip is too large: %d > max %d", len(zipBytes), constants.MaxBlobSize) 653 } 654 bytesOfZip[zipRef] = zipBytes 655 zr, err := zip.NewReader(bytes.NewReader(zipBytes), int64(len(zipBytes))) 656 if err != nil { 657 t.Fatalf("Error reading resulting zip file: %v", err) 658 } 659 if len(zr.File) == 0 { 660 t.Fatal("zip is empty") 661 } 662 nameSeen := map[string]bool{} 663 for i, zf := range zr.File { 664 if nameSeen[zf.Name] { 665 t.Errorf("duplicate name %q seen", zf.Name) 666 } 667 nameSeen[zf.Name] = true 668 t.Logf("zip[%d] size %d, %v", i, zf.UncompressedSize64, zf.Name) 669 } 670 mfr, err := zr.File[len(zr.File)-1].Open() 671 if err != nil { 672 t.Fatalf("Error opening manifest JSON: %v", err) 673 } 674 maniJSON, err := ioutil.ReadAll(mfr) 675 if err != nil { 676 t.Fatalf("Error reading manifest JSON: %v", err) 677 } 678 var mf Manifest 679 if err := json.Unmarshal(maniJSON, &mf); err != nil { 680 t.Fatalf("invalid JSON: %v", err) 681 } 682 683 // Verify each chunk described in the manifest: 684 baseOffset, err := zr.File[0].DataOffset() 685 if err != nil { 686 t.Fatal(err) 687 } 688 for _, bo := range mf.DataBlobs { 689 h := bo.Ref.Hash() 690 h.Write(zipBytes[baseOffset+bo.Offset : baseOffset+bo.Offset+int64(bo.Size)]) 691 if !bo.Ref.HashMatches(h) { 692 t.Errorf("blob %+v didn't describe the actual data in the zip", bo) 693 } 694 } 695 if debug { 696 t.Logf("Manifest: %s", maniJSON) 697 } 698 } 699 700 // Verify that each chunk in the logical mapping is in the meta. 701 logBlobs := 0 702 if err := blobserver.EnumerateAll(ctx, logical, func(sb blob.SizedRef) error { 703 logBlobs++ 704 v, err := pt.sto.meta.Get(blobMetaPrefix + sb.Ref.String()) 705 if err == sorted.ErrNotFound && pt.okayNoMeta[sb.Ref] { 706 return nil 707 } 708 if err != nil { 709 return fmt.Errorf("error looking up logical blob %v in meta: %v", sb.Ref, err) 710 } 711 m, err := parseMetaRow([]byte(v)) 712 if err != nil { 713 return fmt.Errorf("error parsing logical blob %v meta %q: %v", sb.Ref, v, err) 714 } 715 if !m.exists || m.size != sb.Size || !zipSeen[m.largeRef] { 716 return fmt.Errorf("logical blob %v = %+v; want in zip", sb.Ref, m) 717 } 718 h := sb.Ref.Hash() 719 h.Write(bytesOfZip[m.largeRef][m.largeOff : m.largeOff+sb.Size]) 720 if !sb.Ref.HashMatches(h) { 721 t.Errorf("blob %v not found matching in zip", sb.Ref) 722 } 723 return nil 724 }); err != nil { 725 t.Fatal(err) 726 } 727 if logBlobs != logical.NumBlobs() { 728 t.Error("enumerate over logical blobs didn't work?") 729 } 730 731 // TODO, more tests: 732 // -- like TestPackTwoIdenticalfiles, but instead of testing 733 // no dup for 100% identical file bytes, test that uploading a 734 // 49% identical one does not denormalize and repack. 735 // -- test StreamBlobs in all its various flavours, and recovering from stream blobs. 736 // -- overflowing the 16MB chunk size with huge initial chunks 737 return pt 738} 739 740// see if storage proxies through to small for Fetch, Stat, and Enumerate. 741func TestSmallFallback(t *testing.T) { 742 small := new(test.Fetcher) 743 s := &storage{ 744 small: small, 745 large: new(test.Fetcher), 746 meta: sorted.NewMemoryKeyValue(), 747 log: test.NewLogger(t, "blobpacked: "), 748 } 749 s.init() 750 b1 := &test.Blob{"foo"} 751 b1.MustUpload(t, small) 752 wantSB := b1.SizedRef() 753 754 // Fetch 755 rc, _, err := s.Fetch(ctxbg, b1.BlobRef()) 756 if err != nil { 757 t.Errorf("failed to Get blob: %v", err) 758 } else { 759 rc.Close() 760 } 761 762 // Stat. 763 sb, err := blobserver.StatBlob(ctxbg, s, b1.BlobRef()) 764 if err != nil { 765 t.Errorf("failed to Stat blob: %v", err) 766 } else if sb != wantSB { 767 t.Errorf("Stat = %v; want %v", sb, wantSB) 768 } 769 770 // Enumerate 771 saw := false 772 ctx, cancel := context.WithCancel(context.TODO()) 773 defer cancel() 774 if err := blobserver.EnumerateAll(ctx, s, func(sb blob.SizedRef) error { 775 if sb != wantSB { 776 return fmt.Errorf("saw blob %v; want %v", sb, wantSB) 777 } 778 saw = true 779 return nil 780 }); err != nil { 781 t.Errorf("EnuerateAll: %v", err) 782 } 783 if !saw { 784 t.Error("didn't see blob in Enumerate") 785 } 786} 787 788func TestZ_LeakCheck(t *testing.T) { 789 if testing.Short() { 790 return 791 } 792 time.Sleep(50 * time.Millisecond) // let goroutines schedule & die off 793 buf := make([]byte, 1<<20) 794 buf = buf[:runtime.Stack(buf, true)] 795 n := bytes.Count(buf, []byte("[chan receive]:")) 796 if n > 1 { 797 t.Errorf("%d goroutines in chan receive: %s", n, buf) 798 } 799} 800 801func TestForeachZipBlob(t *testing.T) { 802 const fileSize = 2 << 20 803 const fileName = "foo.dat" 804 fileContents := randBytes(fileSize) 805 806 ctx, cancel := context.WithCancel(context.TODO()) 807 defer cancel() 808 809 pt := testPack(t, 810 func(sto blobserver.Storage) error { 811 _, err := schema.WriteFileFromReader(ctxbg, sto, fileName, bytes.NewReader(fileContents)) 812 return err 813 }, 814 wantNumLargeBlobs(1), 815 wantNumSmallBlobs(0), 816 ) 817 818 zipBlob, err := singleBlob(pt.large) 819 if err != nil { 820 t.Fatal(err) 821 } 822 zipBytes := slurpBlob(t, pt.large, zipBlob.Ref) 823 zipSize := len(zipBytes) 824 825 all := map[blob.Ref]blob.SizedRef{} 826 if err := blobserver.EnumerateAll(ctx, pt.logical, func(sb blob.SizedRef) error { 827 all[sb.Ref] = sb 828 return nil 829 }); err != nil { 830 t.Fatal(err) 831 } 832 foreachSaw := 0 833 blobSizeSum := 0 834 if err := pt.sto.foreachZipBlob(ctxbg, zipBlob.Ref, func(bap BlobAndPos) error { 835 foreachSaw++ 836 blobSizeSum += int(bap.Size) 837 want, ok := all[bap.Ref] 838 if !ok { 839 t.Errorf("unwanted blob ref returned from foreachZipBlob: %v", bap.Ref) 840 return nil 841 } 842 delete(all, bap.Ref) 843 if want.Size != bap.Size { 844 t.Errorf("for %v, foreachZipBlob size = %d; want %d", bap.Ref, bap.Size, want.Size) 845 return nil 846 } 847 848 // Verify the offset. 849 h := bap.Ref.Hash() 850 h.Write(zipBytes[bap.Offset : bap.Offset+int64(bap.Size)]) 851 if !bap.Ref.HashMatches(h) { 852 return fmt.Errorf("foreachZipBlob returned blob %v at offset %d that failed validation", bap.Ref, bap.Offset) 853 } 854 855 return nil 856 }); err != nil { 857 t.Fatal(err) 858 } 859 860 t.Logf("foreachZipBlob enumerated %d blobs", foreachSaw) 861 if len(all) > 0 { 862 t.Errorf("foreachZipBlob forgot to enumerate %d blobs: %v", len(all), all) 863 } 864 // Calculate per-blobref zip overhead (zip file headers/TOC/manifest file, etc) 865 zipOverhead := zipSize - blobSizeSum 866 t.Logf("zip fixed overhead = %d bytes, for %d blobs (%d bytes each)", zipOverhead, foreachSaw, zipOverhead/foreachSaw) 867} 868 869// singleBlob assumes that sto contains a single blob and returns it. 870// If there are more or fewer than one blob, it's an error. 871func singleBlob(sto blobserver.BlobEnumerator) (ret blob.SizedRef, err error) { 872 ctx, cancel := context.WithCancel(context.TODO()) 873 defer cancel() 874 875 n := 0 876 if err = blobserver.EnumerateAll(ctx, sto, func(sb blob.SizedRef) error { 877 ret = sb 878 n++ 879 return nil 880 }); err != nil { 881 return blob.SizedRef{}, err 882 } 883 if n != 1 { 884 return blob.SizedRef{}, fmt.Errorf("saw %d blobs; want 1", n) 885 } 886 return 887} 888 889func TestRemoveBlobs(t *testing.T) { 890 ctx, cancel := context.WithCancel(context.TODO()) 891 defer cancel() 892 893 // The basic small cases are handled via storagetest in TestStorage, 894 // so this only tests removing packed blobs. 895 896 small := new(test.Fetcher) 897 large := new(test.Fetcher) 898 sto := &storage{ 899 small: small, 900 large: large, 901 meta: sorted.NewMemoryKeyValue(), 902 log: test.NewLogger(t, "blobpacked: "), 903 } 904 sto.init() 905 906 const fileSize = 1 << 20 907 fileContents := randBytes(fileSize) 908 if _, err := schema.WriteFileFromReader(ctxbg, sto, "foo.dat", bytes.NewReader(fileContents)); err != nil { 909 t.Fatal(err) 910 } 911 if small.NumBlobs() != 0 || large.NumBlobs() == 0 { 912 t.Fatalf("small, large counts == %d, %d; want 0, non-zero", small.NumBlobs(), large.NumBlobs()) 913 } 914 var all []blob.SizedRef 915 if err := blobserver.EnumerateAll(ctx, sto, func(sb blob.SizedRef) error { 916 all = append(all, sb) 917 return nil 918 }); err != nil { 919 t.Fatal(err) 920 } 921 922 // Find the zip 923 zipBlob, err := singleBlob(sto.large) 924 if err != nil { 925 t.Fatalf("failed to find packed zip: %v", err) 926 } 927 928 // The zip file is in use, so verify we can't delete it. 929 if err := sto.deleteZipPack(ctxbg, zipBlob.Ref); err == nil { 930 t.Fatalf("zip pack blob deleted but it should not have been allowed") 931 } 932 933 // Delete everything 934 for len(all) > 0 { 935 del := all[0].Ref 936 all = all[1:] 937 if err := sto.RemoveBlobs(ctx, []blob.Ref{del}); err != nil { 938 t.Fatalf("RemoveBlobs: %v", err) 939 } 940 if err := storagetest.CheckEnumerate(sto, all); err != nil { 941 t.Fatalf("After deleting %v, %v", del, err) 942 } 943 } 944 945 dRows := func() (n int) { 946 if err := sorted.ForeachInRange(sto.meta, "d:", "", func(key, value string) error { 947 if strings.HasPrefix(key, "d:") { 948 n++ 949 } 950 return nil 951 }); err != nil { 952 t.Fatalf("meta iteration error: %v", err) 953 } 954 return 955 } 956 957 if n := dRows(); n == 0 { 958 t.Fatalf("expected a 'd:' row after deletes") 959 } 960 961 // TODO: test the background pack-deleter loop? figure out its design first. 962 if err := sto.deleteZipPack(ctxbg, zipBlob.Ref); err != nil { 963 t.Errorf("error deleting zip %v: %v", zipBlob.Ref, err) 964 } 965 if n := dRows(); n != 0 { 966 t.Errorf("expected the 'd:' row to be deleted") 967 } 968} 969 970func setIntTemporarily(i *int, tempVal int) (restore func()) { 971 old := *i 972 *i = tempVal 973 return func() { *i = old } 974} 975 976func TestPackerBoundarySplits(t *testing.T) { 977 if testing.Short() { 978 t.Skip("skipping slow test") 979 } 980 // Test a file of three chunk sizes, totalling near the 16 MB 981 // boundary: 982 // - 1st chunk is 6 MB. ("blobA") 983 // - 2nd chunk is 6 MB. ("blobB") 984 // - 3rd chunk ("blobC") is binary-searched (up to 4MB) to find 985 // which size causes the packer to write two zip files. 986 987 // During the test we set zip overhead boundaries to 0, to 988 // force the test to into its pathological misprediction code paths, 989 // where it needs to back up and rewrite the zip with one part less. 990 // That's why the test starts with two zip files: so there's at 991 // least one that can be removed to make room. 992 defer setIntTemporarily(&zipPerEntryOverhead, 0)() 993 994 const sizeAB = 12 << 20 995 const maxBlobSize = 16 << 20 996 bytesAB := randBytes(sizeAB) 997 blobA := &test.Blob{string(bytesAB[:sizeAB/2])} 998 blobB := &test.Blob{string(bytesAB[sizeAB/2:])} 999 refA := blobA.BlobRef() 1000 refB := blobB.BlobRef() 1001 bytesCFull := randBytes(maxBlobSize - sizeAB) // will be sliced down 1002 1003 // Mechanism to verify we hit the back-up code path: 1004 var ( 1005 mu sync.Mutex 1006 sawTruncate blob.Ref 1007 stoppedBeforeOverflow bool 1008 ) 1009 testHookSawTruncate = func(after blob.Ref) { 1010 if after != refB { 1011 t.Errorf("unexpected truncate point %v", after) 1012 } 1013 mu.Lock() 1014 defer mu.Unlock() 1015 sawTruncate = after 1016 } 1017 testHookStopBeforeOverflowing = func() { 1018 mu.Lock() 1019 defer mu.Unlock() 1020 stoppedBeforeOverflow = true 1021 } 1022 defer func() { 1023 testHookSawTruncate = nil 1024 testHookStopBeforeOverflowing = nil 1025 }() 1026 1027 generatesTwoZips := func(sizeC int) (ret bool) { 1028 large := new(test.Fetcher) 1029 s := &storage{ 1030 small: new(test.Fetcher), 1031 large: large, 1032 meta: sorted.NewMemoryKeyValue(), 1033 log: test.NewLogger(t, "blobpacked: ", 1034 // Ignore these phrases: 1035 "Packing file ", 1036 "Packed file ", 1037 ), 1038 } 1039 s.init() 1040 1041 // Upload first two chunks 1042 blobA.MustUpload(t, s) 1043 blobB.MustUpload(t, s) 1044 1045 // Upload second chunk 1046 bytesC := bytesCFull[:sizeC] 1047 h := blob.NewHash() 1048 h.Write(bytesC) 1049 refC := blob.RefFromHash(h) 1050 _, err := s.ReceiveBlob(ctxbg, refC, bytes.NewReader(bytesC)) 1051 if err != nil { 1052 t.Fatal(err) 1053 } 1054 1055 // Upload the file schema blob. 1056 m := schema.NewFileMap("foo.dat") 1057 m.PopulateParts(sizeAB+int64(sizeC), []schema.BytesPart{ 1058 { 1059 Size: sizeAB / 2, 1060 BlobRef: refA, 1061 }, 1062 { 1063 Size: sizeAB / 2, 1064 BlobRef: refB, 1065 }, 1066 { 1067 Size: uint64(sizeC), 1068 BlobRef: refC, 1069 }, 1070 }) 1071 fjson, err := m.JSON() 1072 if err != nil { 1073 t.Fatalf("schema filemap JSON: %v", err) 1074 } 1075 fb := &test.Blob{Contents: fjson} 1076 fb.MustUpload(t, s) 1077 num := large.NumBlobs() 1078 if num < 1 || num > 2 { 1079 t.Fatalf("for size %d, num packed zip blobs = %d; want 1 or 2", sizeC, num) 1080 } 1081 return num == 2 1082 } 1083 maxC := maxBlobSize - sizeAB 1084 smallestC := sort.Search(maxC, generatesTwoZips) 1085 if smallestC == maxC { 1086 t.Fatalf("never found a point at which we generated 2 zip files") 1087 } 1088 t.Logf("After 12 MB of data (in 2 chunks), the smallest blob that generates two zip files is %d bytes (%.03f MB)", smallestC, float64(smallestC)/(1<<20)) 1089 t.Logf("Zip overhead (for this two chunk file) = %d bytes", maxBlobSize-1-smallestC-sizeAB) 1090 1091 mu.Lock() 1092 if sawTruncate != refB { 1093 t.Errorf("truncate after = %v; want %v", sawTruncate, refB) 1094 } 1095 if !stoppedBeforeOverflow { 1096 t.Error("never hit the code path where it calculates that another data chunk would push it over the 16MB boundary") 1097 } 1098} 1099 1100func slurpBlob(t *testing.T, sto blob.Fetcher, br blob.Ref) []byte { 1101 rc, _, err := sto.Fetch(ctxbg, br) 1102 if err != nil { 1103 t.Fatal(err) 1104 } 1105 defer rc.Close() 1106 slurp, err := ioutil.ReadAll(rc) 1107 if err != nil { 1108 t.Fatal(err) 1109 } 1110 return slurp 1111} 1112