/*
Copyright 2014 The Perkeep AUTHORS

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
Package blobpacked registers the "blobpacked" blobserver storage type,
storing blobs initially as one physical blob per logical blob, but then
rearranging little physical blobs into large contiguous blobs organized by
how they'll likely be accessed. An index tracks the mapping from logical to
physical blobs.

Example low-level config:

	"/storage/": {
	    "handler": "storage-blobpacked",
	    "handlerArgs": {
	        "smallBlobs": "/small/",
	        "largeBlobs": "/large/",
	        "metaIndex": {
	           "type": "mysql",
	            .....
	        }
	    }
	}

The resulting large blobs are valid zip files. Those blobs may be up to
16 MB and contain the original contiguous file (or fractions of it), as well
as metadata about how the file is cut up. The zip file will have the
following structure:

	foo.jpg       (or whatever)
	camlistore/sha1-beb1df0b75952c7d277905ad14de71ef7ef90c44.json (some file ref)
	camlistore/sha1-a0ceb10b04403c9cc1d032e07a9071db5e711c9a.json (some bytes ref)
	camlistore/sha1-7b4d9c8529c27d592255c6dfb17188493db96ccc.json (another bytes ref)
	camlistore/camlistore-pack-manifest.json

The camlistore-pack-manifest.json is documented on the exported
Manifest type. It looks like this:

	{
	  "wholeRef": "sha1-0e64816d731a56915e8bb4ae4d0ac7485c0b84da",
	  "wholeSize": 2962227200, // 2.8GB; so will require ~176-180 16MB chunks
	  "wholePartIndex": 17,    // 0-based
	  "dataBlobsOrigin": "sha1-355705cf62a56669303d2561f29e0620a676c36e",
	  "dataBlobs": [
	      {"blob": "sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15", "offset": 0, "size": 273048},
	      {"blob": "sha1-e242ed3bffccdf271b7fbaf34ed72d089537b42f", "offset": 273048, "size": 112783},
	      {"blob": "sha1-6eadeac2dade6347e87c0d24fd455feffa7069f0", "offset": 385831, ...},
	      {"blob": "sha1-9425cca1dde5d8b6eb70cd087db4e356da92396e", "offset": ...},
	      {"blob": "sha1-7709559a3c8668c57cc0a2f57c418b1cc3598049", "offset": ...},
	      {"blob": "sha1-f62cb5d05cfbf2a7a6c7f8339d0a4bf1dcd0ab6c", "offset": ...}
	  ] // raw data blobs of foo.jpg
	}

The manifest.json ensures that if the metadata index is lost, all the
data can be reconstructed from the raw zip files.

The 'wholeRef' property specifies which large file this zip is building
up. If the file is less than 15.5 MB or so (leaving room for the zip
overhead and manifest size), it will probably all be in one zip and the
first file in the zip will be the whole thing. Otherwise it'll be cut across
multiple zip files, each no larger than 16MB. In that case, each part of the
file will have a different 'wholePartIndex' number, starting at index
0. Each will have the same 'wholeSize'.
*/
package blobpacked // import "perkeep.org/pkg/blobserver/blobpacked"

// TODO: BlobStreamer using the zip manifests, for recovery.
81 82import ( 83 "archive/zip" 84 "bytes" 85 "context" 86 "crypto/sha1" 87 "encoding/json" 88 "errors" 89 "fmt" 90 "io" 91 "log" 92 "os" 93 "runtime" 94 "sort" 95 "strconv" 96 "strings" 97 "sync" 98 "time" 99 100 "perkeep.org/internal/pools" 101 "perkeep.org/pkg/blob" 102 "perkeep.org/pkg/blobserver" 103 "perkeep.org/pkg/constants" 104 "perkeep.org/pkg/env" 105 "perkeep.org/pkg/schema" 106 "perkeep.org/pkg/sorted" 107 108 "go4.org/jsonconfig" 109 "go4.org/strutil" 110 "go4.org/syncutil" 111) 112 113// TODO: evaluate whether this should even be 0, to keep the schema blobs together at least. 114// Files under this size aren't packed. 115const packThreshold = 512 << 10 116 117// Overhead for zip files. 118// These are only variables so they can be changed by tests, but 119// they're effectively constant. 120var ( 121 zipFixedOverhead = 20 /*directory64EndLen*/ + 122 56 /*directory64LocLen */ + 123 22 /*directoryEndLen*/ + 124 512 /* conservative slop space, to get us away from 16 MB zip boundary */ 125 zipPerEntryOverhead = 30 /*fileHeaderLen*/ + 126 24 /*dataDescriptor64Len*/ + 127 22 /*directoryEndLen*/ + 128 len("camlistore/sha1-f1d2d2f924e986ac86fdf7b36c94bcdf32beec15.dat")*3/2 /*padding for larger blobrefs*/ 129) 130 131// meta key prefixes 132const ( 133 blobMetaPrefix = "b:" 134 blobMetaPrefixLimit = "b;" 135 136 wholeMetaPrefix = "w:" 137 wholeMetaPrefixLimit = "w;" 138 139 zipMetaPrefix = "z:" 140 zipMetaPrefixLimit = "z;" 141) 142 143const ( 144 zipManifestPath = "camlistore/camlistore-pack-manifest.json" 145) 146 147// RecoveryMode is the mode in which the blobpacked server starts. 148type RecoveryMode int 149 150// Note: not using iota for these, because they're stored in GCE 151// instance's metadata values. 152const ( 153 // NoRecovery means blobpacked does not attempt to repair its index on startup. 154 // It is the default. 155 NoRecovery RecoveryMode = 0 156 // FastRecovery populates the blobpacked index, without erasing any existing one. 
157 FastRecovery RecoveryMode = 1 158 // FullRecovery erases the existing blobpacked index, then rebuilds it. 159 FullRecovery RecoveryMode = 2 160) 161 162var ( 163 recoveryMu sync.Mutex 164 recovery = NoRecovery 165) 166 167// TODO(mpl): make SetRecovery a method of type storage if we ever export it. 168 169// SetRecovery sets the recovery mode for the blobpacked package. 170// If set to one of the modes other than NoRecovery, it means that any 171// blobpacked storage subsequently initialized will automatically start with 172// rebuilding its meta index of zip files, in accordance with the selected mode. 173func SetRecovery(mode RecoveryMode) { 174 recoveryMu.Lock() 175 defer recoveryMu.Unlock() 176 recovery = mode 177} 178 179type subFetcherStorage interface { 180 blobserver.Storage 181 blob.SubFetcher 182} 183 184// TODO(mpl): all a logf method or something to storage so we get the 185// "blobpacked:" prefix automatically to log messages. 186 187type storage struct { 188 small blobserver.Storage 189 large subFetcherStorage 190 191 // meta key -> value rows are: 192 // 193 // For logical blobs packed within a large blob, "b:" prefix: 194 // b:sha1-xxxx -> "<size> <big-blobref> <offset_u32>" 195 // 196 // For wholerefs: (wholeMetaPrefix) 197 // w:sha1-xxxx(wholeref) -> "<nbytes_total_u64> <nchunks_u32>" 198 // Then for each big nchunk of the file: 199 // The wholeRef and the chunk number as a key to: the blobRef of the zip 200 // file, the position of the data within the zip, the position of the data 201 // within the uploaded whole file, the length of data in this zip. 202 // w:sha1-xxxx:0 -> "<zipchunk-blobref> <offset-in-zipchunk-blobref> <offset-in-whole_u64> <length_u32>" 203 // w:sha1-xxxx:... 
204 // w:sha1-xxxx:(nchunks-1) 205 // 206 // For zipRefs: (zipMetaPrefix) 207 // key: blobref of the zip, prefixed by "z:" 208 // value: size of the zip, blobref of the contents of the whole file (which may 209 // span multiple zips, ~15.5 MB of data per zip), size of the whole file, position 210 // in the whole file of the data (first file) in the zip, size of the data in the 211 // zip (== size of the zip's first file). 212 // z:<zip-blobref> -> "<zip_size_u32> <whole_ref_from_zip_manifest> <whole_size_u64> 213 // <zip_data_offset_in_whole_u64> <zip_data_bytes_u32>" 214 // 215 // For marking that zips that have blobs (possibly all) 216 // deleted from inside them: (deleted zip) 217 // d:sha1-xxxxxx -> <unix-time-of-delete> 218 meta sorted.KeyValue 219 220 // If non-zero, the maximum size of a zip blob. 221 // It defaults to constants.MaxBlobSize. 222 forceMaxZipBlobSize int 223 224 skipDelete bool // don't delete from small after packing 225 226 packGate *syncutil.Gate 227 228 loggerOnce sync.Once 229 log *log.Logger // nil means default 230} 231 232var ( 233 _ blobserver.BlobStreamer = (*storage)(nil) 234 _ blobserver.Generationer = (*storage)(nil) 235 _ blobserver.WholeRefFetcher = (*storage)(nil) 236) 237 238func (s *storage) String() string { 239 return fmt.Sprintf("\"blobpacked\" storage") 240} 241 242func (s *storage) Logf(format string, args ...interface{}) { 243 s.logger().Printf(format, args...) 
244} 245 246func (s *storage) logger() *log.Logger { 247 s.loggerOnce.Do(s.initLogger) 248 return s.log 249} 250 251func (s *storage) initLogger() { 252 if s.log == nil { 253 s.log = log.New(os.Stderr, "blobpacked: ", log.LstdFlags) 254 } 255} 256 257func (s *storage) init() { 258 s.packGate = syncutil.NewGate(10) 259} 260 261func (s *storage) maxZipBlobSize() int { 262 if s.forceMaxZipBlobSize > 0 { 263 return s.forceMaxZipBlobSize 264 } 265 return constants.MaxBlobSize 266} 267 268func init() { 269 blobserver.RegisterStorageConstructor("blobpacked", blobserver.StorageConstructor(newFromConfig)) 270} 271 272func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storage, error) { 273 var ( 274 smallPrefix = conf.RequiredString("smallBlobs") 275 largePrefix = conf.RequiredString("largeBlobs") 276 metaConf = conf.RequiredObject("metaIndex") 277 keepGoing = conf.OptionalBool("keepGoing", false) 278 ) 279 if err := conf.Validate(); err != nil { 280 return nil, err 281 } 282 small, err := ld.GetStorage(smallPrefix) 283 if err != nil { 284 return nil, fmt.Errorf("failed to load smallBlobs at %s: %v", smallPrefix, err) 285 } 286 large, err := ld.GetStorage(largePrefix) 287 if err != nil { 288 return nil, fmt.Errorf("failed to load largeBlobs at %s: %v", largePrefix, err) 289 } 290 largeSubber, ok := large.(subFetcherStorage) 291 if !ok { 292 return nil, fmt.Errorf("largeBlobs at %q of type %T doesn't support fetching sub-ranges of blobs", 293 largePrefix, large) 294 } 295 meta, err := sorted.NewKeyValueMaybeWipe(metaConf) 296 if err != nil { 297 return nil, fmt.Errorf("failed to setup blobpacked metaIndex: %v", err) 298 } 299 sto := &storage{ 300 small: small, 301 large: largeSubber, 302 meta: meta, 303 } 304 sto.init() 305 306 recoveryMu.Lock() 307 defer recoveryMu.Unlock() 308 condFatalf := func(pattern string, args ...interface{}) { 309 log.Printf(pattern, args...) 
310 if !keepGoing { 311 os.Exit(1) 312 } 313 } 314 315 var newKv func() (sorted.KeyValue, error) 316 switch recovery { 317 case FastRecovery: 318 newKv = func() (sorted.KeyValue, error) { 319 return sorted.NewKeyValue(metaConf) 320 } 321 case FullRecovery: 322 newKv = func() (sorted.KeyValue, error) { 323 kv, err := sorted.NewKeyValue(metaConf) 324 if err != nil { 325 return nil, err 326 } 327 wiper, ok := kv.(sorted.Wiper) 328 if !ok { 329 return nil, fmt.Errorf("blobpacked meta index of type %T needs to be wiped, but does not support automatic wiping. It should be removed manually.", kv) 330 } 331 if err := wiper.Wipe(); err != nil { 332 return nil, fmt.Errorf("blobpacked meta index of type %T could not be wiped: %v", kv, err) 333 } 334 return kv, nil 335 } 336 } 337 if newKv != nil { 338 // i.e. we're in one of the recovery modes 339 log.Print("Starting recovery of blobpacked index") 340 if err := meta.Close(); err != nil { 341 return nil, err 342 } 343 if err := sto.reindex(context.TODO(), newKv); err != nil { 344 return nil, err 345 } 346 if _, err := sto.checkLargeIntegrity(); err != nil { 347 condFatalf("blobpacked: reindexed successfully, but error after validation: %v", err) 348 } 349 return sto, nil 350 } 351 352 // Check for a weird state: zip files exist, but no metadata about them 353 // is recorded. This is probably a corrupt state, and the user likely 354 // wants to recover. 355 if !sto.anyMeta() && sto.anyZipPacks() { 356 if env.OnGCE() { 357 // TODO(mpl): make web UI page/mode that informs about this error. 358 condFatalf("Error: blobpacked storage detects non-zero packed zips, but no metadata. Please switch to recovery mode: add the \"camlistore-recovery = %d\" key/value to the Custom metadata of your instance. And restart the instance.", FastRecovery) 359 } 360 condFatalf("Error: blobpacked storage detects non-zero packed zips, but no metadata. 
Please re-start in recovery mode with -recovery=%d", FastRecovery) 361 } 362 363 if mode, err := sto.checkLargeIntegrity(); err != nil { 364 if mode <= NoRecovery { 365 condFatalf("%v", err) 366 } 367 if env.OnGCE() { 368 // TODO(mpl): make web UI page/mode that informs about this error. 369 condFatalf("Error: %v. Please switch to recovery mode: add the \"camlistore-recovery = %d\" key/value to the Custom metadata of your instance. And restart the instance.", err, mode) 370 } 371 condFatalf("Error: %v. Please re-start in recovery mode with -recovery=%d", err, mode) 372 } 373 374 return sto, nil 375} 376 377// checkLargeIntegrity verifies that all large blobs in the large storage are 378// indexed in meta, and vice-versa, that all rows in meta referring to a large blob 379// correspond to an existing large blob in the large storage. If any of the above 380// is not true, it returns the recovery mode that should be used to fix the 381// problem, as well as the error detailing the problem. It does not perform any 382// check about the contents of the large blobs themselves. 383func (s *storage) checkLargeIntegrity() (RecoveryMode, error) { 384 inLarge := 0 385 var missing []blob.Ref // blobs in large but not in meta 386 var extra []blob.Ref // blobs in meta but not in large 387 t := s.meta.Find(zipMetaPrefix, zipMetaPrefixLimit) 388 defer t.Close() 389 iterate := true 390 var enumFunc func(sb blob.SizedRef) error 391 enumFunc = func(sb blob.SizedRef) error { 392 if iterate && !t.Next() { 393 // all of the yet to be enumerated are missing from meta 394 missing = append(missing, sb.Ref) 395 return nil 396 } 397 iterate = true 398 wantMetaKey := zipMetaPrefix + sb.Ref.String() 399 metaKey := t.Key() 400 if metaKey != wantMetaKey { 401 if metaKey > wantMetaKey { 402 // zipRef missing from meta 403 missing = append(missing, sb.Ref) 404 iterate = false 405 return nil 406 } 407 // zipRef in meta that actually does not exist in s.large. 
408 xbr, ok := blob.Parse(strings.TrimPrefix(metaKey, zipMetaPrefix)) 409 if !ok { 410 return fmt.Errorf("boggus key in z: row: %q", metaKey) 411 } 412 extra = append(extra, xbr) 413 // iterate meta once more at the same storage enumeration point 414 return enumFunc(sb) 415 } 416 if _, err := parseZipMetaRow(t.ValueBytes()); err != nil { 417 return fmt.Errorf("error parsing row from meta: %v", err) 418 } 419 inLarge++ 420 return nil 421 } 422 log.Printf("blobpacked: checking integrity of packed blobs against index...") 423 if err := blobserver.EnumerateAllFrom(context.Background(), s.large, "", enumFunc); err != nil { 424 return FullRecovery, err 425 } 426 log.Printf("blobpacked: %d large blobs found in index, %d missing from index", inLarge, len(missing)) 427 if len(missing) > 0 { 428 printSample(missing, "missing") 429 } 430 if len(extra) > 0 { 431 printSample(extra, "extra") 432 return FullRecovery, fmt.Errorf("%d large blobs in index but not actually in storage", len(extra)) 433 } 434 if err := t.Close(); err != nil { 435 return FullRecovery, fmt.Errorf("error reading or closing index: %v", err) 436 } 437 if len(missing) > 0 { 438 return FastRecovery, fmt.Errorf("%d large blobs missing from index", len(missing)) 439 } 440 return NoRecovery, nil 441} 442 443func printSample(fromSlice []blob.Ref, sliceName string) { 444 sort.Slice(fromSlice, func(i, j int) bool { return fromSlice[i].Less(fromSlice[j]) }) 445 for i, br := range fromSlice { 446 if i == 10 { 447 break 448 } 449 log.Printf(" sample %v large blob: %v", sliceName, br) 450 } 451} 452 453// zipMetaInfo is the info needed to write the wholeMetaPrefix and 454// zipMetaPrefix entries when reindexing. For a given file, spread over several 455// zips, each zip has a corresponding zipMetaInfo. 
The wholeMetaPrefix and 456// zipMetaPrefix rows pertaining to a file can only be written once all the 457// zipMetaInfo have been collected and sorted, because the offset of each zip's 458// data is derived from the size of the other pieces that precede it in the file. 459type zipMetaInfo struct { 460 wholePartIndex int // index of that zip, 0-based 461 zipRef blob.Ref // ref of the zip file holding packed data blobs + other schema blobs 462 zipSize uint32 // size of the zipped file 463 offsetInZip uint32 // position of the contiguous data blobs, relative to the zip 464 dataSize uint32 // size of the data in the zip 465 wholeSize uint64 // size of the whole file that this zip is a part of 466 wholeRef blob.Ref // ref of the contents of the whole file 467} 468 469// rowValue returns the value of the "z:<zipref>" meta key row 470// based on the contents of zm and the provided arguments. 471func (zm zipMetaInfo) rowValue(offset uint64) string { 472 return fmt.Sprintf("%d %v %d %d %d", zm.zipSize, zm.wholeRef, zm.wholeSize, offset, zm.dataSize) 473} 474 475// TODO(mpl): add client command to call reindex on an "offline" blobpacked. camtool packblobs -reindex maybe? 476 477// fileName returns the name of the (possibly partial) first file in zipRef 478// (i.e. the actual data). It returns a zipOpenError if there was any problem 479// reading the zip, and os.ErrNotExist if the zip could not be fetched or if 480// there was no file in the zip. 481func (s *storage) fileName(ctx context.Context, zipRef blob.Ref) (string, error) { 482 _, size, err := s.large.Fetch(ctx, zipRef) 483 if err != nil { 484 return "", err 485 } 486 zr, err := zip.NewReader(blob.ReaderAt(ctx, s.large, zipRef), int64(size)) 487 if err != nil { 488 return "", zipOpenError{zipRef, err} 489 } 490 for _, f := range zr.File { 491 return f.Name, nil 492 } 493 return "", os.ErrNotExist 494} 495 496// reindex rebuilds the meta index for packed blobs. 
It calls newMeta to create 497// a new KeyValue on which to write the index, and replaces s.meta with it. There 498// is no locking whatsoever so it should not be called when the storage is already 499// in use. its signature might change if/when it gets exported. 500func (s *storage) reindex(ctx context.Context, newMeta func() (sorted.KeyValue, error)) error { 501 meta, err := newMeta() 502 if err != nil { 503 return fmt.Errorf("failed to create new blobpacked meta index: %v", err) 504 } 505 506 zipMetaByWholeRef := make(map[blob.Ref][]zipMetaInfo) 507 508 // first a fast full enumerate, so we can report progress afterwards 509 packedTotal := 0 510 blobserver.EnumerateAllFrom(ctx, s.large, "", func(sb blob.SizedRef) error { 511 packedTotal++ 512 return nil 513 }) 514 515 var packedDone, packedSeen int 516 t := time.NewTicker(10 * time.Second) 517 defer t.Stop() 518 if err := blobserver.EnumerateAllFrom(ctx, s.large, "", func(sb blob.SizedRef) error { 519 select { 520 case <-t.C: 521 log.Printf("blobpacked: %d / %d zip packs seen", packedSeen, packedTotal) 522 default: 523 } 524 zipRef := sb.Ref 525 zr, err := zip.NewReader(blob.ReaderAt(ctx, s.large, zipRef), int64(sb.Size)) 526 if err != nil { 527 return zipOpenError{zipRef, err} 528 } 529 var maniFile *zip.File 530 var firstOff int64 // offset of first file (the packed data chunks) 531 for i, f := range zr.File { 532 if i == 0 { 533 firstOff, err = f.DataOffset() 534 if err != nil { 535 return err 536 } 537 } 538 if f.Name == zipManifestPath { 539 maniFile = f 540 break 541 } 542 } 543 if maniFile == nil { 544 return fmt.Errorf("no perkeep manifest file found in zip %v", zipRef) 545 } 546 maniRC, err := maniFile.Open() 547 if err != nil { 548 return err 549 } 550 defer maniRC.Close() 551 var mf Manifest 552 if err := json.NewDecoder(maniRC).Decode(&mf); err != nil { 553 return err 554 } 555 if !mf.WholeRef.Valid() || mf.WholeSize == 0 || !mf.DataBlobsOrigin.Valid() { 556 return fmt.Errorf("incomplete blobpack 
manifest JSON in %v", zipRef) 557 } 558 559 bm := meta.BeginBatch() 560 // In this loop, we write all the blobMetaPrefix entries for the 561 // data blobs in this zip, and we also compute the dataBytesWritten, for later. 562 var dataBytesWritten int64 563 for _, bp := range mf.DataBlobs { 564 bm.Set(blobMetaPrefix+bp.SizedRef.Ref.String(), fmt.Sprintf("%d %v %d", bp.SizedRef.Size, zipRef, firstOff+bp.Offset)) 565 dataBytesWritten += int64(bp.SizedRef.Size) 566 } 567 if dataBytesWritten > (1<<32 - 1) { 568 return fmt.Errorf("total data blobs size in zip %v overflows uint32", zipRef) 569 } 570 dataSize := uint32(dataBytesWritten) 571 572 // In this loop, we write all the blobMetaPrefix entries for the schema blobs in this zip 573 for _, f := range zr.File { 574 if !(strings.HasPrefix(f.Name, "camlistore/") && strings.HasSuffix(f.Name, ".json")) || 575 f.Name == zipManifestPath { 576 continue 577 } 578 br, ok := blob.Parse(strings.TrimSuffix(strings.TrimPrefix(f.Name, "camlistore/"), ".json")) 579 if !ok { 580 return fmt.Errorf("schema file in zip %v does not have blobRef as name: %v", zipRef, f.Name) 581 } 582 offset, err := f.DataOffset() 583 if err != nil { 584 return err 585 } 586 bm.Set(blobMetaPrefix+br.String(), fmt.Sprintf("%d %v %d", f.UncompressedSize64, zipRef, offset)) 587 } 588 589 if err := meta.CommitBatch(bm); err != nil { 590 return err 591 } 592 593 // record that info for later, when we got them all, so we can write the wholeMetaPrefix entries. 594 zipMetaByWholeRef[mf.WholeRef] = append(zipMetaByWholeRef[mf.WholeRef], zipMetaInfo{ 595 wholePartIndex: mf.WholePartIndex, 596 zipRef: zipRef, 597 zipSize: sb.Size, 598 offsetInZip: uint32(firstOff), 599 dataSize: dataSize, 600 wholeSize: uint64(mf.WholeSize), 601 wholeRef: mf.WholeRef, // redundant with zipMetaByWholeRef key for now. 
602 }) 603 packedSeen++ 604 return nil 605 }); err != nil { 606 return err 607 } 608 609 // finally, write the wholeMetaPrefix entries 610 foundDups := false 611 packedFiles := 0 612 tt := time.NewTicker(2 * time.Second) 613 defer tt.Stop() 614 bm := meta.BeginBatch() 615 for wholeRef, zipMetas := range zipMetaByWholeRef { 616 select { 617 case <-t.C: 618 log.Printf("blobpacked: %d files reindexed", packedFiles) 619 default: 620 } 621 sort.Slice(zipMetas, func(i, j int) bool { return zipMetas[i].wholePartIndex < zipMetas[j].wholePartIndex }) 622 hasDup := hasDups(zipMetas) 623 if hasDup { 624 foundDups = true 625 } 626 offsets := wholeOffsets(zipMetas) 627 for _, z := range zipMetas { 628 offset := offsets[z.wholePartIndex] 629 // write the w:row 630 bm.Set(fmt.Sprintf("%s%s:%d", wholeMetaPrefix, wholeRef, z.wholePartIndex), 631 fmt.Sprintf("%s %d %d %d", z.zipRef, z.offsetInZip, offset, z.dataSize)) 632 // write the z: row 633 bm.Set(fmt.Sprintf("%s%v", zipMetaPrefix, z.zipRef), z.rowValue(offset)) 634 packedDone++ 635 } 636 if hasDup { 637 if debug, _ := strconv.ParseBool(os.Getenv("CAMLI_DEBUG")); debug { 638 printDuplicates(zipMetas) 639 } 640 } 641 642 wholeBytesWritten := offsets[len(offsets)-1] 643 if zipMetas[0].wholeSize != wholeBytesWritten { 644 // Any corrupted zip should have been found earlier, so this error means we're 645 // missing at least one full zip for the whole file to be complete. 
646 fileName, err := s.fileName(ctx, zipMetas[0].zipRef) 647 if err != nil { 648 return fmt.Errorf("could not get filename of file in zip %v: %v", zipMetas[0].zipRef, err) 649 } 650 log.Printf( 651 "blobpacked: file %q (wholeRef %v) is incomplete: sum of all zips (%d bytes) does not match manifest's WholeSize (%d bytes)", 652 fileName, wholeRef, wholeBytesWritten, zipMetas[0].wholeSize) 653 var allParts []blob.Ref 654 for _, z := range zipMetas { 655 allParts = append(allParts, z.zipRef) 656 } 657 log.Printf("blobpacked: known parts of %v: %v", wholeRef, allParts) 658 // we skip writing the w: row for the full file, and we don't count the file 659 // as complete. 660 continue 661 } 662 bm.Set(fmt.Sprintf("%s%s", wholeMetaPrefix, wholeRef), 663 fmt.Sprintf("%d %d", wholeBytesWritten, zipMetas[len(zipMetas)-1].wholePartIndex+1)) 664 packedFiles++ 665 } 666 if err := meta.CommitBatch(bm); err != nil { 667 return err 668 } 669 670 log.Printf("blobpacked: %d / %d zip packs successfully reindexed", packedDone, packedTotal) 671 if packedFiles < len(zipMetaByWholeRef) { 672 log.Printf("blobpacked: %d files reindexed, and %d incomplete file(s) found.", packedFiles, len(zipMetaByWholeRef)-packedFiles) 673 } else { 674 log.Printf("blobpacked: %d files reindexed.", packedFiles) 675 } 676 if foundDups { 677 if debug, _ := strconv.ParseBool(os.Getenv("CAMLI_DEBUG")); !debug { 678 log.Print("blobpacked: zip blobs with duplicate contents were found. Re-run with CAMLI_DEBUG=true for more detail.") 679 } 680 } 681 682 // TODO(mpl): take into account removed blobs. I can't be done for now 683 // (2015-01-29) because RemoveBlobs currently only updates the meta index. 684 // So if the index was lost, all information about removals was lost too. 685 686 s.meta = meta 687 return nil 688} 689 690// hasDups reports whether zm contains successive zipRefs which have the same 691// wholePartIndex, which we assume means they have the same data contents. 
It 692// panics if that assumption seems wrong, i.e. if the data within assumed 693// duplicates is not the same size in all of them. zm must be sorted by 694// wholePartIndex. 695// See https://github.com/perkeep/perkeep/issues/1079 696func hasDups(zm []zipMetaInfo) bool { 697 i := 0 698 var dataSize uint32 699 var firstDup blob.Ref 700 dupFound := false 701 for _, z := range zm { 702 if z.wholePartIndex == i { 703 firstDup = z.zipRef 704 dataSize = z.dataSize 705 i++ 706 continue 707 } 708 // we could return true right now, but we want to go through it all, to make 709 // sure our assumption that "same part index -> duplicate" is true, using at least 710 // the dataSize to confirm. For a better effort, we should use DataBlobsOrigin. 711 if z.dataSize != dataSize { 712 panic(fmt.Sprintf("%v and %v looked like duplicates at first, but don't actually have the same dataSize. TODO: add DataBlobsOrigin checking.", firstDup, z.zipRef)) 713 } 714 dupFound = true 715 } 716 return dupFound 717} 718 719// wholeOffsets returns the offset for each part of a file f, in order, assuming 720// zm are all the (wholePartIndex) ordered zip parts that constitute that file. If 721// zm seems to contain duplicates, they are skipped. The additional last item of 722// the returned slice is the sum of all the parts, i.e. the whole size of f. 723func wholeOffsets(zm []zipMetaInfo) []uint64 { 724 i := 0 725 var offsets []uint64 726 var currentOffset uint64 727 for _, z := range zm { 728 if i != z.wholePartIndex { 729 continue 730 } 731 offsets = append(offsets, currentOffset) 732 currentOffset += uint64(z.dataSize) 733 i++ 734 } 735 // add the last computed offset to the slice, as it's useful info too: it's the 736 // size of all the data in the zip. 
737 offsets = append(offsets, currentOffset) 738 return offsets 739} 740 741func printDuplicates(zm []zipMetaInfo) { 742 i := 0 743 byPartIndex := make(map[int][]zipMetaInfo) 744 for _, z := range zm { 745 if i == z.wholePartIndex { 746 byPartIndex[z.wholePartIndex] = []zipMetaInfo{z} 747 i++ 748 continue 749 } 750 byPartIndex[z.wholePartIndex] = append(byPartIndex[z.wholePartIndex], z) 751 } 752 for _, zm := range byPartIndex { 753 if len(zm) <= 1 { 754 continue 755 } 756 br := make([]blob.Ref, 0, len(zm)) 757 for _, z := range zm { 758 br = append(br, z.zipRef) 759 } 760 log.Printf("zip blobs with same data contents: %v", br) 761 } 762} 763 764func (s *storage) anyMeta() (v bool) { 765 // TODO: we only care about getting 1 row, but the 766 // sorted.KeyValue interface doesn't let us give it that 767 // hint. Care? 768 sorted.Foreach(s.meta, func(_, _ string) error { 769 v = true 770 return errors.New("stop") 771 }) 772 return 773} 774 775func (s *storage) anyZipPacks() (v bool) { 776 ctx, cancel := context.WithCancel(context.TODO()) 777 defer cancel() 778 dest := make(chan blob.SizedRef, 1) 779 if err := s.large.EnumerateBlobs(ctx, dest, "", 1); err != nil { 780 // Not a great interface in general, but only needed 781 // by the start-up check for now, where it doesn't 782 // really matter. 
783 return false 784 } 785 _, ok := <-dest 786 return ok 787} 788 789func (s *storage) Close() error { 790 return s.meta.Close() 791} 792 793func (s *storage) StorageGeneration() (initTime time.Time, random string, err error) { 794 sgen, sok := s.small.(blobserver.Generationer) 795 lgen, lok := s.large.(blobserver.Generationer) 796 if !sok || !lok { 797 return time.Time{}, "", blobserver.GenerationNotSupportedError("underlying storage engines don't support Generationer") 798 } 799 st, srand, err := sgen.StorageGeneration() 800 if err != nil { 801 return 802 } 803 lt, lrand, err := lgen.StorageGeneration() 804 if err != nil { 805 return 806 } 807 hash := sha1.New() 808 io.WriteString(hash, srand) 809 io.WriteString(hash, lrand) 810 maxTime := func(a, b time.Time) time.Time { 811 if a.After(b) { 812 return a 813 } 814 return b 815 } 816 return maxTime(lt, st), fmt.Sprintf("%x", hash.Sum(nil)), nil 817} 818 819func (s *storage) ResetStorageGeneration() error { 820 var retErr error 821 for _, st := range []blobserver.Storage{s.small, s.large} { 822 if g, ok := st.(blobserver.Generationer); ok { 823 if err := g.ResetStorageGeneration(); err != nil { 824 retErr = err 825 } 826 } 827 } 828 return retErr 829} 830 831type meta struct { 832 exists bool 833 size uint32 834 largeRef blob.Ref // if invalid, then on small if exists 835 largeOff uint32 836} 837 838func (m *meta) isPacked() bool { return m.largeRef.Valid() } 839 840// if not found, err == nil. 
func (s *storage) getMetaRow(br blob.Ref) (meta, error) {
	v, err := s.meta.Get(blobMetaPrefix + br.String())
	if err == sorted.ErrNotFound {
		// No row: report a zero meta (exists == false) rather than an error.
		return meta{}, nil
	}
	if err != nil {
		return meta{}, fmt.Errorf("blobpacked.getMetaRow(%v) = %v", br, err)
	}
	return parseMetaRow([]byte(v))
}

// singleSpace is the separator between the fields of a meta row.
var singleSpace = []byte{' '}

// parses:
// "<size_u32> <big-blobref> <big-offset>"
//
// On success the returned meta has exists set to true. Any malformed
// field yields a zero meta and an error.
func parseMetaRow(v []byte) (m meta, err error) {
	row := v // kept whole for error messages; v is re-sliced below
	sp := bytes.IndexByte(v, ' ')
	// Reject: no space, an empty first field, or nothing after the space.
	if sp < 1 || sp == len(v)-1 {
		return meta{}, fmt.Errorf("invalid metarow %q", v)
	}
	m.exists = true
	size, err := strutil.ParseUintBytes(v[:sp], 10, 32)
	if err != nil {
		return meta{}, fmt.Errorf("invalid metarow size %q", v)
	}
	m.size = uint32(size)
	v = v[sp+1:]

	// remains: "<big-blobref> <big-offset>"
	if bytes.Count(v, singleSpace) != 1 {
		return meta{}, fmt.Errorf("invalid metarow %q: wrong number of spaces", row)
	}
	sp = bytes.IndexByte(v, ' ')
	largeRef, ok := blob.ParseBytes(v[:sp])
	if !ok {
		return meta{}, fmt.Errorf("invalid metarow %q: bad blobref %q", row, v[:sp])
	}
	m.largeRef = largeRef
	off, err := strutil.ParseUintBytes(v[sp+1:], 10, 32)
	if err != nil {
		return meta{}, fmt.Errorf("invalid metarow %q: bad offset: %v", row, err)
	}
	m.largeOff = uint32(off)
	return m, nil
}

// parseMetaRowSizeOnly parses only the leading "<size_u32>" field of a
// "b:" meta row, leaving the rest of the row unvalidated beyond being
// non-empty.
func parseMetaRowSizeOnly(v []byte) (size uint32, err error) {
	sp := bytes.IndexByte(v, ' ')
	if sp < 1 || sp == len(v)-1 {
		return 0, fmt.Errorf("invalid metarow %q", v)
	}
	size64, err := strutil.ParseUintBytes(v[:sp], 10, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid metarow size %q", v)
	}
	return uint32(size64), nil
}

// parses:
// "<zip_size_u32> <whole_ref_from_zip_manifest> <whole_size_u64> <zip_data_offset_in_whole_u64> <zip_data_bytes_u32>"
//
// The offset field is validated but not stored in the returned
// zipMetaInfo; zipRef, offsetInZip and wholePartIndex are left at
// their zero values.
func parseZipMetaRow(v []byte) (m zipMetaInfo, err error) {
	row := v // kept whole for error messages; v is re-sliced below
	sp := bytes.IndexByte(v, ' ')
	if sp < 1 || sp == len(v)-1 {
		return zipMetaInfo{}, fmt.Errorf("invalid z: meta row %q", row)
	}
	// Five fields means exactly four separators; checking this up front
	// guarantees the IndexByte calls below find a space.
	if bytes.Count(v, singleSpace) != 4 {
		return zipMetaInfo{}, fmt.Errorf("wrong number of spaces in z: meta row %q", row)
	}
	zipSize, err := strutil.ParseUintBytes(v[:sp], 10, 32)
	if err != nil {
		return zipMetaInfo{}, fmt.Errorf("invalid zipSize %q in z: meta row: %q", v[:sp], row)
	}
	m.zipSize = uint32(zipSize)

	v = v[sp+1:]
	sp = bytes.IndexByte(v, ' ')
	wholeRef, ok := blob.ParseBytes(v[:sp])
	if !ok {
		return zipMetaInfo{}, fmt.Errorf("invalid wholeRef %q in z: meta row: %q", v[:sp], row)
	}
	m.wholeRef = wholeRef

	v = v[sp+1:]
	sp = bytes.IndexByte(v, ' ')
	wholeSize, err := strutil.ParseUintBytes(v[:sp], 10, 64)
	if err != nil {
		return zipMetaInfo{}, fmt.Errorf("invalid wholeSize %q in z: meta row: %q", v[:sp], row)
	}
	m.wholeSize = uint64(wholeSize)

	v = v[sp+1:]
	sp = bytes.IndexByte(v, ' ')
	// The offset is only checked for validity, not kept.
	if _, err := strutil.ParseUintBytes(v[:sp], 10, 64); err != nil {
		return zipMetaInfo{}, fmt.Errorf("invalid offset %q in z: meta row: %q", v[:sp], row)
	}

	v = v[sp+1:]
	dataSize, err := strutil.ParseUintBytes(v, 10, 32)
	if err != nil {
		return zipMetaInfo{}, fmt.Errorf("invalid dataSize %q in z: meta row: %q", v, row)
	}
	m.dataSize = uint32(dataSize)

	return m, nil
}

// ReceiveBlob stores the blob br read from source. The blob is written to
// the small storage unless the meta index already has a row for it. If the
// blob is a "file" schema blob that is not already packed and whose parts
// size is at least packThreshold, the file is then packed, synchronously.
func (s *storage) ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (sb blob.SizedRef, err error) {
	buf := pools.BytesBuffer()
	defer pools.PutBuffer(buf)

	// Buffer the whole blob: it is both sniffed as a schema blob and
	// forwarded to the small storage.
	if _, err := io.Copy(buf, source); err != nil {
		return sb, err
	}
	size := uint32(buf.Len())
	isFile := false
	// A parse error just means this is not a schema blob; it is not
	// reported to the caller.
	fileBlob, err := schema.BlobFromReader(br, bytes.NewReader(buf.Bytes()))
	if err == nil && fileBlob.Type() == "file" {
		isFile = true
	}
	meta, err := s.getMetaRow(br)
	if err != nil {
		return sb, err
	}
	if meta.exists {
		sb = blob.SizedRef{Size: size, Ref: br}
	} else {
		sb, err = s.small.ReceiveBlob(ctx, br, buf)
		if err != nil {
			return sb, err
		}
	}
	// isFile is checked first, so fileBlob is only used when it parsed.
	if !isFile || meta.isPacked() || fileBlob.PartsSize() < packThreshold {
		return sb, nil
	}

	// Pack the blob.
	s.packGate.Start()
	defer s.packGate.Done()
	// We ignore the return value from packFile since we can't
	// really recover. At least be happy that we have all the
	// data on 'small' already. packFile will log at least.
	s.packFile(ctx, br)
	return sb, nil
}

// Fetch returns the contents and size of blob br. Packed blobs are read as
// a sub-range of their large blob; all others are fetched from the small
// storage.
func (s *storage) Fetch(ctx context.Context, br blob.Ref) (io.ReadCloser, uint32, error) {
	m, err := s.getMetaRow(br)
	if err != nil {
		return nil, 0, err
	}
	if !m.exists || !m.isPacked() {
		return s.small.Fetch(ctx, br)
	}
	rc, err := s.large.SubFetch(ctx, m.largeRef, int64(m.largeOff), int64(m.size))
	if err != nil {
		return nil, 0, err
	}
	return rc, m.size, nil
}

// removeLookups is the size of the gate used for the meta lookups in
// RemoveBlobs.
const removeLookups = 50 // arbitrary

// RemoveBlobs removes the given blobs. Blobs without a packed meta row are
// removed from the small storage. For packed blobs, only the meta index is
// updated: their "b:" rows are deleted and a "d:" row is set for each zip
// they were in; the zips themselves are left untouched.
func (s *storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
	// Plan:
	//  -- delete from small (if it's there)
	//  -- if in big, update the meta index to note that it's there, but deleted.
	//  -- fetch big's zip file (constructed from a ReaderAt that is all dummy zeros +
	//     the zip's TOC only, relying on big being a SubFetcher, and keeping info in
	//     the meta about the offset of the TOC+total size of each big's zip)
	//  -- iterate over the zip's blobs (at some point). If all are marked deleted, actually RemoveBlob
	//     on big to delete the full zip and then delete all the meta rows.
	var (
		mu       sync.Mutex // guards unpacked, packed and large
		unpacked []blob.Ref
		packed   []blob.Ref
		large    = map[blob.Ref]bool{} // the large blobs that packed are in
	)
	var grp syncutil.Group
	delGate := syncutil.NewGate(removeLookups)
	// First pass: classify each blob as packed or unpacked, with the
	// meta lookups running concurrently.
	for _, br := range blobs {
		br := br
		delGate.Start()
		grp.Go(func() error {
			defer delGate.Done()
			m, err := s.getMetaRow(br)
			if err != nil {
				return err
			}
			mu.Lock()
			defer mu.Unlock()
			if m.isPacked() {
				packed = append(packed, br)
				large[m.largeRef] = true
			} else {
				unpacked = append(unpacked, br)
			}
			return nil
		})
	}
	if err := grp.Err(); err != nil {
		return err
	}
	// Second pass: the small-storage removal and the meta update are
	// started together and awaited by the final grp.Err.
	if len(unpacked) > 0 {
		grp.Go(func() error {
			return s.small.RemoveBlobs(ctx, unpacked)
		})
	}
	if len(packed) > 0 {
		grp.Go(func() error {
			bm := s.meta.BeginBatch()
			now := time.Now()
			for zipRef := range large {
				bm.Set("d:"+zipRef.String(), fmt.Sprint(now.Unix()))
			}
			for _, br := range packed {
				bm.Delete("b:" + br.String())
			}
			return s.meta.CommitBatch(bm)
		})
	}
	return grp.Err()
}

// statGate is the gate handed to StatBlobsParallelHelper in StatBlobs; it is
// shared by all storages in the package.
var statGate = syncutil.NewGate(50) // arbitrary

// StatBlobs first consults the meta index for each blob: blobs with a meta
// row are reported with the size recorded there, and the others are
// collected for a second round against the small storage.
func (s *storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
	var (
		trySmallMu sync.Mutex // guards trySmall
		trySmall   []blob.Ref
	)

	err := blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(br blob.Ref) (sb blob.SizedRef, err error) {
		m, err := s.getMetaRow(br)
		if err != nil {
			return sb, err
		}
		if m.exists {
			return blob.SizedRef{Ref: br, Size: m.size}, nil
		}
		// Try it in round two against the small blobs:
		trySmallMu.Lock()
		trySmall = append(trySmall, br)
		trySmallMu.Unlock()
		return sb, nil
	})
	if err != nil {
		return err
	}
	if len(trySmall) == 0 {
		return nil
	}
	return
s.small.StatBlobs(ctx, trySmall, fn) 1095} 1096 1097func (s *storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) (err error) { 1098 return blobserver.MergedEnumerate(ctx, dest, []blobserver.BlobEnumerator{ 1099 s.small, 1100 enumerator{s}, 1101 }, after, limit) 1102} 1103 1104// enumerator implements EnumerateBlobs. 1105type enumerator struct { 1106 *storage 1107} 1108 1109func (s enumerator) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) (err error) { 1110 defer close(dest) 1111 t := s.meta.Find(blobMetaPrefix+after, blobMetaPrefixLimit) 1112 defer func() { 1113 closeErr := t.Close() 1114 if err == nil { 1115 err = closeErr 1116 } 1117 }() 1118 n := 0 1119 afterb := []byte(after) 1120 for n < limit && t.Next() { 1121 key := t.KeyBytes()[len(blobMetaPrefix):] 1122 if n == 0 && bytes.Equal(key, afterb) { 1123 continue 1124 } 1125 n++ 1126 br, ok := blob.ParseBytes(key) 1127 if !ok { 1128 return fmt.Errorf("unknown key %q in meta index", t.Key()) 1129 } 1130 size, err := parseMetaRowSizeOnly(t.ValueBytes()) 1131 if err != nil { 1132 return err 1133 } 1134 select { 1135 case <-ctx.Done(): 1136 return ctx.Err() 1137 case dest <- blob.SizedRef{Ref: br, Size: size}: 1138 } 1139 } 1140 return nil 1141} 1142 1143func (s *storage) packFile(ctx context.Context, fileRef blob.Ref) (err error) { 1144 s.Logf("Packing file %s ...", fileRef) 1145 defer func() { 1146 if err == nil { 1147 s.Logf("Packed file %s", fileRef) 1148 } else { 1149 s.Logf("Error packing file %s: %v", fileRef, err) 1150 } 1151 }() 1152 1153 fr, err := schema.NewFileReader(ctx, s, fileRef) 1154 if err != nil { 1155 return err 1156 } 1157 return newPacker(s, fileRef, fr).pack(ctx) 1158} 1159 1160func newPacker(s *storage, fileRef blob.Ref, fr *schema.FileReader) *packer { 1161 return &packer{ 1162 s: s, 1163 fileRef: fileRef, 1164 fr: fr, 1165 dataSize: map[blob.Ref]uint32{}, 1166 schemaBlob: map[blob.Ref]*blob.Blob{}, 1167 
schemaParent: map[blob.Ref][]blob.Ref{}, 1168 } 1169} 1170 1171// A packer writes a file out 1172type packer struct { 1173 s *storage 1174 fileRef blob.Ref 1175 fr *schema.FileReader 1176 1177 wholeRef blob.Ref 1178 wholeSize int64 1179 1180 dataRefs []blob.Ref // in order 1181 dataSize map[blob.Ref]uint32 1182 1183 schemaRefs []blob.Ref // in order, but irrelevant 1184 schemaBlob map[blob.Ref]*blob.Blob 1185 schemaParent map[blob.Ref][]blob.Ref // data blob -> its parent/ancestor schema blob(s), all the way up to fileRef included 1186 1187 chunksRemain []blob.Ref 1188 zips []writtenZip 1189 wholeBytesWritten int64 // sum of zips.dataRefs.size 1190} 1191 1192type writtenZip struct { 1193 blob.SizedRef 1194 dataRefs []blob.Ref 1195} 1196 1197var ( 1198 testHookSawTruncate func(blob.Ref) 1199 testHookStopBeforeOverflowing func() 1200) 1201 1202func (pk *packer) pack(ctx context.Context) error { 1203 if err := pk.scanChunks(ctx); err != nil { 1204 return err 1205 } 1206 1207 // TODO: decide as a fuction of schemaRefs and dataRefs 1208 // already in s.large whether it makes sense to still compact 1209 // this from a savings standpoint. For now we just always do. 1210 // Maybe we'd have knobs in the future. Ideally not. 1211 1212 // Don't pack a file if we already have its wholeref stored 1213 // otherwise (perhaps under a different filename). But that 1214 // means we have to compute its wholeref first. We assume the 1215 // blob source will cache these lookups so it's not too 1216 // expensive to do two passes over the input. 1217 h := blob.NewHash() 1218 var err error 1219 pk.wholeSize, err = io.Copy(h, pk.fr) 1220 if err != nil { 1221 return err 1222 } 1223 pk.wholeRef = blob.RefFromHash(h) 1224 wholeKey := wholeMetaPrefix + pk.wholeRef.String() 1225 _, err = pk.s.meta.Get(wholeKey) 1226 if err == nil { 1227 // Nil error means there was some knowledge of this wholeref. 
1228 return fmt.Errorf("already have wholeref %v packed; not packing again", pk.wholeRef) 1229 } else if err != sorted.ErrNotFound { 1230 return err 1231 } 1232 1233 pk.chunksRemain = pk.dataRefs 1234 var trunc blob.Ref 1235MakingZips: 1236 for len(pk.chunksRemain) > 0 { 1237 if err := pk.writeAZip(ctx, trunc); err != nil { 1238 if needTrunc, ok := err.(needsTruncatedAfterError); ok { 1239 trunc = needTrunc.Ref 1240 if fn := testHookSawTruncate; fn != nil { 1241 fn(trunc) 1242 } 1243 continue MakingZips 1244 } 1245 return err 1246 } 1247 trunc = blob.Ref{} 1248 } 1249 1250 // Record the final wholeMetaPrefix record: 1251 err = pk.s.meta.Set(wholeKey, fmt.Sprintf("%d %d", pk.wholeSize, len(pk.zips))) 1252 if err != nil { 1253 return fmt.Errorf("Error setting %s: %v", wholeKey, err) 1254 } 1255 1256 return nil 1257} 1258 1259func (pk *packer) scanChunks(ctx context.Context) error { 1260 schemaSeen := map[blob.Ref]bool{} 1261 return pk.fr.ForeachChunk(ctx, func(schemaPath []blob.Ref, p schema.BytesPart) error { 1262 if !p.BlobRef.Valid() { 1263 return errors.New("sparse files are not packed") 1264 } 1265 if p.Offset != 0 { 1266 // TODO: maybe care about this later, if we ever start making 1267 // these sorts of files. 1268 return errors.New("file uses complicated schema. not packing") 1269 } 1270 pk.schemaParent[p.BlobRef] = append([]blob.Ref(nil), schemaPath...) 
// clone it 1271 pk.dataSize[p.BlobRef] = uint32(p.Size) 1272 for _, schemaRef := range schemaPath { 1273 if schemaSeen[schemaRef] { 1274 continue 1275 } 1276 schemaSeen[schemaRef] = true 1277 pk.schemaRefs = append(pk.schemaRefs, schemaRef) 1278 if b, err := blob.FromFetcher(ctx, pk.s, schemaRef); err != nil { 1279 return err 1280 } else { 1281 pk.schemaBlob[schemaRef] = b 1282 } 1283 } 1284 pk.dataRefs = append(pk.dataRefs, p.BlobRef) 1285 return nil 1286 }) 1287} 1288 1289// needsTruncatedAfterError is returned by writeAZip if it failed in its estimation and the zip file 1290// was over the 16MB (or whatever) max blob size limit. In this case the caller tries again 1291type needsTruncatedAfterError struct{ blob.Ref } 1292 1293func (e needsTruncatedAfterError) Error() string { return "needs truncation after " + e.Ref.String() } 1294 1295// check should only be used for things which really shouldn't ever happen, but should 1296// still be checked. If there is interesting logic in the 'else', then don't use this. 1297func check(err error) { 1298 if err != nil { 1299 b := make([]byte, 2<<10) 1300 b = b[:runtime.Stack(b, false)] 1301 log.Printf("Unlikely error condition triggered: %v at %s", err, b) 1302 panic(err) 1303 } 1304} 1305 1306// trunc is a hint about which blob to truncate after. It may be zero. 1307// If the returned error is of type 'needsTruncatedAfterError', then 1308// the zip should be attempted to be written again, but truncating the 1309// data after the listed blob. 
1310func (pk *packer) writeAZip(ctx context.Context, trunc blob.Ref) (err error) { 1311 defer func() { 1312 if e := recover(); e != nil { 1313 if v, ok := e.(error); ok && err == nil { 1314 err = v 1315 } else { 1316 panic(e) 1317 } 1318 } 1319 }() 1320 mf := Manifest{ 1321 WholeRef: pk.wholeRef, 1322 WholeSize: pk.wholeSize, 1323 WholePartIndex: len(pk.zips), 1324 } 1325 var zbuf bytes.Buffer 1326 cw := &countWriter{w: &zbuf} 1327 zw := zip.NewWriter(cw) 1328 1329 var approxSize = zipFixedOverhead // can't use zbuf.Len because zw buffers 1330 var dataRefsWritten []blob.Ref 1331 var dataBytesWritten int64 1332 var schemaBlobSeen = map[blob.Ref]bool{} 1333 var schemaBlobs []blob.Ref // to add after the main file 1334 1335 baseFileName := pk.fr.FileName() 1336 if strings.Contains(baseFileName, "/") || strings.Contains(baseFileName, "\\") { 1337 return fmt.Errorf("File schema blob %v filename had a slash in it: %q", pk.fr.SchemaBlobRef(), baseFileName) 1338 } 1339 fh := &zip.FileHeader{ 1340 Name: baseFileName, 1341 Method: zip.Store, // uncompressed 1342 } 1343 fh.SetModTime(pk.fr.ModTime()) 1344 fh.SetMode(0644) 1345 fw, err := zw.CreateHeader(fh) 1346 check(err) 1347 check(zw.Flush()) 1348 dataStart := cw.n 1349 approxSize += zipPerEntryOverhead // for the first FileHeader w/ the data 1350 1351 zipMax := pk.s.maxZipBlobSize() 1352 chunks := pk.chunksRemain 1353 chunkWholeHash := blob.NewHash() 1354 for len(chunks) > 0 { 1355 dr := chunks[0] // the next chunk to maybe write 1356 1357 if trunc.Valid() && trunc == dr { 1358 if approxSize == 0 { 1359 return errors.New("first blob is too large to pack, once you add the zip overhead") 1360 } 1361 break 1362 } 1363 1364 schemaBlobsSave := schemaBlobs 1365 for _, parent := range pk.schemaParent[dr] { 1366 if !schemaBlobSeen[parent] { 1367 schemaBlobSeen[parent] = true 1368 schemaBlobs = append(schemaBlobs, parent) 1369 approxSize += int(pk.schemaBlob[parent].Size()) + zipPerEntryOverhead 1370 } 1371 } 1372 1373 thisSize := 
pk.dataSize[dr] 1374 approxSize += int(thisSize) 1375 if approxSize+mf.approxSerializedSize() > zipMax { 1376 if fn := testHookStopBeforeOverflowing; fn != nil { 1377 fn() 1378 } 1379 schemaBlobs = schemaBlobsSave // restore it 1380 break 1381 } 1382 1383 // Copy the data to the zip. 1384 rc, size, err := pk.s.Fetch(ctx, dr) 1385 check(err) 1386 if size != thisSize { 1387 rc.Close() 1388 return errors.New("unexpected size") 1389 } 1390 if n, err := io.Copy(io.MultiWriter(fw, chunkWholeHash), rc); err != nil || n != int64(size) { 1391 rc.Close() 1392 return fmt.Errorf("copy to zip = %v, %v; want %v bytes", n, err, size) 1393 } 1394 rc.Close() 1395 1396 dataRefsWritten = append(dataRefsWritten, dr) 1397 dataBytesWritten += int64(size) 1398 chunks = chunks[1:] 1399 } 1400 mf.DataBlobsOrigin = blob.RefFromHash(chunkWholeHash) 1401 1402 // zipBlobs is where a schema or data blob is relative to the beginning 1403 // of the zip file. 1404 var zipBlobs []BlobAndPos 1405 1406 var dataOffset int64 1407 for _, br := range dataRefsWritten { 1408 size := pk.dataSize[br] 1409 mf.DataBlobs = append(mf.DataBlobs, BlobAndPos{blob.SizedRef{Ref: br, Size: size}, dataOffset}) 1410 1411 zipBlobs = append(zipBlobs, BlobAndPos{blob.SizedRef{Ref: br, Size: size}, dataStart + dataOffset}) 1412 dataOffset += int64(size) 1413 } 1414 1415 for _, br := range schemaBlobs { 1416 fw, err := zw.CreateHeader(&zip.FileHeader{ 1417 Name: "camlistore/" + br.String() + ".json", 1418 Method: zip.Store, // uncompressed 1419 }) 1420 check(err) 1421 check(zw.Flush()) 1422 b := pk.schemaBlob[br] 1423 zipBlobs = append(zipBlobs, BlobAndPos{blob.SizedRef{Ref: br, Size: b.Size()}, cw.n}) 1424 r, err := b.ReadAll(ctx) 1425 if err != nil { 1426 return err 1427 } 1428 n, err := io.Copy(fw, r) 1429 1430 check(err) 1431 if n != int64(b.Size()) { 1432 return fmt.Errorf("failed to write all of schema blob %v: %d bytes, not wanted %d", br, n, b.Size()) 1433 } 1434 } 1435 1436 // Manifest file 1437 fw, err = 
zw.Create(zipManifestPath) 1438 check(err) 1439 enc, err := json.MarshalIndent(mf, "", " ") 1440 check(err) 1441 _, err = fw.Write(enc) 1442 check(err) 1443 err = zw.Close() 1444 check(err) 1445 1446 if zbuf.Len() > zipMax { 1447 // We guessed wrong. Back up. Find out how many blobs we went over. 1448 overage := zbuf.Len() - zipMax 1449 for i := len(dataRefsWritten) - 1; i >= 0; i-- { 1450 dr := dataRefsWritten[i] 1451 if overage <= 0 { 1452 return needsTruncatedAfterError{dr} 1453 } 1454 overage -= int(pk.dataSize[dr]) 1455 } 1456 return errors.New("file is unpackable; first blob is too big to fit") 1457 } 1458 1459 zipRef := blob.RefFromBytes(zbuf.Bytes()) 1460 zipSB, err := blobserver.ReceiveNoHash(ctx, pk.s.large, zipRef, bytes.NewReader(zbuf.Bytes())) 1461 if err != nil { 1462 return err 1463 } 1464 1465 bm := pk.s.meta.BeginBatch() 1466 bm.Set(fmt.Sprintf("%s%s:%d", wholeMetaPrefix, pk.wholeRef, len(pk.zips)), 1467 fmt.Sprintf("%s %d %d %d", 1468 zipRef, 1469 dataStart, 1470 pk.wholeBytesWritten, 1471 dataBytesWritten)) 1472 bm.Set(fmt.Sprintf("%s%v", zipMetaPrefix, zipRef), 1473 fmt.Sprintf("%d %v %d %d %d", 1474 zipSB.Size, 1475 pk.wholeRef, 1476 pk.wholeSize, 1477 pk.wholeBytesWritten, 1478 dataBytesWritten)) 1479 1480 pk.wholeBytesWritten += dataBytesWritten 1481 pk.zips = append(pk.zips, writtenZip{ 1482 SizedRef: zipSB, 1483 dataRefs: dataRefsWritten, 1484 }) 1485 1486 for _, zb := range zipBlobs { 1487 bm.Set(blobMetaPrefix+zb.Ref.String(), fmt.Sprintf("%d %v %d", zb.Size, zipRef, zb.Offset)) 1488 } 1489 if err := pk.s.meta.CommitBatch(bm); err != nil { 1490 return err 1491 } 1492 1493 // Delete from small 1494 if !pk.s.skipDelete { 1495 toDelete := make([]blob.Ref, 0, len(dataRefsWritten)+len(schemaBlobs)) 1496 toDelete = append(toDelete, dataRefsWritten...) 1497 toDelete = append(toDelete, schemaBlobs...) 
1498 if err := pk.s.small.RemoveBlobs(ctx, toDelete); err != nil { 1499 // Can't really do anything about it and doesn't really matter, so 1500 // just log for now. 1501 pk.s.Logf("Error removing blobs from %s: %v", pk.s.small, err) 1502 } 1503 } 1504 1505 // On success, consume the chunks we wrote from pk.chunksRemain. 1506 pk.chunksRemain = pk.chunksRemain[len(dataRefsWritten):] 1507 return nil 1508} 1509 1510type zipOpenError struct { 1511 zipRef blob.Ref 1512 err error 1513} 1514 1515func (ze zipOpenError) Error() string { 1516 return fmt.Sprintf("Error opening packed zip blob %v: %v", ze.zipRef, ze.err) 1517} 1518 1519// foreachZipBlob calls fn for each blob in the zip pack blob 1520// identified by zipRef. If fn returns a non-nil error, 1521// foreachZipBlob stops enumerating with that error. 1522func (s *storage) foreachZipBlob(ctx context.Context, zipRef blob.Ref, fn func(BlobAndPos) error) error { 1523 sb, err := blobserver.StatBlob(ctx, s.large, zipRef) 1524 if err != nil { 1525 return err 1526 } 1527 zr, err := zip.NewReader(blob.ReaderAt(ctx, s.large, zipRef), int64(sb.Size)) 1528 if err != nil { 1529 return zipOpenError{zipRef, err} 1530 } 1531 var maniFile *zip.File // or nil if not found 1532 var firstOff int64 // offset of first file (the packed data chunks) 1533 for i, f := range zr.File { 1534 if i == 0 { 1535 firstOff, err = f.DataOffset() 1536 if err != nil { 1537 return err 1538 } 1539 } 1540 if f.Name == zipManifestPath { 1541 maniFile = f 1542 break 1543 } 1544 } 1545 if maniFile == nil { 1546 return errors.New("no camlistore manifest file found in zip") 1547 } 1548 // apply fn to all the schema blobs 1549 for _, f := range zr.File { 1550 if !strings.HasPrefix(f.Name, "camlistore/") || f.Name == zipManifestPath || 1551 !strings.HasSuffix(f.Name, ".json") { 1552 continue 1553 } 1554 brStr := strings.TrimSuffix(strings.TrimPrefix(f.Name, "camlistore/"), ".json") 1555 br, ok := blob.Parse(brStr) 1556 if ok { 1557 off, err := f.DataOffset() 1558 
if err != nil { 1559 return err 1560 } 1561 if err := fn(BlobAndPos{ 1562 SizedRef: blob.SizedRef{Ref: br, Size: uint32(f.UncompressedSize64)}, 1563 Offset: off, 1564 }); err != nil { 1565 return err 1566 } 1567 } 1568 } 1569 maniRC, err := maniFile.Open() 1570 if err != nil { 1571 return err 1572 } 1573 defer maniRC.Close() 1574 1575 var mf Manifest 1576 if err := json.NewDecoder(maniRC).Decode(&mf); err != nil { 1577 return err 1578 } 1579 if !mf.WholeRef.Valid() || mf.WholeSize == 0 || !mf.DataBlobsOrigin.Valid() { 1580 return errors.New("incomplete blobpack manifest JSON") 1581 } 1582 // apply fn to all the data blobs 1583 for _, bap := range mf.DataBlobs { 1584 bap.Offset += firstOff 1585 if err := fn(bap); err != nil { 1586 return err 1587 } 1588 } 1589 return nil 1590} 1591 1592// deleteZipPack deletes the zip pack file br, but only if that zip 1593// file's parts are deleted already from the meta index. 1594func (s *storage) deleteZipPack(ctx context.Context, br blob.Ref) error { 1595 inUse, err := s.zipPartsInUse(ctx, br) 1596 if err != nil { 1597 return err 1598 } 1599 if len(inUse) > 0 { 1600 return fmt.Errorf("can't delete zip pack %v: %d parts in use: %v", br, len(inUse), inUse) 1601 } 1602 if err := s.large.RemoveBlobs(ctx, []blob.Ref{br}); err != nil { 1603 return err 1604 } 1605 return s.meta.Delete("d:" + br.String()) 1606} 1607 1608func (s *storage) zipPartsInUse(ctx context.Context, br blob.Ref) ([]blob.Ref, error) { 1609 var ( 1610 mu sync.Mutex 1611 inUse []blob.Ref 1612 ) 1613 var grp syncutil.Group 1614 gate := syncutil.NewGate(20) // arbitrary constant 1615 err := s.foreachZipBlob(ctx, br, func(bap BlobAndPos) error { 1616 gate.Start() 1617 grp.Go(func() error { 1618 defer gate.Done() 1619 mr, err := s.getMetaRow(bap.Ref) 1620 if err != nil { 1621 return err 1622 } 1623 if mr.isPacked() { 1624 mu.Lock() 1625 inUse = append(inUse, mr.largeRef) 1626 mu.Unlock() 1627 } 1628 return nil 1629 }) 1630 return nil 1631 }) 1632 if os.IsNotExist(err) { 
1633 // An already-deleted blob from large isn't considered 1634 // to be in-use. 1635 return nil, nil 1636 } 1637 if err != nil { 1638 return nil, err 1639 } 1640 if err := grp.Err(); err != nil { 1641 return nil, err 1642 } 1643 return inUse, nil 1644} 1645 1646// A BlobAndPos is a blobref, its size, and where it is located within 1647// a larger group of bytes. 1648type BlobAndPos struct { 1649 blob.SizedRef 1650 Offset int64 `json:"offset"` 1651} 1652 1653// Manifest is the JSON description type representing the 1654// "camlistore/camlistore-pack-manifest.json" file found in a blobpack 1655// zip file. 1656type Manifest struct { 1657 // WholeRef is the blobref of the entire file that this zip is 1658 // either fully or partially describing. For files under 1659 // around 16MB, the WholeRef and DataBlobsOrigin will be 1660 // the same. 1661 WholeRef blob.Ref `json:"wholeRef"` 1662 1663 // WholeSize is the number of bytes in the original file being 1664 // cut up. 1665 WholeSize int64 `json:"wholeSize"` 1666 1667 // WholePartIndex is the chunk number (0-based) of this zip file. 1668 // If a client has 'n' zip files with the same WholeRef whose 1669 // WholePartIndexes are contiguous (including 0) and the sum of 1670 // the DataBlobs equals WholeSize, the client has the entire 1671 // original file. 1672 WholePartIndex int `json:"wholePartIndex"` 1673 1674 // DataBlobsOrigin is the blobref of the contents of the first 1675 // file in the zip pack file. This first file is the actual data, 1676 // or a part of it, that the rest of this zip is describing or 1677 // referencing. 1678 DataBlobsOrigin blob.Ref `json:"dataBlobsOrigin"` 1679 1680 // DataBlobs describes all the logical blobs that are 1681 // concatenated together in the first file in the zip file. 1682 // The offsets are relative to the beginning of that first 1683 // file, not the beginning of the zip file itself. 
1684 DataBlobs []BlobAndPos `json:"dataBlobs"` 1685} 1686 1687// approxSerializedSize reports how big this Manifest will be 1688// (approximately), once encoded as JSON. This is used as a hint by 1689// the packer to decide when to keep trying to add blobs. If this 1690// number is too low, the packer backs up (at a slight performance 1691// cost) but is still correct. If this approximation returns too large 1692// of a number, it just causes multiple zip files to be created when 1693// the original blobs might've just barely fit. 1694func (mf *Manifest) approxSerializedSize() int { 1695 // Empirically (for sha1-* blobrefs) it's 204 bytes fixed 1696 // encoding overhead (pre-compression), and 119 bytes per 1697 // encoded DataBlob. 1698 // And empirically, it compresses down to 30% of its size with flate. 1699 // So use the sha1 numbers but conseratively assume only 50% compression, 1700 // to make up for longer sha-3 blobrefs. 1701 return (204 + len(mf.DataBlobs)*119) / 2 1702} 1703 1704type countWriter struct { 1705 w io.Writer 1706 n int64 1707} 1708 1709func (cw *countWriter) Write(p []byte) (n int, err error) { 1710 n, err = cw.w.Write(p) 1711 cw.n += int64(n) 1712 return 1713} 1714