1package checker 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "os" 8 "sync" 9 10 "github.com/restic/restic/internal/debug" 11 "github.com/restic/restic/internal/errors" 12 "github.com/restic/restic/internal/pack" 13 "github.com/restic/restic/internal/repository" 14 "github.com/restic/restic/internal/restic" 15 "github.com/restic/restic/internal/ui/progress" 16 "golang.org/x/sync/errgroup" 17) 18 19// Checker runs various checks on a repository. It is advisable to create an 20// exclusive Lock in the repository before running any checks. 21// 22// A Checker only tests for internal errors within the data structures of the 23// repository (e.g. missing blobs), and needs a valid Repository to work on. 24type Checker struct { 25 packs map[restic.ID]int64 26 blobRefs struct { 27 sync.Mutex 28 M restic.BlobSet 29 } 30 trackUnused bool 31 32 masterIndex *repository.MasterIndex 33 34 repo restic.Repository 35} 36 37// New returns a new checker which runs on repo. 38func New(repo restic.Repository, trackUnused bool) *Checker { 39 c := &Checker{ 40 packs: make(map[restic.ID]int64), 41 masterIndex: repository.NewMasterIndex(), 42 repo: repo, 43 trackUnused: trackUnused, 44 } 45 46 c.blobRefs.M = restic.NewBlobSet() 47 48 return c 49} 50 51const defaultParallelism = 5 52 53// ErrDuplicatePacks is returned when a pack is found in more than one index. 54type ErrDuplicatePacks struct { 55 PackID restic.ID 56 Indexes restic.IDSet 57} 58 59func (e ErrDuplicatePacks) Error() string { 60 return fmt.Sprintf("pack %v contained in several indexes: %v", e.PackID.Str(), e.Indexes) 61} 62 63// ErrOldIndexFormat is returned when an index with the old format is 64// found. 65type ErrOldIndexFormat struct { 66 restic.ID 67} 68 69func (err ErrOldIndexFormat) Error() string { 70 return fmt.Sprintf("index %v has old format", err.ID.Str()) 71} 72 73// LoadIndex loads all index files. 74func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) { 75 debug.Log("Start") 76 77 packToIndex := make(map[restic.ID]restic.IDSet) 78 err := repository.ForAllIndexes(ctx, c.repo, func(id restic.ID, index *repository.Index, oldFormat bool, err error) error { 79 debug.Log("process index %v, err %v", id, err) 80 81 if oldFormat { 82 debug.Log("index %v has old format", id.Str()) 83 hints = append(hints, ErrOldIndexFormat{id}) 84 } 85 86 err = errors.Wrapf(err, "error loading index %v", id.Str()) 87 88 if err != nil { 89 errs = append(errs, err) 90 return nil 91 } 92 93 c.masterIndex.Insert(index) 94 95 debug.Log("process blobs") 96 cnt := 0 97 for blob := range index.Each(ctx) { 98 cnt++ 99 100 if _, ok := packToIndex[blob.PackID]; !ok { 101 packToIndex[blob.PackID] = restic.NewIDSet() 102 } 103 packToIndex[blob.PackID].Insert(id) 104 } 105 106 debug.Log("%d blobs processed", cnt) 107 return nil 108 }) 109 if err != nil { 110 errs = append(errs, err) 111 } 112 113 // Merge index before computing pack sizes, as this needs removed duplicates 114 err = c.masterIndex.MergeFinalIndexes() 115 if err != nil { 116 // abort if an error occurs merging the indexes 117 return hints, append(errs, err) 118 } 119 120 // compute pack size using index entries 121 c.packs = c.masterIndex.PackSize(ctx, false) 122 123 debug.Log("checking for duplicate packs") 124 for packID := range c.packs { 125 debug.Log(" check pack %v: contained in %d indexes", packID, len(packToIndex[packID])) 126 if len(packToIndex[packID]) > 1 { 127 hints = append(hints, ErrDuplicatePacks{ 128 PackID: packID, 129 Indexes: packToIndex[packID], 130 }) 131 } 132 } 133 134 err = c.repo.SetIndex(c.masterIndex) 135 if err != nil { 136 debug.Log("SetIndex returned error: %v", err) 137 errs = append(errs, err) 138 } 139 140 return hints, errs 141} 142 143// PackError describes an error with a specific pack. 144type PackError struct { 145 ID restic.ID 146 Orphaned bool 147 Err error 148} 149 150func (e PackError) Error() string { 151 return "pack " + e.ID.Str() + ": " + e.Err.Error() 152} 153 154// IsOrphanedPack returns true if the error describes a pack which is not 155// contained in any index. 156func IsOrphanedPack(err error) bool { 157 if e, ok := errors.Cause(err).(PackError); ok && e.Orphaned { 158 return true 159 } 160 161 return false 162} 163 164// Packs checks that all packs referenced in the index are still available and 165// there are no packs that aren't in an index. errChan is closed after all 166// packs have been checked. 167func (c *Checker) Packs(ctx context.Context, errChan chan<- error) { 168 defer close(errChan) 169 170 debug.Log("checking for %d packs", len(c.packs)) 171 172 debug.Log("listing repository packs") 173 repoPacks := make(map[restic.ID]int64) 174 175 err := c.repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error { 176 repoPacks[id] = size 177 return nil 178 }) 179 180 if err != nil { 181 errChan <- err 182 } 183 184 for id, size := range c.packs { 185 reposize, ok := repoPacks[id] 186 // remove from repoPacks so we can find orphaned packs 187 delete(repoPacks, id) 188 189 // missing: present in c.packs but not in the repo 190 if !ok { 191 select { 192 case <-ctx.Done(): 193 return 194 case errChan <- PackError{ID: id, Err: errors.New("does not exist")}: 195 } 196 continue 197 } 198 199 // size not matching: present in c.packs and in the repo, but sizes do not match 200 if size != reposize { 201 select { 202 case <-ctx.Done(): 203 return 204 case errChan <- PackError{ID: id, Err: errors.Errorf("unexpected file size: got %d, expected %d", reposize, size)}: 205 } 206 } 207 } 208 209 // orphaned: present in the repo but not in c.packs 210 for orphanID := range repoPacks { 211 select { 212 case <-ctx.Done(): 213 return 214 case errChan <- PackError{ID: orphanID, Orphaned: true, Err: errors.New("not referenced in any index")}: 215 } 216 } 217} 218 219// Error is an error that occurred while checking a repository. 220type Error struct { 221 TreeID restic.ID 222 BlobID restic.ID 223 Err error 224} 225 226func (e Error) Error() string { 227 if !e.BlobID.IsNull() && !e.TreeID.IsNull() { 228 msg := "tree " + e.TreeID.Str() 229 msg += ", blob " + e.BlobID.Str() 230 msg += ": " + e.Err.Error() 231 return msg 232 } 233 234 if !e.TreeID.IsNull() { 235 return "tree " + e.TreeID.Str() + ": " + e.Err.Error() 236 } 237 238 return e.Err.Error() 239} 240 241// TreeError collects several errors that occurred while processing a tree. 242type TreeError struct { 243 ID restic.ID 244 Errors []error 245} 246 247func (e TreeError) Error() string { 248 return fmt.Sprintf("tree %v: %v", e.ID.Str(), e.Errors) 249} 250 251// checkTreeWorker checks the trees received and sends out errors to errChan. 252func (c *Checker) checkTreeWorker(ctx context.Context, trees <-chan restic.TreeItem, out chan<- error) { 253 for job := range trees { 254 debug.Log("check tree %v (tree %v, err %v)", job.ID, job.Tree, job.Error) 255 256 var errs []error 257 if job.Error != nil { 258 errs = append(errs, job.Error) 259 } else { 260 errs = c.checkTree(job.ID, job.Tree) 261 } 262 263 if len(errs) == 0 { 264 continue 265 } 266 treeError := TreeError{ID: job.ID, Errors: errs} 267 select { 268 case <-ctx.Done(): 269 return 270 case out <- treeError: 271 debug.Log("tree %v: sent %d errors", treeError.ID, len(treeError.Errors)) 272 } 273 } 274} 275 276func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (ids restic.IDs, errs []error) { 277 err := restic.ForAllSnapshots(ctx, repo, nil, func(id restic.ID, sn *restic.Snapshot, err error) error { 278 if err != nil { 279 errs = append(errs, err) 280 return nil 281 } 282 treeID := *sn.Tree 283 debug.Log("snapshot %v has tree %v", id, treeID) 284 ids = append(ids, treeID) 285 return nil 286 }) 287 if err != nil { 288 errs = append(errs, err) 289 } 290 291 return ids, errs 292} 293 294// Structure checks that for all snapshots all referenced data blobs and 295// subtrees are available in the index. errChan is closed after all trees have 296// been traversed. 297func (c *Checker) Structure(ctx context.Context, p *progress.Counter, errChan chan<- error) { 298 trees, errs := loadSnapshotTreeIDs(ctx, c.repo) 299 p.SetMax(uint64(len(trees))) 300 debug.Log("need to check %d trees from snapshots, %d errs returned", len(trees), len(errs)) 301 302 for _, err := range errs { 303 select { 304 case <-ctx.Done(): 305 return 306 case errChan <- err: 307 } 308 } 309 310 wg, ctx := errgroup.WithContext(ctx) 311 treeStream := restic.StreamTrees(ctx, wg, c.repo, trees, func(treeID restic.ID) bool { 312 // blobRefs may be accessed in parallel by checkTree 313 c.blobRefs.Lock() 314 h := restic.BlobHandle{ID: treeID, Type: restic.TreeBlob} 315 blobReferenced := c.blobRefs.M.Has(h) 316 // noop if already referenced 317 c.blobRefs.M.Insert(h) 318 c.blobRefs.Unlock() 319 return blobReferenced 320 }, p) 321 322 defer close(errChan) 323 for i := 0; i < defaultParallelism; i++ { 324 wg.Go(func() error { 325 c.checkTreeWorker(ctx, treeStream, errChan) 326 return nil 327 }) 328 } 329 330 // the wait group should not return an error because no worker returns an 331 // error, so panic if that has changed somehow. 332 err := wg.Wait() 333 if err != nil { 334 panic(err) 335 } 336} 337 338func (c *Checker) checkTree(id restic.ID, tree *restic.Tree) (errs []error) { 339 debug.Log("checking tree %v", id) 340 341 for _, node := range tree.Nodes { 342 switch node.Type { 343 case "file": 344 if node.Content == nil { 345 errs = append(errs, Error{TreeID: id, Err: errors.Errorf("file %q has nil blob list", node.Name)}) 346 } 347 348 for b, blobID := range node.Content { 349 if blobID.IsNull() { 350 errs = append(errs, Error{TreeID: id, Err: errors.Errorf("file %q blob %d has null ID", node.Name, b)}) 351 continue 352 } 353 // Note that we do not use the blob size. The "obvious" check 354 // whether the sum of the blob sizes matches the file size 355 // unfortunately fails in some cases that are not resolveable 356 // by users, so we omit this check, see #1887 357 358 _, found := c.repo.LookupBlobSize(blobID, restic.DataBlob) 359 if !found { 360 debug.Log("tree %v references blob %v which isn't contained in index", id, blobID) 361 errs = append(errs, Error{TreeID: id, Err: errors.Errorf("file %q blob %v not found in index", node.Name, blobID)}) 362 } 363 } 364 365 if c.trackUnused { 366 // loop a second time to keep the locked section as short as possible 367 c.blobRefs.Lock() 368 for _, blobID := range node.Content { 369 if blobID.IsNull() { 370 continue 371 } 372 h := restic.BlobHandle{ID: blobID, Type: restic.DataBlob} 373 c.blobRefs.M.Insert(h) 374 debug.Log("blob %v is referenced", blobID) 375 } 376 c.blobRefs.Unlock() 377 } 378 379 case "dir": 380 if node.Subtree == nil { 381 errs = append(errs, Error{TreeID: id, Err: errors.Errorf("dir node %q has no subtree", node.Name)}) 382 continue 383 } 384 385 if node.Subtree.IsNull() { 386 errs = append(errs, Error{TreeID: id, Err: errors.Errorf("dir node %q subtree id is null", node.Name)}) 387 continue 388 } 389 390 case "symlink", "socket", "chardev", "dev", "fifo": 391 // nothing to check 392 393 default: 394 errs = append(errs, Error{TreeID: id, Err: errors.Errorf("node %q with invalid type %q", node.Name, node.Type)}) 395 } 396 397 if node.Name == "" { 398 errs = append(errs, Error{TreeID: id, Err: errors.New("node with empty name")}) 399 } 400 } 401 402 return errs 403} 404 405// UnusedBlobs returns all blobs that have never been referenced. 406func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles) { 407 if !c.trackUnused { 408 panic("only works when tracking blob references") 409 } 410 c.blobRefs.Lock() 411 defer c.blobRefs.Unlock() 412 413 debug.Log("checking %d blobs", len(c.blobRefs.M)) 414 ctx, cancel := context.WithCancel(ctx) 415 defer cancel() 416 417 for blob := range c.repo.Index().Each(ctx) { 418 h := restic.BlobHandle{ID: blob.ID, Type: blob.Type} 419 if !c.blobRefs.M.Has(h) { 420 debug.Log("blob %v not referenced", h) 421 blobs = append(blobs, h) 422 } 423 } 424 425 return blobs 426} 427 428// CountPacks returns the number of packs in the repository. 429func (c *Checker) CountPacks() uint64 { 430 return uint64(len(c.packs)) 431} 432 433// GetPacks returns IDSet of packs in the repository 434func (c *Checker) GetPacks() map[restic.ID]int64 { 435 return c.packs 436} 437 438// checkPack reads a pack and checks the integrity of all blobs. 439func checkPack(ctx context.Context, r restic.Repository, id restic.ID, size int64) error { 440 debug.Log("checking pack %v", id) 441 h := restic.Handle{Type: restic.PackFile, Name: id.String()} 442 443 packfile, hash, realSize, err := repository.DownloadAndHash(ctx, r.Backend(), h) 444 if err != nil { 445 return errors.Wrap(err, "checkPack") 446 } 447 448 defer func() { 449 _ = packfile.Close() 450 _ = os.Remove(packfile.Name()) 451 }() 452 453 debug.Log("hash for pack %v is %v", id, hash) 454 455 if !hash.Equal(id) { 456 debug.Log("Pack ID does not match, want %v, got %v", id, hash) 457 return errors.Errorf("Pack ID does not match, want %v, got %v", id.Str(), hash.Str()) 458 } 459 460 if realSize != size { 461 debug.Log("Pack size does not match, want %v, got %v", size, realSize) 462 return errors.Errorf("Pack size does not match, want %v, got %v", size, realSize) 463 } 464 465 blobs, hdrSize, err := pack.List(r.Key(), packfile, size) 466 if err != nil { 467 return err 468 } 469 470 var errs []error 471 var buf []byte 472 sizeFromBlobs := uint(hdrSize) 473 idx := r.Index() 474 for i, blob := range blobs { 475 sizeFromBlobs += blob.Length 476 debug.Log(" check blob %d: %v", i, blob) 477 478 buf = buf[:cap(buf)] 479 if uint(len(buf)) < blob.Length { 480 buf = make([]byte, blob.Length) 481 } 482 buf = buf[:blob.Length] 483 484 _, err := packfile.Seek(int64(blob.Offset), 0) 485 if err != nil { 486 return errors.Errorf("Seek(%v): %v", blob.Offset, err) 487 } 488 489 _, err = io.ReadFull(packfile, buf) 490 if err != nil { 491 debug.Log(" error loading blob %v: %v", blob.ID, err) 492 errs = append(errs, errors.Errorf("blob %v: %v", i, err)) 493 continue 494 } 495 496 nonce, ciphertext := buf[:r.Key().NonceSize()], buf[r.Key().NonceSize():] 497 plaintext, err := r.Key().Open(ciphertext[:0], nonce, ciphertext, nil) 498 if err != nil { 499 debug.Log(" error decrypting blob %v: %v", blob.ID, err) 500 errs = append(errs, errors.Errorf("blob %v: %v", i, err)) 501 continue 502 } 503 504 hash := restic.Hash(plaintext) 505 if !hash.Equal(blob.ID) { 506 debug.Log(" Blob ID does not match, want %v, got %v", blob.ID, hash) 507 errs = append(errs, errors.Errorf("Blob ID does not match, want %v, got %v", blob.ID.Str(), hash.Str())) 508 continue 509 } 510 511 // Check if blob is contained in index and position is correct 512 idxHas := false 513 for _, pb := range idx.Lookup(blob.BlobHandle) { 514 if pb.PackID == id && pb.Offset == blob.Offset && pb.Length == blob.Length { 515 idxHas = true 516 break 517 } 518 } 519 if !idxHas { 520 errs = append(errs, errors.Errorf("Blob %v is not contained in index or position is incorrect", blob.ID.Str())) 521 continue 522 } 523 } 524 525 if int64(sizeFromBlobs) != size { 526 debug.Log("Pack size does not match, want %v, got %v", size, sizeFromBlobs) 527 errs = append(errs, errors.Errorf("Pack size does not match, want %v, got %v", size, sizeFromBlobs)) 528 } 529 530 if len(errs) > 0 { 531 return errors.Errorf("pack %v contains %v errors: %v", id.Str(), len(errs), errs) 532 } 533 534 return nil 535} 536 537// ReadData loads all data from the repository and checks the integrity. 538func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) { 539 c.ReadPacks(ctx, c.packs, nil, errChan) 540} 541 542// ReadPacks loads data from specified packs and checks the integrity. 543func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) { 544 defer close(errChan) 545 546 g, ctx := errgroup.WithContext(ctx) 547 type packsize struct { 548 id restic.ID 549 size int64 550 } 551 ch := make(chan packsize) 552 553 // run workers 554 for i := 0; i < defaultParallelism; i++ { 555 g.Go(func() error { 556 for { 557 var ps packsize 558 var ok bool 559 560 select { 561 case <-ctx.Done(): 562 return nil 563 case ps, ok = <-ch: 564 if !ok { 565 return nil 566 } 567 } 568 err := checkPack(ctx, c.repo, ps.id, ps.size) 569 p.Add(1) 570 if err == nil { 571 continue 572 } 573 574 select { 575 case <-ctx.Done(): 576 return nil 577 case errChan <- err: 578 } 579 } 580 }) 581 } 582 583 // push packs to ch 584 for pack, size := range packs { 585 select { 586 case ch <- packsize{id: pack, size: size}: 587 case <-ctx.Done(): 588 } 589 } 590 close(ch) 591 592 err := g.Wait() 593 if err != nil { 594 select { 595 case <-ctx.Done(): 596 return 597 case errChan <- err: 598 } 599 } 600} 601