1/*
2Copyright 2014 The Perkeep Authors
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package blobpacked
18
19import (
20	"archive/zip"
21	"bytes"
22	"context"
23	"encoding/json"
24	"errors"
25	"fmt"
26	"io"
27	"io/ioutil"
28	"math/rand"
29	"runtime"
30	"sort"
31	"strconv"
32	"strings"
33	"sync"
34	"testing"
35	"time"
36
37	"perkeep.org/internal/testhooks"
38	"perkeep.org/pkg/blob"
39	"perkeep.org/pkg/blobserver"
40	"perkeep.org/pkg/blobserver/storagetest"
41	"perkeep.org/pkg/constants"
42	"perkeep.org/pkg/schema"
43	"perkeep.org/pkg/sorted"
44	"perkeep.org/pkg/test"
45
46	"go4.org/syncutil"
47)
48
49func init() {
50	testhooks.SetUseSHA1(true)
51}
52
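// debug, when set to true, makes testPack log the manifest JSON of each packed zip.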
53const debug = false
54
55var ctxbg = context.Background()
56
57func TestStorage(t *testing.T) {
58	storagetest.Test(t, func(t *testing.T) (sto blobserver.Storage, cleanup func()) {
59		s := &storage{
60			small: new(test.Fetcher),
61			large: new(test.Fetcher),
62			meta:  sorted.NewMemoryKeyValue(),
63			log:   test.NewLogger(t, "blobpacked: "),
64		}
65		s.init()
66		return s, func() {}
67	})
68}
69
70func TestStorageNoSmallSubfetch(t *testing.T) {
71	storagetest.Test(t, func(t *testing.T) (sto blobserver.Storage, cleanup func()) {
72		s := &storage{
			// Hide SubFetcher so we exercise *storage's own SubFetch, since it
			// delegates to the underlying SubFetcher when small implements that interface.
75			small: hideSubFetcher(new(test.Fetcher)),
76			large: new(test.Fetcher),
77			meta:  sorted.NewMemoryKeyValue(),
78			log:   test.NewLogger(t, "blobpacked: "),
79		}
80		s.init()
81		return s, func() {}
82	})
83}
84
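// hideSubFetcher wraps sto so it no longer advertises the blob.SubFetcher
// interface, even if the underlying storage implements it.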
85func hideSubFetcher(sto blobserver.Storage) blobserver.Storage {
86	if _, ok := sto.(blob.SubFetcher); ok {
87		return struct{ blobserver.Storage }{sto}
88	}
89	return sto
90}
91
92func TestParseMetaRow(t *testing.T) {
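	// A valid row value is "<size> <large (zip) blobref> <offset within it>", as in
	// the last case below; every malformed variant must make parseMetaRow return an error.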
93	cases := []struct {
94		in   string
95		want meta
96		err  bool
97	}{
98		{in: "123 sx", err: true},
99		{in: "-123 s", err: true},
100		{in: "", err: true},
101		{in: "1 ", err: true},
102		{in: " ", err: true},
103		{in: "123 x", err: true},
104		{in: "123 l", err: true},
105		{in: "123 sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15", err: true},
106		{in: "123 notaref 12", err: true},
107		{in: "123 sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15 42 extra", err: true},
108		{in: "123 sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15 42 ", err: true},
109		{in: "123 sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15 42", want: meta{
110			exists:   true,
111			size:     123,
112			largeRef: blob.MustParse("sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15"),
113			largeOff: 42,
114		}},
115	}
116	for _, tt := range cases {
117		got, err := parseMetaRow([]byte(tt.in))
118		if (err != nil) != tt.err {
119			t.Errorf("For %q error = %v; want-err? = %v", tt.in, err, tt.err)
120			continue
121		}
122		if tt.err {
123			continue
124		}
125		if got != tt.want {
126			t.Errorf("For %q, parseMetaRow = %+v; want %+v", tt.in, got, tt.want)
127		}
128	}
129}
130
131func wantNumLargeBlobs(want int) func(*packTest) {
132	return func(pt *packTest) { pt.wantLargeBlobs = want }
133}
134
135func wantNumSmallBlobs(want int) func(*packTest) {
136	return func(pt *packTest) { pt.wantSmallBlobs = want }
137}
138
139func okayWithoutMeta(refStr string) func(*packTest) {
140	return func(pt *packTest) {
141		if pt.okayNoMeta == nil {
142			pt.okayNoMeta = map[blob.Ref]bool{}
143		}
144		pt.okayNoMeta[blob.MustParse(refStr)] = true
145	}
146}
147
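// randBytesSrc returns n pseudo-random bytes from a deterministic source seeded
// with src, so tests that compare against fixed refs stay reproducible.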
148func randBytesSrc(n int, src int64) []byte {
149	r := rand.New(rand.NewSource(src))
150	s := make([]byte, n)
151	for i := range s {
152		s[i] = byte(r.Int63())
153	}
154	return s
155}
156
157func randBytes(n int) []byte {
158	return randBytesSrc(n, 42)
159}
160
161func TestPackNormal(t *testing.T) {
162	const fileSize = 5 << 20
163	const fileName = "foo.dat"
164	fileContents := randBytes(fileSize)
165
166	hash := blob.NewHash()
167	hash.Write(fileContents)
168	wholeRef := blob.RefFromHash(hash)
169
170	pt := testPack(t,
171		func(sto blobserver.Storage) error {
172			_, err := schema.WriteFileFromReader(ctxbg, sto, fileName, bytes.NewReader(fileContents))
173			return err
174		},
175		wantNumLargeBlobs(1),
176		wantNumSmallBlobs(0),
177	)
178	// And verify we can read it back out.
179	pt.testOpenWholeRef(t, wholeRef, fileSize)
180}
181
182func TestPackNoDelete(t *testing.T) {
183	const fileSize = 1 << 20
184	const fileName = "foo.dat"
185	fileContents := randBytes(fileSize)
186	testPack(t,
187		func(sto blobserver.Storage) error {
188			_, err := schema.WriteFileFromReader(ctxbg, sto, fileName, bytes.NewReader(fileContents))
189			return err
190		},
191		func(pt *packTest) { pt.sto.skipDelete = true },
192		wantNumLargeBlobs(1),
193		wantNumSmallBlobs(14), // empirically
194	)
195}
196
197func TestPackLarge(t *testing.T) {
198	if testing.Short() {
199		t.Skip("skipping in short mode")
200	}
201	const fileSize = 17 << 20 // more than 16 MB, so more than one zip
202	const fileName = "foo.dat"
203	fileContents := randBytes(fileSize)
204
205	hash := blob.NewHash()
206	hash.Write(fileContents)
207	wholeRef := blob.RefFromHash(hash)
208
209	pt := testPack(t,
210		func(sto blobserver.Storage) error {
211			_, err := schema.WriteFileFromReader(ctxbg, sto, fileName, bytes.NewReader(fileContents))
212			return err
213		},
214		wantNumLargeBlobs(2),
215		wantNumSmallBlobs(0),
216	)
217
218	// Gather the "w:*" meta rows we wrote.
219	got := map[string]string{}
220	if err := sorted.Foreach(pt.sto.meta, func(key, value string) error {
221		if strings.HasPrefix(key, "b:") {
222			return nil
223		}
224		got[key] = value
225		return nil
226	}); err != nil {
227		t.Fatal(err)
228	}
229
230	// Verify the two zips are correctly described.
231
	// There should be one row saying that we have two zips and
	// that the overall file is 17 MB:
234	keyBase := "w:" + wholeRef.String()
235	if g, w := got[keyBase], "17825792 2"; g != w {
236		t.Fatalf("meta row for key %q = %q; want %q", keyBase, g, w)
237	}
238
239	// ... (and a little helper) ...
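	// Each "w:<wholeref>:<n>" value holds four space-separated fields: the zip's
	// blobref followed by the three integers parsed below (zipOff, dataOff within
	// the whole file, and dataLen).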
240	parseMeta := func(n int) (zipOff, dataOff, dataLen int64) {
241		key := keyBase + ":" + strconv.Itoa(n)
242		v := got[key]
243		f := strings.Fields(v)
244		if len(f) != 4 {
245			t.Fatalf("meta for key %q = %q; expected 4 space-separated fields", key, v)
246		}
247		i64 := func(n int) int64 {
248			i, err := strconv.ParseInt(f[n], 10, 64)
249			if err != nil {
250				t.Fatalf("error parsing int64 %q in field index %d of meta key %q (value %q): %v", f[n], n, key, v, err)
251			}
252			return i
253		}
254		zipOff, dataOff, dataLen = i64(1), i64(2), i64(3)
255		return
256	}
257
	// Then verify that the two "w:<wholeref>:0" and
	// "w:<wholeref>:1" rows exist and are consistent.
260	z0, d0, l0 := parseMeta(0)
261	z1, d1, l1 := parseMeta(1)
262	if z0 != z1 {
		t.Errorf("expected zip offsets in zip0 and zip1 to match; got %d and %d", z0, z1)
264	}
265	if d0 != 0 {
266		t.Errorf("zip0's data offset = %d; want 0", d0)
267	}
268	if d1 != l0 {
269		t.Errorf("zip1 data offset %d != zip0 data length %d", d1, l0)
270	}
271	if d1+l1 != fileSize {
272		t.Errorf("zip1's offset %d + length %d = %d; want %d (fileSize)", d1, l1, d1+l1, fileSize)
273	}
274
275	// And verify we can read it back out.
276	pt.testOpenWholeRef(t, wholeRef, fileSize)
277}
278
279func countSortedRows(t *testing.T, meta sorted.KeyValue) int {
280	rows := 0
281	if err := sorted.Foreach(meta, func(key, value string) error {
282		rows++
283		return nil
284	}); err != nil {
285		t.Fatal(err)
286	}
287	return rows
288}
289
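// TestParseZipMetaRow round-trips zipMetaInfo values through rowValue and
// parseZipMetaRow at several offsets within a large whole file.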
290func TestParseZipMetaRow(t *testing.T) {
291	tests := []struct {
292		zm       zipMetaInfo
293		wholeRef blob.Ref
294		offset   uint64
295	}{
296		{
297			zm: zipMetaInfo{
298				zipSize:   16738962,
299				wholeSize: 139639864,
300				dataSize:  16659276,
301				wholeRef:  blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"),
302			},
303			offset: 0,
304		},
305		{
306			zm: zipMetaInfo{
307				zipSize:   16739170,
308				wholeSize: 139639864,
309				dataSize:  16670204,
310				wholeRef:  blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"),
311			},
312			offset: 16659276,
313		},
314		{
315			zm: zipMetaInfo{
316				zipSize:   16744577,
317				wholeSize: 139639864,
318				dataSize:  16668625,
319				wholeRef:  blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"),
320			},
321			offset: 33329480,
322		},
323		{
324			zm: zipMetaInfo{
325				zipSize:   16628223,
326				wholeSize: 139639864,
327				dataSize:  16555478,
328				wholeRef:  blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"),
329			},
330			offset: 49998105,
331		},
332		{
333			zm: zipMetaInfo{
334				zipSize:   16735901,
335				wholeSize: 139639864,
336				dataSize:  16661990,
337				wholeRef:  blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"),
338			},
339			offset: 66553583,
340		},
341		{
342			zm: zipMetaInfo{
343				zipSize:   16628162,
344				wholeSize: 139639864,
345				dataSize:  16555638,
346				wholeRef:  blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"),
347			},
348			offset: 83215573,
349		},
350		{
351			zm: zipMetaInfo{
352				zipSize:   16638400,
353				wholeSize: 139639864,
354				dataSize:  16569680,
355				wholeRef:  blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"),
356			},
357			offset: 99771211,
358		},
359		{
360			zm: zipMetaInfo{
361				zipSize:   16731570,
362				wholeSize: 139639864,
363				dataSize:  16665343,
364				wholeRef:  blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"),
365			},
366			offset: 116340891,
367		},
368		{
369			zm: zipMetaInfo{
370				zipSize:   6656201,
371				wholeSize: 139639864,
372				dataSize:  6633630,
373				wholeRef:  blob.MustParse("sha224-d003d3cf9784df4efe617ba319c5028fe93e5e9188cc448bf6d655b4"),
374			},
375			offset: 133006234,
376		},
377	}
378	for k, tt := range tests {
379		rv := tt.zm.rowValue(tt.offset)
380		got, err := parseZipMetaRow([]byte(rv))
381		if err != nil {
382			t.Fatal(err)
383		}
384		if tt.zm != got {
385			t.Errorf("for zip %d;\n got: %#v\n want: %#v\n", k, got, tt.zm)
386		}
387	}
388}
389
390func TestReindex(t *testing.T) {
391	if testing.Short() {
392		t.Skip("skipping in short mode")
393	}
394
395	type file struct {
396		size     int64
397		name     string
398		contents []byte
399	}
400	files := []file{
401		{17 << 20, "foo.dat", randBytesSrc(17<<20, 42)},
402		{10 << 20, "bar.dat", randBytesSrc(10<<20, 43)},
403		{5 << 20, "baz.dat", randBytesSrc(5<<20, 44)},
404	}
405
406	pt := testPack(t,
407		func(sto blobserver.Storage) error {
408			for _, f := range files {
409				if _, err := schema.WriteFileFromReader(ctxbg, sto, f.name, bytes.NewReader(f.contents)); err != nil {
410					return err
411				}
412			}
413			return nil
414		},
415		wantNumLargeBlobs(4),
416		wantNumSmallBlobs(0),
417	)
418
	// Back up the meta that is supposed to be lost/erased.
420	// pt.sto.reindex allocates a new pt.sto.meta, so meta != pt.sto.meta after it is called.
421	meta := pt.sto.meta
422
	// and build a new meta index
424	if err := pt.sto.reindex(context.TODO(), func() (sorted.KeyValue, error) {
425		return sorted.NewMemoryKeyValue(), nil
426	}); err != nil {
427		t.Fatal(err)
428	}
429
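	// validBlobKey checks that a "b:<blobref>" meta row parses and that the bytes
	// it points at in large storage hash back to the blobref in the key, with the
	// expected size.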
430	validBlobKey := func(key, value string) error {
431		if !strings.HasPrefix(key, "b:") {
432			return errors.New("not a blob meta key")
433		}
434		wantRef, ok := blob.Parse(key[2:])
435		if !ok {
436			return errors.New("bogus blobref in key")
437		}
438		m, err := parseMetaRow([]byte(value))
439		if err != nil {
440			return err
441		}
442		rc, err := pt.large.SubFetch(ctxbg, m.largeRef, int64(m.largeOff), int64(m.size))
443		if err != nil {
444			return err
445		}
446		defer rc.Close()
447		h := wantRef.Hash()
448		n, err := io.Copy(h, rc)
449		if err != nil {
450			return err
451		}
452
453		if !wantRef.HashMatches(h) {
454			return errors.New("content doesn't match")
455		}
456		if n != int64(m.size) {
457			return errors.New("size doesn't match")
458		}
459		return nil
460	}
461
	// Check that the new meta is identical to the "lost" one.
463	newRows := 0
464	if err := sorted.Foreach(pt.sto.meta, func(key, newValue string) error {
465		oldValue, err := meta.Get(key)
466		if err != nil {
467			t.Fatalf("Could not get value for %v in old meta: %v", key, err)
468		}
469		newRows++
470		// Exact match is fine.
471		if oldValue == newValue {
472			return nil
473		}
474		// If it differs, it should at least be correct. (blob metadata
475		// can now point to different packed zips, depending on sorting)
476		err = validBlobKey(key, newValue)
477		if err == nil {
478			return nil
479		}
480		t.Errorf("Reindexing error: for key %v: %v\n got: %q\nwant: %q", key, err, newValue, oldValue)
481		return nil // keep enumerating, regardless of errors
482	}); err != nil {
483		t.Fatal(err)
484	}
485
	// Also make sure they have the same number of entries, so we know the
	// reindexing did not miss entries that the old meta had.
488	oldRows := countSortedRows(t, meta)
489	if oldRows != newRows {
490		t.Fatalf("index number of entries mismatch: got %d entries in new index, wanted %d (as in index before reindexing)", newRows, oldRows)
491	}
492
493	// And verify we can read one of the files back out.
494	hash := blob.NewHash()
495	hash.Write(files[0].contents)
496	pt.testOpenWholeRef(t, blob.RefFromHash(hash), files[0].size)
497
498	// Specifically check the z: rows.
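	// Each value is the zipMetaInfo rowValue form: "<zipSize> <wholeRef>
	// <wholeSize> <offset of this zip's data within the whole file> <dataSize>".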
499	zrows := []string{
500		"z:sha1-41e7665e4e3f491790121fb0440b4f685b3386cb | 16762318 sha1-f6bcda1d4111f45ca785499ae9b3bae019608f65 17825792 0 16709479",
501		"z:sha1-60e61eef95c38e15e8b6422cdaa8a95ad6c38a8b | 1120477 sha1-f6bcda1d4111f45ca785499ae9b3bae019608f65 17825792 16709479 1116313",
502		"z:sha1-9655da8b87e7ccfd804edf1c5967219e2e1ae556 | 5260755 sha1-28aa3334333bb57610ff397432dad6d2c41dc520 5242880 0 5242880",
503		"z:sha1-bc317462c29d9b70891538b7491ac420334d7ef8 | 10516226 sha1-87cdaac04cb9a37c0378970e8ab58f09f22a9907 10485760 0 10485760",
504	}
505	it := pt.sto.meta.Find(zipMetaPrefix, zipMetaPrefixLimit)
506	i := 0
507	for it.Next() {
508		got := it.Key() + " | " + it.Value()
509		if zrows[i] != got {
510			t.Errorf("for row %d;\n got: %v\n want: %v\n", i, got, zrows[i])
511		}
512		i++
513	}
514	it.Close()
515
516	recoMode, err := pt.sto.checkLargeIntegrity()
517	if err != nil {
518		t.Fatal(err)
519	}
520	if recoMode != NoRecovery {
521		t.Fatalf("recovery mode after integrity check: %v", recoMode)
522	}
523}
524
525func (pt *packTest) testOpenWholeRef(t *testing.T, wholeRef blob.Ref, wantSize int64) {
526	rc, gotSize, err := pt.sto.OpenWholeRef(wholeRef, 0)
527	if err != nil {
528		t.Errorf("OpenWholeRef = %v", err)
529		return
530	}
531	defer rc.Close()
532	if gotSize != wantSize {
533		t.Errorf("OpenWholeRef size = %v; want %v", gotSize, wantSize)
534		return
535	}
536	h := blob.NewHash()
537	n, err := io.Copy(h, rc)
538	if err != nil {
539		t.Errorf("OpenWholeRef read error: %v", err)
540		return
541	}
542	if n != wantSize {
543		t.Errorf("OpenWholeRef read %v bytes; want %v", n, wantSize)
544		return
545	}
546	gotRef := blob.RefFromHash(h)
547	if gotRef != wholeRef {
548		t.Errorf("OpenWholeRef read contents = %v; want %v", gotRef, wholeRef)
549	}
550}
551
552func TestPackTwoIdenticalfiles(t *testing.T) {
553	const fileSize = 1 << 20
554	fileContents := randBytes(fileSize)
555	testPack(t,
556		func(sto blobserver.Storage) (err error) {
557			if _, err = schema.WriteFileFromReader(ctxbg, sto, "a.txt", bytes.NewReader(fileContents)); err != nil {
558				return
559			}
560			if _, err = schema.WriteFileFromReader(ctxbg, sto, "b.txt", bytes.NewReader(fileContents)); err != nil {
561				return
562			}
563			return
564		},
565		func(pt *packTest) { pt.sto.packGate = syncutil.NewGate(1) }, // one pack at a time
566		wantNumLargeBlobs(1),
567		wantNumSmallBlobs(1), // just the "b.txt" file schema blob
568		okayWithoutMeta("sha1-7912d1f93942e84cb7ebd6bd6c83b7c152dc102b"),
569	)
570}
571
572// packTest is the state kept while running func testPack.
573type packTest struct {
574	sto                   *storage
575	logical, small, large *test.Fetcher
576
577	wantLargeBlobs interface{} // nil means disabled, else int
578	wantSmallBlobs interface{} // nil means disabled, else int
579
580	okayNoMeta map[blob.Ref]bool
581}
582
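// testPack writes the same content to a fresh logical (unpacked) storage to
// establish a baseline, then to a blobpacked storage, applies the option funcs
// in checks, and verifies the resulting zips and meta rows against the baseline.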
583func testPack(t *testing.T,
584	write func(sto blobserver.Storage) error,
585	checks ...func(*packTest),
586) *packTest {
587	ctx, cancel := context.WithCancel(context.TODO())
588	defer cancel()
589
590	logical := new(test.Fetcher)
591	small, large := new(test.Fetcher), new(test.Fetcher)
592	pt := &packTest{
593		logical: logical,
594		small:   small,
595		large:   large,
596	}
597	// Figure out the logical baseline blobs we'll later expect in the packed storage.
598	if err := write(logical); err != nil {
599		t.Fatal(err)
600	}
601	t.Logf("items in logical storage: %d", logical.NumBlobs())
602
603	pt.sto = &storage{
604		small: small,
605		large: large,
606		meta:  sorted.NewMemoryKeyValue(),
607		log:   test.NewLogger(t, "blobpacked: "),
608	}
609	pt.sto.init()
610
611	for _, setOpt := range checks {
612		setOpt(pt)
613	}
614
615	if err := write(pt.sto); err != nil {
616		t.Fatal(err)
617	}
618
619	t.Logf("items in small: %v", small.NumBlobs())
620	t.Logf("items in large: %v", large.NumBlobs())
621
622	if want, ok := pt.wantLargeBlobs.(int); ok && want != large.NumBlobs() {
623		t.Fatalf("num large blobs = %d; want %d", large.NumBlobs(), want)
624	}
625	if want, ok := pt.wantSmallBlobs.(int); ok && want != small.NumBlobs() {
626		t.Fatalf("num small blobs = %d; want %d", small.NumBlobs(), want)
627	}
628
629	var zipRefs []blob.Ref
630	var zipSeen = map[blob.Ref]bool{}
631	blobserver.EnumerateAll(ctx, large, func(sb blob.SizedRef) error {
632		zipRefs = append(zipRefs, sb.Ref)
633		zipSeen[sb.Ref] = true
634		return nil
635	})
636	if len(zipRefs) != large.NumBlobs() {
637		t.Fatalf("Enumerated only %d zip files; expected %d", len(zipRefs), large.NumBlobs())
638	}
639
640	bytesOfZip := map[blob.Ref][]byte{}
641	for _, zipRef := range zipRefs {
642		rc, _, err := large.Fetch(ctxbg, zipRef)
643		if err != nil {
644			t.Fatal(err)
645		}
646		zipBytes, err := ioutil.ReadAll(rc)
647		rc.Close()
648		if err != nil {
649			t.Fatalf("Error slurping %s: %v", zipRef, err)
650		}
651		if len(zipBytes) > constants.MaxBlobSize {
652			t.Fatalf("zip is too large: %d > max %d", len(zipBytes), constants.MaxBlobSize)
653		}
654		bytesOfZip[zipRef] = zipBytes
655		zr, err := zip.NewReader(bytes.NewReader(zipBytes), int64(len(zipBytes)))
656		if err != nil {
657			t.Fatalf("Error reading resulting zip file: %v", err)
658		}
659		if len(zr.File) == 0 {
660			t.Fatal("zip is empty")
661		}
662		nameSeen := map[string]bool{}
663		for i, zf := range zr.File {
664			if nameSeen[zf.Name] {
665				t.Errorf("duplicate name %q seen", zf.Name)
666			}
667			nameSeen[zf.Name] = true
668			t.Logf("zip[%d] size %d, %v", i, zf.UncompressedSize64, zf.Name)
669		}
670		mfr, err := zr.File[len(zr.File)-1].Open()
671		if err != nil {
672			t.Fatalf("Error opening manifest JSON: %v", err)
673		}
674		maniJSON, err := ioutil.ReadAll(mfr)
675		if err != nil {
676			t.Fatalf("Error reading manifest JSON: %v", err)
677		}
678		var mf Manifest
679		if err := json.Unmarshal(maniJSON, &mf); err != nil {
680			t.Fatalf("invalid JSON: %v", err)
681		}
682
683		// Verify each chunk described in the manifest:
684		baseOffset, err := zr.File[0].DataOffset()
685		if err != nil {
686			t.Fatal(err)
687		}
688		for _, bo := range mf.DataBlobs {
689			h := bo.Ref.Hash()
690			h.Write(zipBytes[baseOffset+bo.Offset : baseOffset+bo.Offset+int64(bo.Size)])
691			if !bo.Ref.HashMatches(h) {
692				t.Errorf("blob %+v didn't describe the actual data in the zip", bo)
693			}
694		}
695		if debug {
696			t.Logf("Manifest: %s", maniJSON)
697		}
698	}
699
700	// Verify that each chunk in the logical mapping is in the meta.
701	logBlobs := 0
702	if err := blobserver.EnumerateAll(ctx, logical, func(sb blob.SizedRef) error {
703		logBlobs++
704		v, err := pt.sto.meta.Get(blobMetaPrefix + sb.Ref.String())
705		if err == sorted.ErrNotFound && pt.okayNoMeta[sb.Ref] {
706			return nil
707		}
708		if err != nil {
709			return fmt.Errorf("error looking up logical blob %v in meta: %v", sb.Ref, err)
710		}
711		m, err := parseMetaRow([]byte(v))
712		if err != nil {
713			return fmt.Errorf("error parsing logical blob %v meta %q: %v", sb.Ref, v, err)
714		}
715		if !m.exists || m.size != sb.Size || !zipSeen[m.largeRef] {
716			return fmt.Errorf("logical blob %v = %+v; want in zip", sb.Ref, m)
717		}
718		h := sb.Ref.Hash()
719		h.Write(bytesOfZip[m.largeRef][m.largeOff : m.largeOff+sb.Size])
720		if !sb.Ref.HashMatches(h) {
721			t.Errorf("blob %v not found matching in zip", sb.Ref)
722		}
723		return nil
724	}); err != nil {
725		t.Fatal(err)
726	}
727	if logBlobs != logical.NumBlobs() {
728		t.Error("enumerate over logical blobs didn't work?")
729	}
730
731	// TODO, more tests:
732	// -- like TestPackTwoIdenticalfiles, but instead of testing
733	// no dup for 100% identical file bytes, test that uploading a
734	// 49% identical one does not denormalize and repack.
735	// -- test StreamBlobs in all its various flavours, and recovering from stream blobs.
736	// -- overflowing the 16MB chunk size with huge initial chunks
737	return pt
738}
739
// TestSmallFallback verifies that storage proxies through to small for Fetch, Stat, and Enumerate.
741func TestSmallFallback(t *testing.T) {
742	small := new(test.Fetcher)
743	s := &storage{
744		small: small,
745		large: new(test.Fetcher),
746		meta:  sorted.NewMemoryKeyValue(),
747		log:   test.NewLogger(t, "blobpacked: "),
748	}
749	s.init()
	b1 := &test.Blob{Contents: "foo"}
751	b1.MustUpload(t, small)
752	wantSB := b1.SizedRef()
753
754	// Fetch
755	rc, _, err := s.Fetch(ctxbg, b1.BlobRef())
756	if err != nil {
757		t.Errorf("failed to Get blob: %v", err)
758	} else {
759		rc.Close()
760	}
761
762	// Stat.
763	sb, err := blobserver.StatBlob(ctxbg, s, b1.BlobRef())
764	if err != nil {
765		t.Errorf("failed to Stat blob: %v", err)
766	} else if sb != wantSB {
767		t.Errorf("Stat = %v; want %v", sb, wantSB)
768	}
769
770	// Enumerate
771	saw := false
772	ctx, cancel := context.WithCancel(context.TODO())
773	defer cancel()
774	if err := blobserver.EnumerateAll(ctx, s, func(sb blob.SizedRef) error {
775		if sb != wantSB {
776			return fmt.Errorf("saw blob %v; want %v", sb, wantSB)
777		}
778		saw = true
779		return nil
780	}); err != nil {
		t.Errorf("EnumerateAll: %v", err)
782	}
783	if !saw {
784		t.Error("didn't see blob in Enumerate")
785	}
786}
787
788func TestZ_LeakCheck(t *testing.T) {
789	if testing.Short() {
790		return
791	}
792	time.Sleep(50 * time.Millisecond) // let goroutines schedule & die off
793	buf := make([]byte, 1<<20)
794	buf = buf[:runtime.Stack(buf, true)]
795	n := bytes.Count(buf, []byte("[chan receive]:"))
796	if n > 1 {
797		t.Errorf("%d goroutines in chan receive: %s", n, buf)
798	}
799}
800
801func TestForeachZipBlob(t *testing.T) {
802	const fileSize = 2 << 20
803	const fileName = "foo.dat"
804	fileContents := randBytes(fileSize)
805
806	ctx, cancel := context.WithCancel(context.TODO())
807	defer cancel()
808
809	pt := testPack(t,
810		func(sto blobserver.Storage) error {
811			_, err := schema.WriteFileFromReader(ctxbg, sto, fileName, bytes.NewReader(fileContents))
812			return err
813		},
814		wantNumLargeBlobs(1),
815		wantNumSmallBlobs(0),
816	)
817
818	zipBlob, err := singleBlob(pt.large)
819	if err != nil {
820		t.Fatal(err)
821	}
822	zipBytes := slurpBlob(t, pt.large, zipBlob.Ref)
823	zipSize := len(zipBytes)
824
825	all := map[blob.Ref]blob.SizedRef{}
826	if err := blobserver.EnumerateAll(ctx, pt.logical, func(sb blob.SizedRef) error {
827		all[sb.Ref] = sb
828		return nil
829	}); err != nil {
830		t.Fatal(err)
831	}
832	foreachSaw := 0
833	blobSizeSum := 0
834	if err := pt.sto.foreachZipBlob(ctxbg, zipBlob.Ref, func(bap BlobAndPos) error {
835		foreachSaw++
836		blobSizeSum += int(bap.Size)
837		want, ok := all[bap.Ref]
838		if !ok {
839			t.Errorf("unwanted blob ref returned from foreachZipBlob: %v", bap.Ref)
840			return nil
841		}
842		delete(all, bap.Ref)
843		if want.Size != bap.Size {
844			t.Errorf("for %v, foreachZipBlob size = %d; want %d", bap.Ref, bap.Size, want.Size)
845			return nil
846		}
847
848		// Verify the offset.
849		h := bap.Ref.Hash()
850		h.Write(zipBytes[bap.Offset : bap.Offset+int64(bap.Size)])
851		if !bap.Ref.HashMatches(h) {
852			return fmt.Errorf("foreachZipBlob returned blob %v at offset %d that failed validation", bap.Ref, bap.Offset)
853		}
854
855		return nil
856	}); err != nil {
857		t.Fatal(err)
858	}
859
860	t.Logf("foreachZipBlob enumerated %d blobs", foreachSaw)
861	if len(all) > 0 {
862		t.Errorf("foreachZipBlob forgot to enumerate %d blobs: %v", len(all), all)
863	}
864	// Calculate per-blobref zip overhead (zip file headers/TOC/manifest file, etc)
865	zipOverhead := zipSize - blobSizeSum
866	t.Logf("zip fixed overhead = %d bytes, for %d blobs (%d bytes each)", zipOverhead, foreachSaw, zipOverhead/foreachSaw)
867}
868
869// singleBlob assumes that sto contains a single blob and returns it.
870// If there are more or fewer than one blob, it's an error.
871func singleBlob(sto blobserver.BlobEnumerator) (ret blob.SizedRef, err error) {
872	ctx, cancel := context.WithCancel(context.TODO())
873	defer cancel()
874
875	n := 0
876	if err = blobserver.EnumerateAll(ctx, sto, func(sb blob.SizedRef) error {
877		ret = sb
878		n++
879		return nil
880	}); err != nil {
881		return blob.SizedRef{}, err
882	}
883	if n != 1 {
884		return blob.SizedRef{}, fmt.Errorf("saw %d blobs; want 1", n)
885	}
886	return
887}
888
889func TestRemoveBlobs(t *testing.T) {
890	ctx, cancel := context.WithCancel(context.TODO())
891	defer cancel()
892
893	// The basic small cases are handled via storagetest in TestStorage,
894	// so this only tests removing packed blobs.
895
896	small := new(test.Fetcher)
897	large := new(test.Fetcher)
898	sto := &storage{
899		small: small,
900		large: large,
901		meta:  sorted.NewMemoryKeyValue(),
902		log:   test.NewLogger(t, "blobpacked: "),
903	}
904	sto.init()
905
906	const fileSize = 1 << 20
907	fileContents := randBytes(fileSize)
908	if _, err := schema.WriteFileFromReader(ctxbg, sto, "foo.dat", bytes.NewReader(fileContents)); err != nil {
909		t.Fatal(err)
910	}
911	if small.NumBlobs() != 0 || large.NumBlobs() == 0 {
912		t.Fatalf("small, large counts == %d, %d; want 0, non-zero", small.NumBlobs(), large.NumBlobs())
913	}
914	var all []blob.SizedRef
915	if err := blobserver.EnumerateAll(ctx, sto, func(sb blob.SizedRef) error {
916		all = append(all, sb)
917		return nil
918	}); err != nil {
919		t.Fatal(err)
920	}
921
922	// Find the zip
923	zipBlob, err := singleBlob(sto.large)
924	if err != nil {
925		t.Fatalf("failed to find packed zip: %v", err)
926	}
927
928	// The zip file is in use, so verify we can't delete it.
929	if err := sto.deleteZipPack(ctxbg, zipBlob.Ref); err == nil {
930		t.Fatalf("zip pack blob deleted but it should not have been allowed")
931	}
932
933	// Delete everything
934	for len(all) > 0 {
935		del := all[0].Ref
936		all = all[1:]
937		if err := sto.RemoveBlobs(ctx, []blob.Ref{del}); err != nil {
938			t.Fatalf("RemoveBlobs: %v", err)
939		}
940		if err := storagetest.CheckEnumerate(sto, all); err != nil {
941			t.Fatalf("After deleting %v, %v", del, err)
942		}
943	}
944
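	// dRows counts the "d:" rows in meta: deletion bookkeeping written when
	// blobs that live inside packed zips are removed.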
945	dRows := func() (n int) {
946		if err := sorted.ForeachInRange(sto.meta, "d:", "", func(key, value string) error {
947			if strings.HasPrefix(key, "d:") {
948				n++
949			}
950			return nil
951		}); err != nil {
952			t.Fatalf("meta iteration error: %v", err)
953		}
954		return
955	}
956
957	if n := dRows(); n == 0 {
958		t.Fatalf("expected a 'd:' row after deletes")
959	}
960
961	// TODO: test the background pack-deleter loop? figure out its design first.
962	if err := sto.deleteZipPack(ctxbg, zipBlob.Ref); err != nil {
963		t.Errorf("error deleting zip %v: %v", zipBlob.Ref, err)
964	}
965	if n := dRows(); n != 0 {
966		t.Errorf("expected the 'd:' row to be deleted")
967	}
968}
969
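// setIntTemporarily sets *i to tempVal and returns a func that restores the
// original value.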
970func setIntTemporarily(i *int, tempVal int) (restore func()) {
971	old := *i
972	*i = tempVal
973	return func() { *i = old }
974}
975
976func TestPackerBoundarySplits(t *testing.T) {
977	if testing.Short() {
978		t.Skip("skipping slow test")
979	}
	// Test a file of three chunks whose sizes total near the 16 MB
	// boundary:
982	//    - 1st chunk is 6 MB. ("blobA")
983	//    - 2nd chunk is 6 MB. ("blobB")
984	//    - 3rd chunk ("blobC") is binary-searched (up to 4MB) to find
985	//      which size causes the packer to write two zip files.
986
987	// During the test we set zip overhead boundaries to 0, to
	// force the test into its pathological misprediction code paths,
989	// where it needs to back up and rewrite the zip with one part less.
990	// That's why the test starts with two zip files: so there's at
991	// least one that can be removed to make room.
992	defer setIntTemporarily(&zipPerEntryOverhead, 0)()
993
994	const sizeAB = 12 << 20
995	const maxBlobSize = 16 << 20
996	bytesAB := randBytes(sizeAB)
	blobA := &test.Blob{Contents: string(bytesAB[:sizeAB/2])}
	blobB := &test.Blob{Contents: string(bytesAB[sizeAB/2:])}
999	refA := blobA.BlobRef()
1000	refB := blobB.BlobRef()
1001	bytesCFull := randBytes(maxBlobSize - sizeAB) // will be sliced down
1002
1003	// Mechanism to verify we hit the back-up code path:
1004	var (
1005		mu                    sync.Mutex
1006		sawTruncate           blob.Ref
1007		stoppedBeforeOverflow bool
1008	)
1009	testHookSawTruncate = func(after blob.Ref) {
1010		if after != refB {
1011			t.Errorf("unexpected truncate point %v", after)
1012		}
1013		mu.Lock()
1014		defer mu.Unlock()
1015		sawTruncate = after
1016	}
1017	testHookStopBeforeOverflowing = func() {
1018		mu.Lock()
1019		defer mu.Unlock()
1020		stoppedBeforeOverflow = true
1021	}
1022	defer func() {
1023		testHookSawTruncate = nil
1024		testHookStopBeforeOverflowing = nil
1025	}()
1026
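	// generatesTwoZips reports whether packing blobA, blobB, and a third chunk of
	// sizeC bytes produces two zip blobs rather than one. sort.Search below uses it
	// to find the smallest such sizeC.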
1027	generatesTwoZips := func(sizeC int) (ret bool) {
1028		large := new(test.Fetcher)
1029		s := &storage{
1030			small: new(test.Fetcher),
1031			large: large,
1032			meta:  sorted.NewMemoryKeyValue(),
1033			log: test.NewLogger(t, "blobpacked: ",
1034				// Ignore these phrases:
1035				"Packing file ",
1036				"Packed file ",
1037			),
1038		}
1039		s.init()
1040
1041		// Upload first two chunks
1042		blobA.MustUpload(t, s)
1043		blobB.MustUpload(t, s)
1044
		// Upload the third chunk, whose size we're searching over.
1046		bytesC := bytesCFull[:sizeC]
1047		h := blob.NewHash()
1048		h.Write(bytesC)
1049		refC := blob.RefFromHash(h)
1050		_, err := s.ReceiveBlob(ctxbg, refC, bytes.NewReader(bytesC))
1051		if err != nil {
1052			t.Fatal(err)
1053		}
1054
1055		// Upload the file schema blob.
1056		m := schema.NewFileMap("foo.dat")
1057		m.PopulateParts(sizeAB+int64(sizeC), []schema.BytesPart{
1058			{
1059				Size:    sizeAB / 2,
1060				BlobRef: refA,
1061			},
1062			{
1063				Size:    sizeAB / 2,
1064				BlobRef: refB,
1065			},
1066			{
1067				Size:    uint64(sizeC),
1068				BlobRef: refC,
1069			},
1070		})
1071		fjson, err := m.JSON()
1072		if err != nil {
1073			t.Fatalf("schema filemap JSON: %v", err)
1074		}
1075		fb := &test.Blob{Contents: fjson}
1076		fb.MustUpload(t, s)
1077		num := large.NumBlobs()
1078		if num < 1 || num > 2 {
1079			t.Fatalf("for size %d, num packed zip blobs = %d; want 1 or 2", sizeC, num)
1080		}
1081		return num == 2
1082	}
1083	maxC := maxBlobSize - sizeAB
1084	smallestC := sort.Search(maxC, generatesTwoZips)
1085	if smallestC == maxC {
1086		t.Fatalf("never found a point at which we generated 2 zip files")
1087	}
1088	t.Logf("After 12 MB of data (in 2 chunks), the smallest blob that generates two zip files is %d bytes (%.03f MB)", smallestC, float64(smallestC)/(1<<20))
1089	t.Logf("Zip overhead (for this two chunk file) = %d bytes", maxBlobSize-1-smallestC-sizeAB)
1090
1091	mu.Lock()
1092	if sawTruncate != refB {
1093		t.Errorf("truncate after = %v; want %v", sawTruncate, refB)
1094	}
1095	if !stoppedBeforeOverflow {
1096		t.Error("never hit the code path where it calculates that another data chunk would push it over the 16MB boundary")
1097	}
1098}
1099
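// slurpBlob fetches br from sto and returns its full contents, failing t on any
// error.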
1100func slurpBlob(t *testing.T, sto blob.Fetcher, br blob.Ref) []byte {
1101	rc, _, err := sto.Fetch(ctxbg, br)
1102	if err != nil {
1103		t.Fatal(err)
1104	}
1105	defer rc.Close()
1106	slurp, err := ioutil.ReadAll(rc)
1107	if err != nil {
1108		t.Fatal(err)
1109	}
1110	return slurp
1111}
1112