1/*
2Copyright 2014 The Perkeep AUTHORS
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17/*
18Package blobpacked registers the "blobpacked" blobserver storage type,
19storing blobs initially as one physical blob per logical blob, but then
20rearranging little physical blobs into large contiguous blobs organized by
21how they'll likely be accessed. An index tracks the mapping from logical to
22physical blobs.
23
24Example low-level config:
25
26     "/storage/": {
27         "handler": "storage-blobpacked",
28         "handlerArgs": {
29            "smallBlobs": "/small/",
30            "largeBlobs": "/large/",
31            "metaIndex": {
32               "type": "mysql",
33                .....
34            }
35          }
36     }
37
The resulting large blobs are valid zip files. Those blobs may be up to
3916 MB and contain the original contiguous file (or fractions of it), as well
40as metadata about how the file is cut up. The zip file will have the
41following structure:
42
43    foo.jpg       (or whatever)
44    camlistore/sha1-beb1df0b75952c7d277905ad14de71ef7ef90c44.json (some file ref)
45    camlistore/sha1-a0ceb10b04403c9cc1d032e07a9071db5e711c9a.json (some bytes ref)
46    camlistore/sha1-7b4d9c8529c27d592255c6dfb17188493db96ccc.json (another bytes ref)
47    camlistore/camlistore-pack-manifest.json
48
49The camlistore-pack-manifest.json is documented on the exported
50Manifest type. It looks like this:
51
52    {
53      "wholeRef": "sha1-0e64816d731a56915e8bb4ae4d0ac7485c0b84da",
54      "wholeSize": 2962227200, // 2.8GB; so will require ~176-180 16MB chunks
55      "wholePartIndex": 17,    // 0-based
56      "dataBlobsOrigin": "sha1-355705cf62a56669303d2561f29e0620a676c36e",
57      "dataBlobs": [
58          {"blob": "sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15", "offset": 0, "size": 273048},
59          {"blob": "sha1-e242ed3bffccdf271b7fbaf34ed72d089537b42f", "offset": 273048, "size": 112783},
60          {"blob": "sha1-6eadeac2dade6347e87c0d24fd455feffa7069f0", "offset": 385831, ...},
61          {"blob": "sha1-9425cca1dde5d8b6eb70cd087db4e356da92396e", "offset": ...},
62          {"blob": "sha1-7709559a3c8668c57cc0a2f57c418b1cc3598049", "offset": ...},
63          {"blob": "sha1-f62cb5d05cfbf2a7a6c7f8339d0a4bf1dcd0ab6c", "offset": ...}
64      ] // raw data blobs of foo.jpg
65    }
66
67The manifest.json ensures that if the metadata index is lost, all the
68data can be reconstructed from the raw zip files.
69
The 'wholeRef' property specifies which large file this zip is a part
of.  If the file is less than 15.5 MB or so (leaving room for the zip
72overhead and manifest size), it will probably all be in one zip and the
73first file in the zip will be the whole thing. Otherwise it'll be cut across
74multiple zip files, each no larger than 16MB. In that case, each part of the
75file will have a different 'wholePartIndex' number, starting at index
760. Each will have the same 'wholeSize'.
77*/
78package blobpacked // import "perkeep.org/pkg/blobserver/blobpacked"
79
80// TODO: BlobStreamer using the zip manifests, for recovery.
81
82import (
83	"archive/zip"
84	"bytes"
85	"context"
86	"crypto/sha1"
87	"encoding/json"
88	"errors"
89	"fmt"
90	"io"
91	"log"
92	"os"
93	"runtime"
94	"sort"
95	"strconv"
96	"strings"
97	"sync"
98	"time"
99
100	"perkeep.org/internal/pools"
101	"perkeep.org/pkg/blob"
102	"perkeep.org/pkg/blobserver"
103	"perkeep.org/pkg/constants"
104	"perkeep.org/pkg/env"
105	"perkeep.org/pkg/schema"
106	"perkeep.org/pkg/sorted"
107
108	"go4.org/jsonconfig"
109	"go4.org/strutil"
110	"go4.org/syncutil"
111)
112
// packThreshold is the size, in bytes (512 KiB), below which files aren't
// packed.
//
// TODO: evaluate whether this should even be 0, to keep the schema blobs
// together at least.
const packThreshold = 512 * 1024
116
// Size overheads, in bytes, that zip framing adds on top of the payload.
// They are declared as variables only so that tests can change them;
// everything else should treat them as constants.
var (
	// zipFixedOverhead is the overhead counted once per zip file.
	zipFixedOverhead = 512 /* conservative slop space, to get us away from 16 MB zip boundary */ +
		22 /* directoryEndLen */ +
		20 /* directory64LocLen */ +
		56 /* directory64EndLen */

	// zipPerEntryOverhead is the overhead counted once per file stored in
	// the zip. The name component is sized for a sha1 blobref and padded
	// by 50% to leave room for larger blobrefs.
	zipPerEntryOverhead = len("camlistore/sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15.dat")*3/2 +
		22 /* directoryEndLen */ +
		24 /* dataDescriptor64Len */ +
		30 /* fileHeaderLen */
)
130
// Key prefixes for the different kinds of rows in the meta index. Each
// *Limit constant is its prefix with the trailing ':' replaced by ';' (the
// next byte value), so that the pair can serve as the start and end bounds
// of a range scan over all the keys sharing that prefix.
const (
	// Rows about logical blobs packed within a large blob.
	blobMetaPrefix      = "b:"
	blobMetaPrefixLimit = "b;"

	// Rows about whole files (wholeRefs) and their chunks.
	wholeMetaPrefix      = "w:"
	wholeMetaPrefixLimit = "w;"

	// Rows about the zip files themselves.
	zipMetaPrefix      = "z:"
	zipMetaPrefixLimit = "z;"
)

// zipManifestPath is the name, within each zip, of the manifest file.
const zipManifestPath = "camlistore/camlistore-pack-manifest.json"
146
// RecoveryMode is the mode in which the blobpacked server starts.
type RecoveryMode int

// The values below are spelled out explicitly rather than generated with
// iota, because they're stored in GCE instance's metadata values and so
// must never change.
const (
	// NoRecovery is the default: blobpacked does not attempt to repair its
	// index on startup.
	NoRecovery = RecoveryMode(0)
	// FastRecovery populates the blobpacked index, without erasing any
	// existing one.
	FastRecovery = RecoveryMode(1)
	// FullRecovery erases the existing blobpacked index, then rebuilds it.
	FullRecovery = RecoveryMode(2)
)
161
162var (
163	recoveryMu sync.Mutex
164	recovery   = NoRecovery
165)
166
167// TODO(mpl): make SetRecovery a method of type storage if we ever export it.
168
169// SetRecovery sets the recovery mode for the blobpacked package.
170// If set to one of the modes other than NoRecovery, it means that any
171// blobpacked storage subsequently initialized will automatically start with
172// rebuilding its meta index of zip files, in accordance with the selected mode.
173func SetRecovery(mode RecoveryMode) {
174	recoveryMu.Lock()
175	defer recoveryMu.Unlock()
176	recovery = mode
177}
178
179type subFetcherStorage interface {
180	blobserver.Storage
181	blob.SubFetcher
182}
183
// TODO(mpl): use the storage Logf method everywhere so we get the
// "blobpacked:" prefix automatically on log messages.
186
187type storage struct {
188	small blobserver.Storage
189	large subFetcherStorage
190
191	// meta key -> value rows are:
192	//
193	// For logical blobs packed within a large blob, "b:" prefix:
194	//   b:sha1-xxxx -> "<size> <big-blobref> <offset_u32>"
195	//
196	// For wholerefs: (wholeMetaPrefix)
197	//   w:sha1-xxxx(wholeref) -> "<nbytes_total_u64> <nchunks_u32>"
198	// Then for each big nchunk of the file:
199	// The wholeRef and the chunk number as a key to: the blobRef of the zip
200	// file, the position of the data within the zip, the position of the data
201	// within the uploaded whole file, the length of data in this zip.
202	//   w:sha1-xxxx:0 -> "<zipchunk-blobref> <offset-in-zipchunk-blobref> <offset-in-whole_u64> <length_u32>"
203	//   w:sha1-xxxx:...
204	//   w:sha1-xxxx:(nchunks-1)
205	//
206	// For zipRefs: (zipMetaPrefix)
207	// key: blobref of the zip, prefixed by "z:"
208	// value: size of the zip, blobref of the contents of the whole file (which may
209	// span multiple zips, ~15.5 MB of data per zip), size of the whole file, position
210	// in the whole file of the data (first file) in the zip, size of the data in the
211	// zip (== size of the zip's first file).
212	//   z:<zip-blobref> -> "<zip_size_u32> <whole_ref_from_zip_manifest> <whole_size_u64>
213	//   <zip_data_offset_in_whole_u64> <zip_data_bytes_u32>"
214	//
215	// For marking that zips that have blobs (possibly all)
216	// deleted from inside them: (deleted zip)
217	//   d:sha1-xxxxxx -> <unix-time-of-delete>
218	meta sorted.KeyValue
219
220	// If non-zero, the maximum size of a zip blob.
221	// It defaults to constants.MaxBlobSize.
222	forceMaxZipBlobSize int
223
224	skipDelete bool // don't delete from small after packing
225
226	packGate *syncutil.Gate
227
228	loggerOnce sync.Once
229	log        *log.Logger // nil means default
230}
231
232var (
233	_ blobserver.BlobStreamer    = (*storage)(nil)
234	_ blobserver.Generationer    = (*storage)(nil)
235	_ blobserver.WholeRefFetcher = (*storage)(nil)
236)
237
238func (s *storage) String() string {
239	return fmt.Sprintf("\"blobpacked\" storage")
240}
241
242func (s *storage) Logf(format string, args ...interface{}) {
243	s.logger().Printf(format, args...)
244}
245
246func (s *storage) logger() *log.Logger {
247	s.loggerOnce.Do(s.initLogger)
248	return s.log
249}
250
251func (s *storage) initLogger() {
252	if s.log == nil {
253		s.log = log.New(os.Stderr, "blobpacked: ", log.LstdFlags)
254	}
255}
256
257func (s *storage) init() {
258	s.packGate = syncutil.NewGate(10)
259}
260
261func (s *storage) maxZipBlobSize() int {
262	if s.forceMaxZipBlobSize > 0 {
263		return s.forceMaxZipBlobSize
264	}
265	return constants.MaxBlobSize
266}
267
268func init() {
269	blobserver.RegisterStorageConstructor("blobpacked", blobserver.StorageConstructor(newFromConfig))
270}
271
272func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) {
273	var (
274		smallPrefix = conf.RequiredString("smallBlobs")
275		largePrefix = conf.RequiredString("largeBlobs")
276		metaConf    = conf.RequiredObject("metaIndex")
277		keepGoing   = conf.OptionalBool("keepGoing", false)
278	)
279	if err := conf.Validate(); err != nil {
280		return nil, err
281	}
282	small, err := ld.GetStorage(smallPrefix)
283	if err != nil {
284		return nil, fmt.Errorf("failed to load smallBlobs at %s: %v", smallPrefix, err)
285	}
286	large, err := ld.GetStorage(largePrefix)
287	if err != nil {
288		return nil, fmt.Errorf("failed to load largeBlobs at %s: %v", largePrefix, err)
289	}
290	largeSubber, ok := large.(subFetcherStorage)
291	if !ok {
292		return nil, fmt.Errorf("largeBlobs at %q of type %T doesn't support fetching sub-ranges of blobs",
293			largePrefix, large)
294	}
295	meta, err := sorted.NewKeyValueMaybeWipe(metaConf)
296	if err != nil {
297		return nil, fmt.Errorf("failed to setup blobpacked metaIndex: %v", err)
298	}
299	sto := &storage{
300		small: small,
301		large: largeSubber,
302		meta:  meta,
303	}
304	sto.init()
305
306	recoveryMu.Lock()
307	defer recoveryMu.Unlock()
308	condFatalf := func(pattern string, args ...interface{}) {
309		log.Printf(pattern, args...)
310		if !keepGoing {
311			os.Exit(1)
312		}
313	}
314
315	var newKv func() (sorted.KeyValue, error)
316	switch recovery {
317	case FastRecovery:
318		newKv = func() (sorted.KeyValue, error) {
319			return sorted.NewKeyValue(metaConf)
320		}
321	case FullRecovery:
322		newKv = func() (sorted.KeyValue, error) {
323			kv, err := sorted.NewKeyValue(metaConf)
324			if err != nil {
325				return nil, err
326			}
327			wiper, ok := kv.(sorted.Wiper)
328			if !ok {
329				return nil, fmt.Errorf("blobpacked meta index of type %T needs to be wiped, but does not support automatic wiping. It should be removed manually.", kv)
330			}
331			if err := wiper.Wipe(); err != nil {
332				return nil, fmt.Errorf("blobpacked meta index of type %T could not be wiped: %v", kv, err)
333			}
334			return kv, nil
335		}
336	}
337	if newKv != nil {
338		// i.e. we're in one of the recovery modes
339		log.Print("Starting recovery of blobpacked index")
340		if err := meta.Close(); err != nil {
341			return nil, err
342		}
343		if err := sto.reindex(context.TODO(), newKv); err != nil {
344			return nil, err
345		}
346		if _, err := sto.checkLargeIntegrity(); err != nil {
347			condFatalf("blobpacked: reindexed successfully, but error after validation: %v", err)
348		}
349		return sto, nil
350	}
351
352	// Check for a weird state: zip files exist, but no metadata about them
353	// is recorded. This is probably a corrupt state, and the user likely
354	// wants to recover.
355	if !sto.anyMeta() && sto.anyZipPacks() {
356		if env.OnGCE() {
357			// TODO(mpl): make web UI page/mode that informs about this error.
358			condFatalf("Error: blobpacked storage detects non-zero packed zips, but no metadata. Please switch to recovery mode: add the \"camlistore-recovery = %d\" key/value to the Custom metadata of your instance. And restart the instance.", FastRecovery)
359		}
360		condFatalf("Error: blobpacked storage detects non-zero packed zips, but no metadata. Please re-start in recovery mode with -recovery=%d", FastRecovery)
361	}
362
363	if mode, err := sto.checkLargeIntegrity(); err != nil {
364		if mode <= NoRecovery {
365			condFatalf("%v", err)
366		}
367		if env.OnGCE() {
368			// TODO(mpl): make web UI page/mode that informs about this error.
369			condFatalf("Error: %v. Please switch to recovery mode: add the \"camlistore-recovery = %d\" key/value to the Custom metadata of your instance. And restart the instance.", err, mode)
370		}
371		condFatalf("Error: %v. Please re-start in recovery mode with -recovery=%d", err, mode)
372	}
373
374	return sto, nil
375}
376
377// checkLargeIntegrity verifies that all large blobs in the large storage are
378// indexed in meta, and vice-versa, that all rows in meta referring to a large blob
379// correspond to an existing large blob in the large storage. If any of the above
380// is not true, it returns the recovery mode that should be used to fix the
381// problem, as well as the error detailing the problem. It does not perform any
382// check about the contents of the large blobs themselves.
383func (s *storage) checkLargeIntegrity() (RecoveryMode, error) {
384	inLarge := 0
385	var missing []blob.Ref // blobs in large but not in meta
386	var extra []blob.Ref   // blobs in meta but not in large
387	t := s.meta.Find(zipMetaPrefix, zipMetaPrefixLimit)
388	defer t.Close()
389	iterate := true
390	var enumFunc func(sb blob.SizedRef) error
391	enumFunc = func(sb blob.SizedRef) error {
392		if iterate && !t.Next() {
393			// all of the yet to be enumerated are missing from meta
394			missing = append(missing, sb.Ref)
395			return nil
396		}
397		iterate = true
398		wantMetaKey := zipMetaPrefix + sb.Ref.String()
399		metaKey := t.Key()
400		if metaKey != wantMetaKey {
401			if metaKey > wantMetaKey {
402				// zipRef missing from meta
403				missing = append(missing, sb.Ref)
404				iterate = false
405				return nil
406			}
407			// zipRef in meta that actually does not exist in s.large.
408			xbr, ok := blob.Parse(strings.TrimPrefix(metaKey, zipMetaPrefix))
409			if !ok {
410				return fmt.Errorf("boggus key in z: row: %q", metaKey)
411			}
412			extra = append(extra, xbr)
413			// iterate meta once more at the same storage enumeration point
414			return enumFunc(sb)
415		}
416		if _, err := parseZipMetaRow(t.ValueBytes()); err != nil {
417			return fmt.Errorf("error parsing row from meta: %v", err)
418		}
419		inLarge++
420		return nil
421	}
422	log.Printf("blobpacked: checking integrity of packed blobs against index...")
423	if err := blobserver.EnumerateAllFrom(context.Background(), s.large, "", enumFunc); err != nil {
424		return FullRecovery, err
425	}
426	log.Printf("blobpacked: %d large blobs found in index, %d missing from index", inLarge, len(missing))
427	if len(missing) > 0 {
428		printSample(missing, "missing")
429	}
430	if len(extra) > 0 {
431		printSample(extra, "extra")
432		return FullRecovery, fmt.Errorf("%d large blobs in index but not actually in storage", len(extra))
433	}
434	if err := t.Close(); err != nil {
435		return FullRecovery, fmt.Errorf("error reading or closing index: %v", err)
436	}
437	if len(missing) > 0 {
438		return FastRecovery, fmt.Errorf("%d large blobs missing from index", len(missing))
439	}
440	return NoRecovery, nil
441}
442
443func printSample(fromSlice []blob.Ref, sliceName string) {
444	sort.Slice(fromSlice, func(i, j int) bool { return fromSlice[i].Less(fromSlice[j]) })
445	for i, br := range fromSlice {
446		if i == 10 {
447			break
448		}
449		log.Printf("  sample %v large blob: %v", sliceName, br)
450	}
451}
452
453// zipMetaInfo is the info needed to write the wholeMetaPrefix and
454// zipMetaPrefix entries when reindexing. For a given file, spread over several
455// zips, each zip has a corresponding zipMetaInfo. The wholeMetaPrefix and
456// zipMetaPrefix rows pertaining to a file can only be written once all the
457// zipMetaInfo have been collected and sorted, because the offset of each zip's
458// data is derived from the size of the other pieces that precede it in the file.
459type zipMetaInfo struct {
460	wholePartIndex int      // index of that zip, 0-based
461	zipRef         blob.Ref // ref of the zip file holding packed data blobs + other schema blobs
462	zipSize        uint32   // size of the zipped file
463	offsetInZip    uint32   // position of the contiguous data blobs, relative to the zip
464	dataSize       uint32   // size of the data in the zip
465	wholeSize      uint64   // size of the whole file that this zip is a part of
466	wholeRef       blob.Ref // ref of the contents of the whole file
467}
468
469// rowValue returns the value of the "z:<zipref>" meta key row
470// based on the contents of zm and the provided arguments.
471func (zm zipMetaInfo) rowValue(offset uint64) string {
472	return fmt.Sprintf("%d %v %d %d %d", zm.zipSize, zm.wholeRef, zm.wholeSize, offset, zm.dataSize)
473}
474
475// TODO(mpl): add client command to call reindex on an "offline" blobpacked. camtool packblobs -reindex maybe?
476
477// fileName returns the name of the (possibly partial) first file in zipRef
478// (i.e. the actual data). It returns a zipOpenError if there was any problem
479// reading the zip, and os.ErrNotExist if the zip could not be fetched or if
480// there was no file in the zip.
481func (s *storage) fileName(ctx context.Context, zipRef blob.Ref) (string, error) {
482	_, size, err := s.large.Fetch(ctx, zipRef)
483	if err != nil {
484		return "", err
485	}
486	zr, err := zip.NewReader(blob.ReaderAt(ctx, s.large, zipRef), int64(size))
487	if err != nil {
488		return "", zipOpenError{zipRef, err}
489	}
490	for _, f := range zr.File {
491		return f.Name, nil
492	}
493	return "", os.ErrNotExist
494}
495
496// reindex rebuilds the meta index for packed blobs. It calls newMeta to create
497// a new KeyValue on which to write the index, and replaces s.meta with it. There
498// is no locking whatsoever so it should not be called when the storage is already
499// in use. its signature might change if/when it gets exported.
500func (s *storage) reindex(ctx context.Context, newMeta func() (sorted.KeyValue, error)) error {
501	meta, err := newMeta()
502	if err != nil {
503		return fmt.Errorf("failed to create new blobpacked meta index: %v", err)
504	}
505
506	zipMetaByWholeRef := make(map[blob.Ref][]zipMetaInfo)
507
508	// first a fast full enumerate, so we can report progress afterwards
509	packedTotal := 0
510	blobserver.EnumerateAllFrom(ctx, s.large, "", func(sb blob.SizedRef) error {
511		packedTotal++
512		return nil
513	})
514
515	var packedDone, packedSeen int
516	t := time.NewTicker(10 * time.Second)
517	defer t.Stop()
518	if err := blobserver.EnumerateAllFrom(ctx, s.large, "", func(sb blob.SizedRef) error {
519		select {
520		case <-t.C:
521			log.Printf("blobpacked: %d / %d zip packs seen", packedSeen, packedTotal)
522		default:
523		}
524		zipRef := sb.Ref
525		zr, err := zip.NewReader(blob.ReaderAt(ctx, s.large, zipRef), int64(sb.Size))
526		if err != nil {
527			return zipOpenError{zipRef, err}
528		}
529		var maniFile *zip.File
530		var firstOff int64 // offset of first file (the packed data chunks)
531		for i, f := range zr.File {
532			if i == 0 {
533				firstOff, err = f.DataOffset()
534				if err != nil {
535					return err
536				}
537			}
538			if f.Name == zipManifestPath {
539				maniFile = f
540				break
541			}
542		}
543		if maniFile == nil {
544			return fmt.Errorf("no perkeep manifest file found in zip %v", zipRef)
545		}
546		maniRC, err := maniFile.Open()
547		if err != nil {
548			return err
549		}
550		defer maniRC.Close()
551		var mf Manifest
552		if err := json.NewDecoder(maniRC).Decode(&mf); err != nil {
553			return err
554		}
555		if !mf.WholeRef.Valid() || mf.WholeSize == 0 || !mf.DataBlobsOrigin.Valid() {
556			return fmt.Errorf("incomplete blobpack manifest JSON in %v", zipRef)
557		}
558
559		bm := meta.BeginBatch()
560		// In this loop, we write all the blobMetaPrefix entries for the
561		// data blobs in this zip, and we also compute the dataBytesWritten, for later.
562		var dataBytesWritten int64
563		for _, bp := range mf.DataBlobs {
564			bm.Set(blobMetaPrefix+bp.SizedRef.Ref.String(), fmt.Sprintf("%d %v %d", bp.SizedRef.Size, zipRef, firstOff+bp.Offset))
565			dataBytesWritten += int64(bp.SizedRef.Size)
566		}
567		if dataBytesWritten > (1<<32 - 1) {
568			return fmt.Errorf("total data blobs size in zip %v overflows uint32", zipRef)
569		}
570		dataSize := uint32(dataBytesWritten)
571
572		// In this loop, we write all the blobMetaPrefix entries for the schema blobs in this zip
573		for _, f := range zr.File {
574			if !(strings.HasPrefix(f.Name, "camlistore/") && strings.HasSuffix(f.Name, ".json")) ||
575				f.Name == zipManifestPath {
576				continue
577			}
578			br, ok := blob.Parse(strings.TrimSuffix(strings.TrimPrefix(f.Name, "camlistore/"), ".json"))
579			if !ok {
580				return fmt.Errorf("schema file in zip %v does not have blobRef as name: %v", zipRef, f.Name)
581			}
582			offset, err := f.DataOffset()
583			if err != nil {
584				return err
585			}
586			bm.Set(blobMetaPrefix+br.String(), fmt.Sprintf("%d %v %d", f.UncompressedSize64, zipRef, offset))
587		}
588
589		if err := meta.CommitBatch(bm); err != nil {
590			return err
591		}
592
593		// record that info for later, when we got them all, so we can write the wholeMetaPrefix entries.
594		zipMetaByWholeRef[mf.WholeRef] = append(zipMetaByWholeRef[mf.WholeRef], zipMetaInfo{
595			wholePartIndex: mf.WholePartIndex,
596			zipRef:         zipRef,
597			zipSize:        sb.Size,
598			offsetInZip:    uint32(firstOff),
599			dataSize:       dataSize,
600			wholeSize:      uint64(mf.WholeSize),
601			wholeRef:       mf.WholeRef, // redundant with zipMetaByWholeRef key for now.
602		})
603		packedSeen++
604		return nil
605	}); err != nil {
606		return err
607	}
608
609	// finally, write the wholeMetaPrefix entries
610	foundDups := false
611	packedFiles := 0
612	tt := time.NewTicker(2 * time.Second)
613	defer tt.Stop()
614	bm := meta.BeginBatch()
615	for wholeRef, zipMetas := range zipMetaByWholeRef {
616		select {
617		case <-t.C:
618			log.Printf("blobpacked: %d files reindexed", packedFiles)
619		default:
620		}
621		sort.Slice(zipMetas, func(i, j int) bool { return zipMetas[i].wholePartIndex < zipMetas[j].wholePartIndex })
622		hasDup := hasDups(zipMetas)
623		if hasDup {
624			foundDups = true
625		}
626		offsets := wholeOffsets(zipMetas)
627		for _, z := range zipMetas {
628			offset := offsets[z.wholePartIndex]
629			// write the w:row
630			bm.Set(fmt.Sprintf("%s%s:%d", wholeMetaPrefix, wholeRef, z.wholePartIndex),
631				fmt.Sprintf("%s %d %d %d", z.zipRef, z.offsetInZip, offset, z.dataSize))
632			// write the z: row
633			bm.Set(fmt.Sprintf("%s%v", zipMetaPrefix, z.zipRef), z.rowValue(offset))
634			packedDone++
635		}
636		if hasDup {
637			if debug, _ := strconv.ParseBool(os.Getenv("CAMLI_DEBUG")); debug {
638				printDuplicates(zipMetas)
639			}
640		}
641
642		wholeBytesWritten := offsets[len(offsets)-1]
643		if zipMetas[0].wholeSize != wholeBytesWritten {
644			// Any corrupted zip should have been found earlier, so this error means we're
645			// missing at least one full zip for the whole file to be complete.
646			fileName, err := s.fileName(ctx, zipMetas[0].zipRef)
647			if err != nil {
648				return fmt.Errorf("could not get filename of file in zip %v: %v", zipMetas[0].zipRef, err)
649			}
650			log.Printf(
651				"blobpacked: file %q (wholeRef %v) is incomplete: sum of all zips (%d bytes) does not match manifest's WholeSize (%d bytes)",
652				fileName, wholeRef, wholeBytesWritten, zipMetas[0].wholeSize)
653			var allParts []blob.Ref
654			for _, z := range zipMetas {
655				allParts = append(allParts, z.zipRef)
656			}
657			log.Printf("blobpacked: known parts of %v: %v", wholeRef, allParts)
658			// we skip writing the w: row for the full file, and we don't count the file
659			// as complete.
660			continue
661		}
662		bm.Set(fmt.Sprintf("%s%s", wholeMetaPrefix, wholeRef),
663			fmt.Sprintf("%d %d", wholeBytesWritten, zipMetas[len(zipMetas)-1].wholePartIndex+1))
664		packedFiles++
665	}
666	if err := meta.CommitBatch(bm); err != nil {
667		return err
668	}
669
670	log.Printf("blobpacked: %d / %d zip packs successfully reindexed", packedDone, packedTotal)
671	if packedFiles < len(zipMetaByWholeRef) {
672		log.Printf("blobpacked: %d files reindexed, and %d incomplete file(s) found.", packedFiles, len(zipMetaByWholeRef)-packedFiles)
673	} else {
674		log.Printf("blobpacked: %d files reindexed.", packedFiles)
675	}
676	if foundDups {
677		if debug, _ := strconv.ParseBool(os.Getenv("CAMLI_DEBUG")); !debug {
678			log.Print("blobpacked: zip blobs with duplicate contents were found. Re-run with CAMLI_DEBUG=true for more detail.")
679		}
680	}
681
682	// TODO(mpl): take into account removed blobs. I can't be done for now
683	// (2015-01-29) because RemoveBlobs currently only updates the meta index.
684	// So if the index was lost, all information about removals was lost too.
685
686	s.meta = meta
687	return nil
688}
689
690// hasDups reports whether zm contains successive zipRefs which have the same
691// wholePartIndex, which we assume means they have the same data contents. It
692// panics if that assumption seems wrong, i.e. if the data within assumed
693// duplicates is not the same size in all of them. zm must be sorted by
694// wholePartIndex.
695// See https://github.com/perkeep/perkeep/issues/1079
696func hasDups(zm []zipMetaInfo) bool {
697	i := 0
698	var dataSize uint32
699	var firstDup blob.Ref
700	dupFound := false
701	for _, z := range zm {
702		if z.wholePartIndex == i {
703			firstDup = z.zipRef
704			dataSize = z.dataSize
705			i++
706			continue
707		}
708		// we could return true right now, but we want to go through it all, to make
709		// sure our assumption that "same part index -> duplicate" is true, using at least
710		// the dataSize to confirm. For a better effort, we should use DataBlobsOrigin.
711		if z.dataSize != dataSize {
712			panic(fmt.Sprintf("%v and %v looked like duplicates at first, but don't actually have the same dataSize. TODO: add DataBlobsOrigin checking.", firstDup, z.zipRef))
713		}
714		dupFound = true
715	}
716	return dupFound
717}
718
719// wholeOffsets returns the offset for each part of a file f, in order, assuming
720// zm are all the (wholePartIndex) ordered zip parts that constitute that file. If
721// zm seems to contain duplicates, they are skipped. The additional last item of
722// the returned slice is the sum of all the parts, i.e. the whole size of f.
723func wholeOffsets(zm []zipMetaInfo) []uint64 {
724	i := 0
725	var offsets []uint64
726	var currentOffset uint64
727	for _, z := range zm {
728		if i != z.wholePartIndex {
729			continue
730		}
731		offsets = append(offsets, currentOffset)
732		currentOffset += uint64(z.dataSize)
733		i++
734	}
735	// add the last computed offset to the slice, as it's useful info too: it's the
736	// size of all the data in the zip.
737	offsets = append(offsets, currentOffset)
738	return offsets
739}
740
741func printDuplicates(zm []zipMetaInfo) {
742	i := 0
743	byPartIndex := make(map[int][]zipMetaInfo)
744	for _, z := range zm {
745		if i == z.wholePartIndex {
746			byPartIndex[z.wholePartIndex] = []zipMetaInfo{z}
747			i++
748			continue
749		}
750		byPartIndex[z.wholePartIndex] = append(byPartIndex[z.wholePartIndex], z)
751	}
752	for _, zm := range byPartIndex {
753		if len(zm) <= 1 {
754			continue
755		}
756		br := make([]blob.Ref, 0, len(zm))
757		for _, z := range zm {
758			br = append(br, z.zipRef)
759		}
760		log.Printf("zip blobs with same data contents: %v", br)
761	}
762}
763
764func (s *storage) anyMeta() (v bool) {
765	// TODO: we only care about getting 1 row, but the
766	// sorted.KeyValue interface doesn't let us give it that
767	// hint. Care?
768	sorted.Foreach(s.meta, func(_, _ string) error {
769		v = true
770		return errors.New("stop")
771	})
772	return
773}
774
775func (s *storage) anyZipPacks() (v bool) {
776	ctx, cancel := context.WithCancel(context.TODO())
777	defer cancel()
778	dest := make(chan blob.SizedRef, 1)
779	if err := s.large.EnumerateBlobs(ctx, dest, "", 1); err != nil {
780		// Not a great interface in general, but only needed
781		// by the start-up check for now, where it doesn't
782		// really matter.
783		return false
784	}
785	_, ok := <-dest
786	return ok
787}
788
789func (s *storage) Close() error {
790	return s.meta.Close()
791}
792
793func (s *storage) StorageGeneration() (initTime time.Time, random string, err error) {
794	sgen, sok := s.small.(blobserver.Generationer)
795	lgen, lok := s.large.(blobserver.Generationer)
796	if !sok || !lok {
797		return time.Time{}, "", blobserver.GenerationNotSupportedError("underlying storage engines don't support Generationer")
798	}
799	st, srand, err := sgen.StorageGeneration()
800	if err != nil {
801		return
802	}
803	lt, lrand, err := lgen.StorageGeneration()
804	if err != nil {
805		return
806	}
807	hash := sha1.New()
808	io.WriteString(hash, srand)
809	io.WriteString(hash, lrand)
810	maxTime := func(a, b time.Time) time.Time {
811		if a.After(b) {
812			return a
813		}
814		return b
815	}
816	return maxTime(lt, st), fmt.Sprintf("%x", hash.Sum(nil)), nil
817}
818
819func (s *storage) ResetStorageGeneration() error {
820	var retErr error
821	for _, st := range []blobserver.Storage{s.small, s.large} {
822		if g, ok := st.(blobserver.Generationer); ok {
823			if err := g.ResetStorageGeneration(); err != nil {
824				retErr = err
825			}
826		}
827	}
828	return retErr
829}
830
831type meta struct {
832	exists   bool
833	size     uint32
834	largeRef blob.Ref // if invalid, then on small if exists
835	largeOff uint32
836}
837
838func (m *meta) isPacked() bool { return m.largeRef.Valid() }
839
840// if not found, err == nil.
841func (s *storage) getMetaRow(br blob.Ref) (meta, error) {
842	v, err := s.meta.Get(blobMetaPrefix + br.String())
843	if err == sorted.ErrNotFound {
844		return meta{}, nil
845	}
846	if err != nil {
847		return meta{}, fmt.Errorf("blobpacked.getMetaRow(%v) = %v", br, err)
848	}
849	return parseMetaRow([]byte(v))
850}
851
// singleSpace is the separator between the fields of a meta row, as a byte
// slice for use with the bytes package.
var singleSpace = []byte(" ")
853
854// parses:
855// "<size_u32> <big-blobref> <big-offset>"
856func parseMetaRow(v []byte) (m meta, err error) {
857	row := v
858	sp := bytes.IndexByte(v, ' ')
859	if sp < 1 || sp == len(v)-1 {
860		return meta{}, fmt.Errorf("invalid metarow %q", v)
861	}
862	m.exists = true
863	size, err := strutil.ParseUintBytes(v[:sp], 10, 32)
864	if err != nil {
865		return meta{}, fmt.Errorf("invalid metarow size %q", v)
866	}
867	m.size = uint32(size)
868	v = v[sp+1:]
869
870	// remains: "<big-blobref> <big-offset>"
871	if bytes.Count(v, singleSpace) != 1 {
872		return meta{}, fmt.Errorf("invalid metarow %q: wrong number of spaces", row)
873	}
874	sp = bytes.IndexByte(v, ' ')
875	largeRef, ok := blob.ParseBytes(v[:sp])
876	if !ok {
877		return meta{}, fmt.Errorf("invalid metarow %q: bad blobref %q", row, v[:sp])
878	}
879	m.largeRef = largeRef
880	off, err := strutil.ParseUintBytes(v[sp+1:], 10, 32)
881	if err != nil {
882		return meta{}, fmt.Errorf("invalid metarow %q: bad offset: %v", row, err)
883	}
884	m.largeOff = uint32(off)
885	return m, nil
886}
887
888func parseMetaRowSizeOnly(v []byte) (size uint32, err error) {
889	sp := bytes.IndexByte(v, ' ')
890	if sp < 1 || sp == len(v)-1 {
891		return 0, fmt.Errorf("invalid metarow %q", v)
892	}
893	size64, err := strutil.ParseUintBytes(v[:sp], 10, 32)
894	if err != nil {
895		return 0, fmt.Errorf("invalid metarow size %q", v)
896	}
897	return uint32(size64), nil
898}
899
900// parses:
901// "<zip_size_u32> <whole_ref_from_zip_manifest> <whole_size_u64> <zip_data_offset_in_whole_u64> <zip_data_bytes_u32>"
902func parseZipMetaRow(v []byte) (m zipMetaInfo, err error) {
903	row := v
904	sp := bytes.IndexByte(v, ' ')
905	if sp < 1 || sp == len(v)-1 {
906		return zipMetaInfo{}, fmt.Errorf("invalid z: meta row %q", row)
907	}
908	if bytes.Count(v, singleSpace) != 4 {
909		return zipMetaInfo{}, fmt.Errorf("wrong number of spaces in z: meta row %q", row)
910	}
911	zipSize, err := strutil.ParseUintBytes(v[:sp], 10, 32)
912	if err != nil {
913		return zipMetaInfo{}, fmt.Errorf("invalid zipSize %q in z: meta row: %q", v[:sp], row)
914	}
915	m.zipSize = uint32(zipSize)
916
917	v = v[sp+1:]
918	sp = bytes.IndexByte(v, ' ')
919	wholeRef, ok := blob.ParseBytes(v[:sp])
920	if !ok {
921		return zipMetaInfo{}, fmt.Errorf("invalid wholeRef %q in z: meta row: %q", v[:sp], row)
922	}
923	m.wholeRef = wholeRef
924
925	v = v[sp+1:]
926	sp = bytes.IndexByte(v, ' ')
927	wholeSize, err := strutil.ParseUintBytes(v[:sp], 10, 64)
928	if err != nil {
929		return zipMetaInfo{}, fmt.Errorf("invalid wholeSize %q in z: meta row: %q", v[:sp], row)
930	}
931	m.wholeSize = uint64(wholeSize)
932
933	v = v[sp+1:]
934	sp = bytes.IndexByte(v, ' ')
935	if _, err := strutil.ParseUintBytes(v[:sp], 10, 64); err != nil {
936		return zipMetaInfo{}, fmt.Errorf("invalid offset %q in z: meta row: %q", v[:sp], row)
937	}
938
939	v = v[sp+1:]
940	dataSize, err := strutil.ParseUintBytes(v, 10, 32)
941	if err != nil {
942		return zipMetaInfo{}, fmt.Errorf("invalid dataSize %q in z: meta row: %q", v, row)
943	}
944	m.dataSize = uint32(dataSize)
945
946	return m, nil
947}
948
949func (s *storage) ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (sb blob.SizedRef, err error) {
950	buf := pools.BytesBuffer()
951	defer pools.PutBuffer(buf)
952
953	if _, err := io.Copy(buf, source); err != nil {
954		return sb, err
955	}
956	size := uint32(buf.Len())
957	isFile := false
958	fileBlob, err := schema.BlobFromReader(br, bytes.NewReader(buf.Bytes()))
959	if err == nil && fileBlob.Type() == "file" {
960		isFile = true
961	}
962	meta, err := s.getMetaRow(br)
963	if err != nil {
964		return sb, err
965	}
966	if meta.exists {
967		sb = blob.SizedRef{Size: size, Ref: br}
968	} else {
969		sb, err = s.small.ReceiveBlob(ctx, br, buf)
970		if err != nil {
971			return sb, err
972		}
973	}
974	if !isFile || meta.isPacked() || fileBlob.PartsSize() < packThreshold {
975		return sb, nil
976	}
977
978	// Pack the blob.
979	s.packGate.Start()
980	defer s.packGate.Done()
981	// We ignore the return value from packFile since we can't
982	// really recover. At least be happy that we have all the
983	// data on 'small' already. packFile will log at least.
984	s.packFile(ctx, br)
985	return sb, nil
986}
987
988func (s *storage) Fetch(ctx context.Context, br blob.Ref) (io.ReadCloser, uint32, error) {
989	m, err := s.getMetaRow(br)
990	if err != nil {
991		return nil, 0, err
992	}
993	if !m.exists || !m.isPacked() {
994		return s.small.Fetch(ctx, br)
995	}
996	rc, err := s.large.SubFetch(ctx, m.largeRef, int64(m.largeOff), int64(m.size))
997	if err != nil {
998		return nil, 0, err
999	}
1000	return rc, m.size, nil
1001}
1002
// removeLookups is the size of the gate RemoveBlobs uses to bound its
// concurrent meta row lookups.
const removeLookups = 50 // arbitrary
1004
1005func (s *storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
1006	// Plan:
1007	//  -- delete from small (if it's there)
1008	//  -- if in big, update the meta index to note that it's there, but deleted.
1009	//  -- fetch big's zip file (constructed from a ReaderAt that is all dummy zeros +
1010	//     the zip's TOC only, relying on big being a SubFetcher, and keeping info in
1011	//     the meta about the offset of the TOC+total size of each big's zip)
1012	//  -- iterate over the zip's blobs (at some point). If all are marked deleted, actually RemoveBlob
1013	//     on big to delete the full zip and then delete all the meta rows.
1014	var (
1015		mu       sync.Mutex
1016		unpacked []blob.Ref
1017		packed   []blob.Ref
1018		large    = map[blob.Ref]bool{} // the large blobs that packed are in
1019	)
1020	var grp syncutil.Group
1021	delGate := syncutil.NewGate(removeLookups)
1022	for _, br := range blobs {
1023		br := br
1024		delGate.Start()
1025		grp.Go(func() error {
1026			defer delGate.Done()
1027			m, err := s.getMetaRow(br)
1028			if err != nil {
1029				return err
1030			}
1031			mu.Lock()
1032			defer mu.Unlock()
1033			if m.isPacked() {
1034				packed = append(packed, br)
1035				large[m.largeRef] = true
1036			} else {
1037				unpacked = append(unpacked, br)
1038			}
1039			return nil
1040		})
1041	}
1042	if err := grp.Err(); err != nil {
1043		return err
1044	}
1045	if len(unpacked) > 0 {
1046		grp.Go(func() error {
1047			return s.small.RemoveBlobs(ctx, unpacked)
1048		})
1049	}
1050	if len(packed) > 0 {
1051		grp.Go(func() error {
1052			bm := s.meta.BeginBatch()
1053			now := time.Now()
1054			for zipRef := range large {
1055				bm.Set("d:"+zipRef.String(), fmt.Sprint(now.Unix()))
1056			}
1057			for _, br := range packed {
1058				bm.Delete("b:" + br.String())
1059			}
1060			return s.meta.CommitBatch(bm)
1061		})
1062	}
1063	return grp.Err()
1064}
1065
// statGate is the gate StatBlobs hands to
// blobserver.StatBlobsParallelHelper. It is package-level, so it is
// shared by all callers of StatBlobs.
var statGate = syncutil.NewGate(50) // arbitrary
1067
1068func (s *storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
1069	var (
1070		trySmallMu sync.Mutex
1071		trySmall   []blob.Ref
1072	)
1073
1074	err := blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(br blob.Ref) (sb blob.SizedRef, err error) {
1075		m, err := s.getMetaRow(br)
1076		if err != nil {
1077			return sb, err
1078		}
1079		if m.exists {
1080			return blob.SizedRef{Ref: br, Size: m.size}, nil
1081		}
1082		// Try it in round two against the small blobs:
1083		trySmallMu.Lock()
1084		trySmall = append(trySmall, br)
1085		trySmallMu.Unlock()
1086		return sb, nil
1087	})
1088	if err != nil {
1089		return err
1090	}
1091	if len(trySmall) == 0 {
1092		return nil
1093	}
1094	return s.small.StatBlobs(ctx, trySmall, fn)
1095}
1096
1097func (s *storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) (err error) {
1098	return blobserver.MergedEnumerate(ctx, dest, []blobserver.BlobEnumerator{
1099		s.small,
1100		enumerator{s},
1101	}, after, limit)
1102}
1103
// enumerator implements EnumerateBlobs over the blob rows of the meta
// index of the embedded storage. It is one of the two sources that
// storage.EnumerateBlobs merges, the other being the small storage.
type enumerator struct {
	*storage
}
1108
1109func (s enumerator) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) (err error) {
1110	defer close(dest)
1111	t := s.meta.Find(blobMetaPrefix+after, blobMetaPrefixLimit)
1112	defer func() {
1113		closeErr := t.Close()
1114		if err == nil {
1115			err = closeErr
1116		}
1117	}()
1118	n := 0
1119	afterb := []byte(after)
1120	for n < limit && t.Next() {
1121		key := t.KeyBytes()[len(blobMetaPrefix):]
1122		if n == 0 && bytes.Equal(key, afterb) {
1123			continue
1124		}
1125		n++
1126		br, ok := blob.ParseBytes(key)
1127		if !ok {
1128			return fmt.Errorf("unknown key %q in meta index", t.Key())
1129		}
1130		size, err := parseMetaRowSizeOnly(t.ValueBytes())
1131		if err != nil {
1132			return err
1133		}
1134		select {
1135		case <-ctx.Done():
1136			return ctx.Err()
1137		case dest <- blob.SizedRef{Ref: br, Size: size}:
1138		}
1139	}
1140	return nil
1141}
1142
1143func (s *storage) packFile(ctx context.Context, fileRef blob.Ref) (err error) {
1144	s.Logf("Packing file %s ...", fileRef)
1145	defer func() {
1146		if err == nil {
1147			s.Logf("Packed file %s", fileRef)
1148		} else {
1149			s.Logf("Error packing file %s: %v", fileRef, err)
1150		}
1151	}()
1152
1153	fr, err := schema.NewFileReader(ctx, s, fileRef)
1154	if err != nil {
1155		return err
1156	}
1157	return newPacker(s, fileRef, fr).pack(ctx)
1158}
1159
1160func newPacker(s *storage, fileRef blob.Ref, fr *schema.FileReader) *packer {
1161	return &packer{
1162		s:            s,
1163		fileRef:      fileRef,
1164		fr:           fr,
1165		dataSize:     map[blob.Ref]uint32{},
1166		schemaBlob:   map[blob.Ref]*blob.Blob{},
1167		schemaParent: map[blob.Ref][]blob.Ref{},
1168	}
1169}
1170
// A packer writes a file out as one or more zip pack blobs on the
// large storage. It holds the state of packing a single file: what
// scanChunks found, and the progress made by successive writeAZip
// calls.
type packer struct {
	// s is the storage being packed into, fileRef the file schema blob
	// being packed, and fr a reader over that file.
	s       *storage
	fileRef blob.Ref
	fr      *schema.FileReader

	// wholeRef and wholeSize are the blobref and the byte count of the
	// entire file contents, computed by pack.
	wholeRef  blob.Ref
	wholeSize int64

	// dataRefs and dataSize are the file's data chunks and their
	// sizes, filled in by scanChunks.
	dataRefs []blob.Ref // in order
	dataSize map[blob.Ref]uint32

	// schemaRefs, schemaBlob and schemaParent describe the schema
	// blobs seen by scanChunks on the way to each data chunk.
	schemaRefs   []blob.Ref // in order, but irrelevant
	schemaBlob   map[blob.Ref]*blob.Blob
	schemaParent map[blob.Ref][]blob.Ref // data blob -> its parent/ancestor schema blob(s), all the way up to fileRef included

	// chunksRemain is the suffix of dataRefs not yet written to a zip;
	// zips lists the zips written so far.
	chunksRemain      []blob.Ref
	zips              []writtenZip
	wholeBytesWritten int64 // sum of zips.dataRefs.size
}
1191
// writtenZip records one zip that the packer stored on the large
// storage: the zip blob's ref and size, and the data blobs it holds.
type writtenZip struct {
	blob.SizedRef
	dataRefs []blob.Ref
}
1196
// Hooks for tests; each is called only when non-nil.
var (
	// testHookSawTruncate is called by pack with the blobref carried by
	// a needsTruncatedAfterError returned from writeAZip.
	testHookSawTruncate           func(blob.Ref)
	// testHookStopBeforeOverflowing is called by writeAZip when its
	// size estimate says the next chunk would not fit in the zip.
	testHookStopBeforeOverflowing func()
)
1201
1202func (pk *packer) pack(ctx context.Context) error {
1203	if err := pk.scanChunks(ctx); err != nil {
1204		return err
1205	}
1206
1207	// TODO: decide as a fuction of schemaRefs and dataRefs
1208	// already in s.large whether it makes sense to still compact
1209	// this from a savings standpoint. For now we just always do.
1210	// Maybe we'd have knobs in the future. Ideally not.
1211
1212	// Don't pack a file if we already have its wholeref stored
1213	// otherwise (perhaps under a different filename). But that
1214	// means we have to compute its wholeref first. We assume the
1215	// blob source will cache these lookups so it's not too
1216	// expensive to do two passes over the input.
1217	h := blob.NewHash()
1218	var err error
1219	pk.wholeSize, err = io.Copy(h, pk.fr)
1220	if err != nil {
1221		return err
1222	}
1223	pk.wholeRef = blob.RefFromHash(h)
1224	wholeKey := wholeMetaPrefix + pk.wholeRef.String()
1225	_, err = pk.s.meta.Get(wholeKey)
1226	if err == nil {
1227		// Nil error means there was some knowledge of this wholeref.
1228		return fmt.Errorf("already have wholeref %v packed; not packing again", pk.wholeRef)
1229	} else if err != sorted.ErrNotFound {
1230		return err
1231	}
1232
1233	pk.chunksRemain = pk.dataRefs
1234	var trunc blob.Ref
1235MakingZips:
1236	for len(pk.chunksRemain) > 0 {
1237		if err := pk.writeAZip(ctx, trunc); err != nil {
1238			if needTrunc, ok := err.(needsTruncatedAfterError); ok {
1239				trunc = needTrunc.Ref
1240				if fn := testHookSawTruncate; fn != nil {
1241					fn(trunc)
1242				}
1243				continue MakingZips
1244			}
1245			return err
1246		}
1247		trunc = blob.Ref{}
1248	}
1249
1250	// Record the final wholeMetaPrefix record:
1251	err = pk.s.meta.Set(wholeKey, fmt.Sprintf("%d %d", pk.wholeSize, len(pk.zips)))
1252	if err != nil {
1253		return fmt.Errorf("Error setting %s: %v", wholeKey, err)
1254	}
1255
1256	return nil
1257}
1258
1259func (pk *packer) scanChunks(ctx context.Context) error {
1260	schemaSeen := map[blob.Ref]bool{}
1261	return pk.fr.ForeachChunk(ctx, func(schemaPath []blob.Ref, p schema.BytesPart) error {
1262		if !p.BlobRef.Valid() {
1263			return errors.New("sparse files are not packed")
1264		}
1265		if p.Offset != 0 {
1266			// TODO: maybe care about this later, if we ever start making
1267			// these sorts of files.
1268			return errors.New("file uses complicated schema. not packing")
1269		}
1270		pk.schemaParent[p.BlobRef] = append([]blob.Ref(nil), schemaPath...) // clone it
1271		pk.dataSize[p.BlobRef] = uint32(p.Size)
1272		for _, schemaRef := range schemaPath {
1273			if schemaSeen[schemaRef] {
1274				continue
1275			}
1276			schemaSeen[schemaRef] = true
1277			pk.schemaRefs = append(pk.schemaRefs, schemaRef)
1278			if b, err := blob.FromFetcher(ctx, pk.s, schemaRef); err != nil {
1279				return err
1280			} else {
1281				pk.schemaBlob[schemaRef] = b
1282			}
1283		}
1284		pk.dataRefs = append(pk.dataRefs, p.BlobRef)
1285		return nil
1286	})
1287}
1288
1289// needsTruncatedAfterError is returned by writeAZip if it failed in its estimation and the zip file
1290// was over the 16MB (or whatever) max blob size limit. In this case the caller tries again
1291type needsTruncatedAfterError struct{ blob.Ref }
1292
1293func (e needsTruncatedAfterError) Error() string { return "needs truncation after " + e.Ref.String() }
1294
// check should only be used for things which really shouldn't ever
// happen, but should still be checked. If there is interesting logic in
// the 'else', then don't use this.
//
// A nil err is a no-op. For a non-nil err, check logs the error with
// up to 2 KiB of the current goroutine's stack trace and then panics
// with err itself.
func check(err error) {
	if err == nil {
		return
	}
	stack := make([]byte, 2<<10)
	n := runtime.Stack(stack, false)
	log.Printf("Unlikely error condition triggered: %v at %s", err, stack[:n])
	panic(err)
}
1305
1306// trunc is a hint about which blob to truncate after. It may be zero.
1307// If the returned error is of type 'needsTruncatedAfterError', then
1308// the zip should be attempted to be written again, but truncating the
1309// data after the listed blob.
1310func (pk *packer) writeAZip(ctx context.Context, trunc blob.Ref) (err error) {
1311	defer func() {
1312		if e := recover(); e != nil {
1313			if v, ok := e.(error); ok && err == nil {
1314				err = v
1315			} else {
1316				panic(e)
1317			}
1318		}
1319	}()
1320	mf := Manifest{
1321		WholeRef:       pk.wholeRef,
1322		WholeSize:      pk.wholeSize,
1323		WholePartIndex: len(pk.zips),
1324	}
1325	var zbuf bytes.Buffer
1326	cw := &countWriter{w: &zbuf}
1327	zw := zip.NewWriter(cw)
1328
1329	var approxSize = zipFixedOverhead // can't use zbuf.Len because zw buffers
1330	var dataRefsWritten []blob.Ref
1331	var dataBytesWritten int64
1332	var schemaBlobSeen = map[blob.Ref]bool{}
1333	var schemaBlobs []blob.Ref // to add after the main file
1334
1335	baseFileName := pk.fr.FileName()
1336	if strings.Contains(baseFileName, "/") || strings.Contains(baseFileName, "\\") {
1337		return fmt.Errorf("File schema blob %v filename had a slash in it: %q", pk.fr.SchemaBlobRef(), baseFileName)
1338	}
1339	fh := &zip.FileHeader{
1340		Name:   baseFileName,
1341		Method: zip.Store, // uncompressed
1342	}
1343	fh.SetModTime(pk.fr.ModTime())
1344	fh.SetMode(0644)
1345	fw, err := zw.CreateHeader(fh)
1346	check(err)
1347	check(zw.Flush())
1348	dataStart := cw.n
1349	approxSize += zipPerEntryOverhead // for the first FileHeader w/ the data
1350
1351	zipMax := pk.s.maxZipBlobSize()
1352	chunks := pk.chunksRemain
1353	chunkWholeHash := blob.NewHash()
1354	for len(chunks) > 0 {
1355		dr := chunks[0] // the next chunk to maybe write
1356
1357		if trunc.Valid() && trunc == dr {
1358			if approxSize == 0 {
1359				return errors.New("first blob is too large to pack, once you add the zip overhead")
1360			}
1361			break
1362		}
1363
1364		schemaBlobsSave := schemaBlobs
1365		for _, parent := range pk.schemaParent[dr] {
1366			if !schemaBlobSeen[parent] {
1367				schemaBlobSeen[parent] = true
1368				schemaBlobs = append(schemaBlobs, parent)
1369				approxSize += int(pk.schemaBlob[parent].Size()) + zipPerEntryOverhead
1370			}
1371		}
1372
1373		thisSize := pk.dataSize[dr]
1374		approxSize += int(thisSize)
1375		if approxSize+mf.approxSerializedSize() > zipMax {
1376			if fn := testHookStopBeforeOverflowing; fn != nil {
1377				fn()
1378			}
1379			schemaBlobs = schemaBlobsSave // restore it
1380			break
1381		}
1382
1383		// Copy the data to the zip.
1384		rc, size, err := pk.s.Fetch(ctx, dr)
1385		check(err)
1386		if size != thisSize {
1387			rc.Close()
1388			return errors.New("unexpected size")
1389		}
1390		if n, err := io.Copy(io.MultiWriter(fw, chunkWholeHash), rc); err != nil || n != int64(size) {
1391			rc.Close()
1392			return fmt.Errorf("copy to zip = %v, %v; want %v bytes", n, err, size)
1393		}
1394		rc.Close()
1395
1396		dataRefsWritten = append(dataRefsWritten, dr)
1397		dataBytesWritten += int64(size)
1398		chunks = chunks[1:]
1399	}
1400	mf.DataBlobsOrigin = blob.RefFromHash(chunkWholeHash)
1401
1402	// zipBlobs is where a schema or data blob is relative to the beginning
1403	// of the zip file.
1404	var zipBlobs []BlobAndPos
1405
1406	var dataOffset int64
1407	for _, br := range dataRefsWritten {
1408		size := pk.dataSize[br]
1409		mf.DataBlobs = append(mf.DataBlobs, BlobAndPos{blob.SizedRef{Ref: br, Size: size}, dataOffset})
1410
1411		zipBlobs = append(zipBlobs, BlobAndPos{blob.SizedRef{Ref: br, Size: size}, dataStart + dataOffset})
1412		dataOffset += int64(size)
1413	}
1414
1415	for _, br := range schemaBlobs {
1416		fw, err := zw.CreateHeader(&zip.FileHeader{
1417			Name:   "camlistore/" + br.String() + ".json",
1418			Method: zip.Store, // uncompressed
1419		})
1420		check(err)
1421		check(zw.Flush())
1422		b := pk.schemaBlob[br]
1423		zipBlobs = append(zipBlobs, BlobAndPos{blob.SizedRef{Ref: br, Size: b.Size()}, cw.n})
1424		r, err := b.ReadAll(ctx)
1425		if err != nil {
1426			return err
1427		}
1428		n, err := io.Copy(fw, r)
1429
1430		check(err)
1431		if n != int64(b.Size()) {
1432			return fmt.Errorf("failed to write all of schema blob %v: %d bytes, not wanted %d", br, n, b.Size())
1433		}
1434	}
1435
1436	// Manifest file
1437	fw, err = zw.Create(zipManifestPath)
1438	check(err)
1439	enc, err := json.MarshalIndent(mf, "", "  ")
1440	check(err)
1441	_, err = fw.Write(enc)
1442	check(err)
1443	err = zw.Close()
1444	check(err)
1445
1446	if zbuf.Len() > zipMax {
1447		// We guessed wrong. Back up. Find out how many blobs we went over.
1448		overage := zbuf.Len() - zipMax
1449		for i := len(dataRefsWritten) - 1; i >= 0; i-- {
1450			dr := dataRefsWritten[i]
1451			if overage <= 0 {
1452				return needsTruncatedAfterError{dr}
1453			}
1454			overage -= int(pk.dataSize[dr])
1455		}
1456		return errors.New("file is unpackable; first blob is too big to fit")
1457	}
1458
1459	zipRef := blob.RefFromBytes(zbuf.Bytes())
1460	zipSB, err := blobserver.ReceiveNoHash(ctx, pk.s.large, zipRef, bytes.NewReader(zbuf.Bytes()))
1461	if err != nil {
1462		return err
1463	}
1464
1465	bm := pk.s.meta.BeginBatch()
1466	bm.Set(fmt.Sprintf("%s%s:%d", wholeMetaPrefix, pk.wholeRef, len(pk.zips)),
1467		fmt.Sprintf("%s %d %d %d",
1468			zipRef,
1469			dataStart,
1470			pk.wholeBytesWritten,
1471			dataBytesWritten))
1472	bm.Set(fmt.Sprintf("%s%v", zipMetaPrefix, zipRef),
1473		fmt.Sprintf("%d %v %d %d %d",
1474			zipSB.Size,
1475			pk.wholeRef,
1476			pk.wholeSize,
1477			pk.wholeBytesWritten,
1478			dataBytesWritten))
1479
1480	pk.wholeBytesWritten += dataBytesWritten
1481	pk.zips = append(pk.zips, writtenZip{
1482		SizedRef: zipSB,
1483		dataRefs: dataRefsWritten,
1484	})
1485
1486	for _, zb := range zipBlobs {
1487		bm.Set(blobMetaPrefix+zb.Ref.String(), fmt.Sprintf("%d %v %d", zb.Size, zipRef, zb.Offset))
1488	}
1489	if err := pk.s.meta.CommitBatch(bm); err != nil {
1490		return err
1491	}
1492
1493	// Delete from small
1494	if !pk.s.skipDelete {
1495		toDelete := make([]blob.Ref, 0, len(dataRefsWritten)+len(schemaBlobs))
1496		toDelete = append(toDelete, dataRefsWritten...)
1497		toDelete = append(toDelete, schemaBlobs...)
1498		if err := pk.s.small.RemoveBlobs(ctx, toDelete); err != nil {
1499			// Can't really do anything about it and doesn't really matter, so
1500			// just log for now.
1501			pk.s.Logf("Error removing blobs from %s: %v", pk.s.small, err)
1502		}
1503	}
1504
1505	// On success, consume the chunks we wrote from pk.chunksRemain.
1506	pk.chunksRemain = pk.chunksRemain[len(dataRefsWritten):]
1507	return nil
1508}
1509
1510type zipOpenError struct {
1511	zipRef blob.Ref
1512	err    error
1513}
1514
1515func (ze zipOpenError) Error() string {
1516	return fmt.Sprintf("Error opening packed zip blob %v: %v", ze.zipRef, ze.err)
1517}
1518
1519// foreachZipBlob calls fn for each blob in the zip pack blob
1520// identified by zipRef.  If fn returns a non-nil error,
1521// foreachZipBlob stops enumerating with that error.
1522func (s *storage) foreachZipBlob(ctx context.Context, zipRef blob.Ref, fn func(BlobAndPos) error) error {
1523	sb, err := blobserver.StatBlob(ctx, s.large, zipRef)
1524	if err != nil {
1525		return err
1526	}
1527	zr, err := zip.NewReader(blob.ReaderAt(ctx, s.large, zipRef), int64(sb.Size))
1528	if err != nil {
1529		return zipOpenError{zipRef, err}
1530	}
1531	var maniFile *zip.File // or nil if not found
1532	var firstOff int64     // offset of first file (the packed data chunks)
1533	for i, f := range zr.File {
1534		if i == 0 {
1535			firstOff, err = f.DataOffset()
1536			if err != nil {
1537				return err
1538			}
1539		}
1540		if f.Name == zipManifestPath {
1541			maniFile = f
1542			break
1543		}
1544	}
1545	if maniFile == nil {
1546		return errors.New("no camlistore manifest file found in zip")
1547	}
1548	// apply fn to all the schema blobs
1549	for _, f := range zr.File {
1550		if !strings.HasPrefix(f.Name, "camlistore/") || f.Name == zipManifestPath ||
1551			!strings.HasSuffix(f.Name, ".json") {
1552			continue
1553		}
1554		brStr := strings.TrimSuffix(strings.TrimPrefix(f.Name, "camlistore/"), ".json")
1555		br, ok := blob.Parse(brStr)
1556		if ok {
1557			off, err := f.DataOffset()
1558			if err != nil {
1559				return err
1560			}
1561			if err := fn(BlobAndPos{
1562				SizedRef: blob.SizedRef{Ref: br, Size: uint32(f.UncompressedSize64)},
1563				Offset:   off,
1564			}); err != nil {
1565				return err
1566			}
1567		}
1568	}
1569	maniRC, err := maniFile.Open()
1570	if err != nil {
1571		return err
1572	}
1573	defer maniRC.Close()
1574
1575	var mf Manifest
1576	if err := json.NewDecoder(maniRC).Decode(&mf); err != nil {
1577		return err
1578	}
1579	if !mf.WholeRef.Valid() || mf.WholeSize == 0 || !mf.DataBlobsOrigin.Valid() {
1580		return errors.New("incomplete blobpack manifest JSON")
1581	}
1582	// apply fn to all the data blobs
1583	for _, bap := range mf.DataBlobs {
1584		bap.Offset += firstOff
1585		if err := fn(bap); err != nil {
1586			return err
1587		}
1588	}
1589	return nil
1590}
1591
1592// deleteZipPack deletes the zip pack file br, but only if that zip
1593// file's parts are deleted already from the meta index.
1594func (s *storage) deleteZipPack(ctx context.Context, br blob.Ref) error {
1595	inUse, err := s.zipPartsInUse(ctx, br)
1596	if err != nil {
1597		return err
1598	}
1599	if len(inUse) > 0 {
1600		return fmt.Errorf("can't delete zip pack %v: %d parts in use: %v", br, len(inUse), inUse)
1601	}
1602	if err := s.large.RemoveBlobs(ctx, []blob.Ref{br}); err != nil {
1603		return err
1604	}
1605	return s.meta.Delete("d:" + br.String())
1606}
1607
1608func (s *storage) zipPartsInUse(ctx context.Context, br blob.Ref) ([]blob.Ref, error) {
1609	var (
1610		mu    sync.Mutex
1611		inUse []blob.Ref
1612	)
1613	var grp syncutil.Group
1614	gate := syncutil.NewGate(20) // arbitrary constant
1615	err := s.foreachZipBlob(ctx, br, func(bap BlobAndPos) error {
1616		gate.Start()
1617		grp.Go(func() error {
1618			defer gate.Done()
1619			mr, err := s.getMetaRow(bap.Ref)
1620			if err != nil {
1621				return err
1622			}
1623			if mr.isPacked() {
1624				mu.Lock()
1625				inUse = append(inUse, mr.largeRef)
1626				mu.Unlock()
1627			}
1628			return nil
1629		})
1630		return nil
1631	})
1632	if os.IsNotExist(err) {
1633		// An already-deleted blob from large isn't considered
1634		// to be in-use.
1635		return nil, nil
1636	}
1637	if err != nil {
1638		return nil, err
1639	}
1640	if err := grp.Err(); err != nil {
1641		return nil, err
1642	}
1643	return inUse, nil
1644}
1645
// A BlobAndPos is a blobref, its size, and where it is located within
// a larger group of bytes.
//
// What the offset is relative to depends on the context: in a
// Manifest's DataBlobs it is relative to the first file of the zip,
// while foreachZipBlob reports offsets relative to the zip itself.
type BlobAndPos struct {
	blob.SizedRef
	// Offset is the position of the blob's first byte.
	Offset int64 `json:"offset"`
}
1652
// Manifest is the JSON description type representing the
// "camlistore/camlistore-pack-manifest.json" file found in a blobpack
// zip file. The packer writes it as the last file of each zip.
type Manifest struct {
	// WholeRef is the blobref of the entire file that this zip is
	// either fully or partially describing.  For files under
	// around 16MB, the WholeRef and DataBlobsOrigin will be
	// the same.
	WholeRef blob.Ref `json:"wholeRef"`

	// WholeSize is the number of bytes in the original file being
	// cut up.
	WholeSize int64 `json:"wholeSize"`

	// WholePartIndex is the chunk number (0-based) of this zip file.
	// If a client has 'n' zip files with the same WholeRef whose
	// WholePartIndexes are contiguous (including 0) and the sum of
	// the DataBlobs equals WholeSize, the client has the entire
	// original file.
	WholePartIndex int `json:"wholePartIndex"`

	// DataBlobsOrigin is the blobref of the contents of the first
	// file in the zip pack file. This first file is the actual data,
	// or a part of it, that the rest of this zip is describing or
	// referencing.
	DataBlobsOrigin blob.Ref `json:"dataBlobsOrigin"`

	// DataBlobs describes all the logical blobs that are
	// concatenated together in the first file in the zip file.
	// The offsets are relative to the beginning of that first
	// file, not the beginning of the zip file itself.
	DataBlobs []BlobAndPos `json:"dataBlobs"`
}
1686
1687// approxSerializedSize reports how big this Manifest will be
1688// (approximately), once encoded as JSON. This is used as a hint by
1689// the packer to decide when to keep trying to add blobs. If this
1690// number is too low, the packer backs up (at a slight performance
1691// cost) but is still correct. If this approximation returns too large
1692// of a number, it just causes multiple zip files to be created when
1693// the original blobs might've just barely fit.
1694func (mf *Manifest) approxSerializedSize() int {
1695	// Empirically (for sha1-* blobrefs) it's 204 bytes fixed
1696	// encoding overhead (pre-compression), and 119 bytes per
1697	// encoded DataBlob.
1698	// And empirically, it compresses down to 30% of its size with flate.
1699	// So use the sha1 numbers but conseratively assume only 50% compression,
1700	// to make up for longer sha-3 blobrefs.
1701	return (204 + len(mf.DataBlobs)*119) / 2
1702}
1703
// countWriter is an io.Writer that forwards every write to w and keeps
// a running total of the bytes w accepted.
type countWriter struct {
	// w is the destination of all writes.
	w io.Writer
	// n is the total number of bytes w has reported as written.
	n int64
}

// Write writes p to the underlying writer and adds the number of bytes
// it reports as written to cw.n, even if the write was short or
// failed. The underlying writer's results are returned unchanged.
func (cw *countWriter) Write(p []byte) (int, error) {
	written, err := cw.w.Write(p)
	cw.n += int64(written)
	return written, err
}
1714