1package checker
2
3import (
4	"context"
5	"fmt"
6	"io"
7	"os"
8	"sync"
9
10	"github.com/restic/restic/internal/debug"
11	"github.com/restic/restic/internal/errors"
12	"github.com/restic/restic/internal/pack"
13	"github.com/restic/restic/internal/repository"
14	"github.com/restic/restic/internal/restic"
15	"github.com/restic/restic/internal/ui/progress"
16	"golang.org/x/sync/errgroup"
17)
18
// Checker runs various checks on a repository. It is advisable to create an
// exclusive Lock in the repository before running any checks.
//
// A Checker only tests for internal errors within the data structures of the
// repository (e.g. missing blobs), and needs a valid Repository to work on.
type Checker struct {
	packs    map[restic.ID]int64 // pack ID -> pack size derived from the index, set by LoadIndex
	blobRefs struct {
		sync.Mutex
		M restic.BlobSet // blobs recorded as referenced; guarded by the embedded mutex
	}
	trackUnused bool // when set, checkTree records referenced data blobs for UnusedBlobs

	masterIndex *repository.MasterIndex // index assembled from all index files by LoadIndex

	repo restic.Repository // repository under check
}
36
37// New returns a new checker which runs on repo.
38func New(repo restic.Repository, trackUnused bool) *Checker {
39	c := &Checker{
40		packs:       make(map[restic.ID]int64),
41		masterIndex: repository.NewMasterIndex(),
42		repo:        repo,
43		trackUnused: trackUnused,
44	}
45
46	c.blobRefs.M = restic.NewBlobSet()
47
48	return c
49}
50
// defaultParallelism is the number of worker goroutines started by the
// parallel checks (tree checking and pack reading).
const (
	defaultParallelism = 5
)
52
// ErrDuplicatePacks is returned when a pack is found in more than one index.
type ErrDuplicatePacks struct {
	PackID  restic.ID    // ID of the pack listed by more than one index
	Indexes restic.IDSet // IDs of the index files that list the pack
}
58
59func (e ErrDuplicatePacks) Error() string {
60	return fmt.Sprintf("pack %v contained in several indexes: %v", e.PackID.Str(), e.Indexes)
61}
62
// ErrOldIndexFormat is returned when an index with the old format is
// found.
type ErrOldIndexFormat struct {
	restic.ID // ID of the index file that uses the old format
}
68
69func (err ErrOldIndexFormat) Error() string {
70	return fmt.Sprintf("index %v has old format", err.ID.Str())
71}
72
73// LoadIndex loads all index files.
74func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
75	debug.Log("Start")
76
77	packToIndex := make(map[restic.ID]restic.IDSet)
78	err := repository.ForAllIndexes(ctx, c.repo, func(id restic.ID, index *repository.Index, oldFormat bool, err error) error {
79		debug.Log("process index %v, err %v", id, err)
80
81		if oldFormat {
82			debug.Log("index %v has old format", id.Str())
83			hints = append(hints, ErrOldIndexFormat{id})
84		}
85
86		err = errors.Wrapf(err, "error loading index %v", id.Str())
87
88		if err != nil {
89			errs = append(errs, err)
90			return nil
91		}
92
93		c.masterIndex.Insert(index)
94
95		debug.Log("process blobs")
96		cnt := 0
97		for blob := range index.Each(ctx) {
98			cnt++
99
100			if _, ok := packToIndex[blob.PackID]; !ok {
101				packToIndex[blob.PackID] = restic.NewIDSet()
102			}
103			packToIndex[blob.PackID].Insert(id)
104		}
105
106		debug.Log("%d blobs processed", cnt)
107		return nil
108	})
109	if err != nil {
110		errs = append(errs, err)
111	}
112
113	// Merge index before computing pack sizes, as this needs removed duplicates
114	err = c.masterIndex.MergeFinalIndexes()
115	if err != nil {
116		// abort if an error occurs merging the indexes
117		return hints, append(errs, err)
118	}
119
120	// compute pack size using index entries
121	c.packs = c.masterIndex.PackSize(ctx, false)
122
123	debug.Log("checking for duplicate packs")
124	for packID := range c.packs {
125		debug.Log("  check pack %v: contained in %d indexes", packID, len(packToIndex[packID]))
126		if len(packToIndex[packID]) > 1 {
127			hints = append(hints, ErrDuplicatePacks{
128				PackID:  packID,
129				Indexes: packToIndex[packID],
130			})
131		}
132	}
133
134	err = c.repo.SetIndex(c.masterIndex)
135	if err != nil {
136		debug.Log("SetIndex returned error: %v", err)
137		errs = append(errs, err)
138	}
139
140	return hints, errs
141}
142
// PackError describes an error with a specific pack.
type PackError struct {
	ID       restic.ID // ID of the pack the error refers to
	Orphaned bool      // set when the pack is not referenced in any index
	Err      error     // underlying error
}
149
150func (e PackError) Error() string {
151	return "pack " + e.ID.Str() + ": " + e.Err.Error()
152}
153
154// IsOrphanedPack returns true if the error describes a pack which is not
155// contained in any index.
156func IsOrphanedPack(err error) bool {
157	if e, ok := errors.Cause(err).(PackError); ok && e.Orphaned {
158		return true
159	}
160
161	return false
162}
163
164// Packs checks that all packs referenced in the index are still available and
165// there are no packs that aren't in an index. errChan is closed after all
166// packs have been checked.
167func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
168	defer close(errChan)
169
170	debug.Log("checking for %d packs", len(c.packs))
171
172	debug.Log("listing repository packs")
173	repoPacks := make(map[restic.ID]int64)
174
175	err := c.repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {
176		repoPacks[id] = size
177		return nil
178	})
179
180	if err != nil {
181		errChan <- err
182	}
183
184	for id, size := range c.packs {
185		reposize, ok := repoPacks[id]
186		// remove from repoPacks so we can find orphaned packs
187		delete(repoPacks, id)
188
189		// missing: present in c.packs but not in the repo
190		if !ok {
191			select {
192			case <-ctx.Done():
193				return
194			case errChan <- PackError{ID: id, Err: errors.New("does not exist")}:
195			}
196			continue
197		}
198
199		// size not matching: present in c.packs and in the repo, but sizes do not match
200		if size != reposize {
201			select {
202			case <-ctx.Done():
203				return
204			case errChan <- PackError{ID: id, Err: errors.Errorf("unexpected file size: got %d, expected %d", reposize, size)}:
205			}
206		}
207	}
208
209	// orphaned: present in the repo but not in c.packs
210	for orphanID := range repoPacks {
211		select {
212		case <-ctx.Done():
213			return
214		case errChan <- PackError{ID: orphanID, Orphaned: true, Err: errors.New("not referenced in any index")}:
215		}
216	}
217}
218
// Error is an error that occurred while checking a repository.
type Error struct {
	TreeID restic.ID // tree the error belongs to; left out of the message when null
	BlobID restic.ID // affected blob; shown in the message only together with TreeID
	Err    error     // underlying error
}
225
226func (e Error) Error() string {
227	if !e.BlobID.IsNull() && !e.TreeID.IsNull() {
228		msg := "tree " + e.TreeID.Str()
229		msg += ", blob " + e.BlobID.Str()
230		msg += ": " + e.Err.Error()
231		return msg
232	}
233
234	if !e.TreeID.IsNull() {
235		return "tree " + e.TreeID.Str() + ": " + e.Err.Error()
236	}
237
238	return e.Err.Error()
239}
240
// TreeError collects several errors that occurred while processing a tree.
type TreeError struct {
	ID     restic.ID // ID of the tree the errors belong to
	Errors []error   // errors found while processing the tree
}
246
247func (e TreeError) Error() string {
248	return fmt.Sprintf("tree %v: %v", e.ID.Str(), e.Errors)
249}
250
251// checkTreeWorker checks the trees received and sends out errors to errChan.
252func (c *Checker) checkTreeWorker(ctx context.Context, trees <-chan restic.TreeItem, out chan<- error) {
253	for job := range trees {
254		debug.Log("check tree %v (tree %v, err %v)", job.ID, job.Tree, job.Error)
255
256		var errs []error
257		if job.Error != nil {
258			errs = append(errs, job.Error)
259		} else {
260			errs = c.checkTree(job.ID, job.Tree)
261		}
262
263		if len(errs) == 0 {
264			continue
265		}
266		treeError := TreeError{ID: job.ID, Errors: errs}
267		select {
268		case <-ctx.Done():
269			return
270		case out <- treeError:
271			debug.Log("tree %v: sent %d errors", treeError.ID, len(treeError.Errors))
272		}
273	}
274}
275
276func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (ids restic.IDs, errs []error) {
277	err := restic.ForAllSnapshots(ctx, repo, nil, func(id restic.ID, sn *restic.Snapshot, err error) error {
278		if err != nil {
279			errs = append(errs, err)
280			return nil
281		}
282		treeID := *sn.Tree
283		debug.Log("snapshot %v has tree %v", id, treeID)
284		ids = append(ids, treeID)
285		return nil
286	})
287	if err != nil {
288		errs = append(errs, err)
289	}
290
291	return ids, errs
292}
293
294// Structure checks that for all snapshots all referenced data blobs and
295// subtrees are available in the index. errChan is closed after all trees have
296// been traversed.
297func (c *Checker) Structure(ctx context.Context, p *progress.Counter, errChan chan<- error) {
298	trees, errs := loadSnapshotTreeIDs(ctx, c.repo)
299	p.SetMax(uint64(len(trees)))
300	debug.Log("need to check %d trees from snapshots, %d errs returned", len(trees), len(errs))
301
302	for _, err := range errs {
303		select {
304		case <-ctx.Done():
305			return
306		case errChan <- err:
307		}
308	}
309
310	wg, ctx := errgroup.WithContext(ctx)
311	treeStream := restic.StreamTrees(ctx, wg, c.repo, trees, func(treeID restic.ID) bool {
312		// blobRefs may be accessed in parallel by checkTree
313		c.blobRefs.Lock()
314		h := restic.BlobHandle{ID: treeID, Type: restic.TreeBlob}
315		blobReferenced := c.blobRefs.M.Has(h)
316		// noop if already referenced
317		c.blobRefs.M.Insert(h)
318		c.blobRefs.Unlock()
319		return blobReferenced
320	}, p)
321
322	defer close(errChan)
323	for i := 0; i < defaultParallelism; i++ {
324		wg.Go(func() error {
325			c.checkTreeWorker(ctx, treeStream, errChan)
326			return nil
327		})
328	}
329
330	// the wait group should not return an error because no worker returns an
331	// error, so panic if that has changed somehow.
332	err := wg.Wait()
333	if err != nil {
334		panic(err)
335	}
336}
337
338func (c *Checker) checkTree(id restic.ID, tree *restic.Tree) (errs []error) {
339	debug.Log("checking tree %v", id)
340
341	for _, node := range tree.Nodes {
342		switch node.Type {
343		case "file":
344			if node.Content == nil {
345				errs = append(errs, Error{TreeID: id, Err: errors.Errorf("file %q has nil blob list", node.Name)})
346			}
347
348			for b, blobID := range node.Content {
349				if blobID.IsNull() {
350					errs = append(errs, Error{TreeID: id, Err: errors.Errorf("file %q blob %d has null ID", node.Name, b)})
351					continue
352				}
353				// Note that we do not use the blob size. The "obvious" check
354				// whether the sum of the blob sizes matches the file size
355				// unfortunately fails in some cases that are not resolveable
356				// by users, so we omit this check, see #1887
357
358				_, found := c.repo.LookupBlobSize(blobID, restic.DataBlob)
359				if !found {
360					debug.Log("tree %v references blob %v which isn't contained in index", id, blobID)
361					errs = append(errs, Error{TreeID: id, Err: errors.Errorf("file %q blob %v not found in index", node.Name, blobID)})
362				}
363			}
364
365			if c.trackUnused {
366				// loop a second time to keep the locked section as short as possible
367				c.blobRefs.Lock()
368				for _, blobID := range node.Content {
369					if blobID.IsNull() {
370						continue
371					}
372					h := restic.BlobHandle{ID: blobID, Type: restic.DataBlob}
373					c.blobRefs.M.Insert(h)
374					debug.Log("blob %v is referenced", blobID)
375				}
376				c.blobRefs.Unlock()
377			}
378
379		case "dir":
380			if node.Subtree == nil {
381				errs = append(errs, Error{TreeID: id, Err: errors.Errorf("dir node %q has no subtree", node.Name)})
382				continue
383			}
384
385			if node.Subtree.IsNull() {
386				errs = append(errs, Error{TreeID: id, Err: errors.Errorf("dir node %q subtree id is null", node.Name)})
387				continue
388			}
389
390		case "symlink", "socket", "chardev", "dev", "fifo":
391			// nothing to check
392
393		default:
394			errs = append(errs, Error{TreeID: id, Err: errors.Errorf("node %q with invalid type %q", node.Name, node.Type)})
395		}
396
397		if node.Name == "" {
398			errs = append(errs, Error{TreeID: id, Err: errors.New("node with empty name")})
399		}
400	}
401
402	return errs
403}
404
405// UnusedBlobs returns all blobs that have never been referenced.
406func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles) {
407	if !c.trackUnused {
408		panic("only works when tracking blob references")
409	}
410	c.blobRefs.Lock()
411	defer c.blobRefs.Unlock()
412
413	debug.Log("checking %d blobs", len(c.blobRefs.M))
414	ctx, cancel := context.WithCancel(ctx)
415	defer cancel()
416
417	for blob := range c.repo.Index().Each(ctx) {
418		h := restic.BlobHandle{ID: blob.ID, Type: blob.Type}
419		if !c.blobRefs.M.Has(h) {
420			debug.Log("blob %v not referenced", h)
421			blobs = append(blobs, h)
422		}
423	}
424
425	return blobs
426}
427
428// CountPacks returns the number of packs in the repository.
429func (c *Checker) CountPacks() uint64 {
430	return uint64(len(c.packs))
431}
432
433// GetPacks returns IDSet of packs in the repository
434func (c *Checker) GetPacks() map[restic.ID]int64 {
435	return c.packs
436}
437
438// checkPack reads a pack and checks the integrity of all blobs.
439func checkPack(ctx context.Context, r restic.Repository, id restic.ID, size int64) error {
440	debug.Log("checking pack %v", id)
441	h := restic.Handle{Type: restic.PackFile, Name: id.String()}
442
443	packfile, hash, realSize, err := repository.DownloadAndHash(ctx, r.Backend(), h)
444	if err != nil {
445		return errors.Wrap(err, "checkPack")
446	}
447
448	defer func() {
449		_ = packfile.Close()
450		_ = os.Remove(packfile.Name())
451	}()
452
453	debug.Log("hash for pack %v is %v", id, hash)
454
455	if !hash.Equal(id) {
456		debug.Log("Pack ID does not match, want %v, got %v", id, hash)
457		return errors.Errorf("Pack ID does not match, want %v, got %v", id.Str(), hash.Str())
458	}
459
460	if realSize != size {
461		debug.Log("Pack size does not match, want %v, got %v", size, realSize)
462		return errors.Errorf("Pack size does not match, want %v, got %v", size, realSize)
463	}
464
465	blobs, hdrSize, err := pack.List(r.Key(), packfile, size)
466	if err != nil {
467		return err
468	}
469
470	var errs []error
471	var buf []byte
472	sizeFromBlobs := uint(hdrSize)
473	idx := r.Index()
474	for i, blob := range blobs {
475		sizeFromBlobs += blob.Length
476		debug.Log("  check blob %d: %v", i, blob)
477
478		buf = buf[:cap(buf)]
479		if uint(len(buf)) < blob.Length {
480			buf = make([]byte, blob.Length)
481		}
482		buf = buf[:blob.Length]
483
484		_, err := packfile.Seek(int64(blob.Offset), 0)
485		if err != nil {
486			return errors.Errorf("Seek(%v): %v", blob.Offset, err)
487		}
488
489		_, err = io.ReadFull(packfile, buf)
490		if err != nil {
491			debug.Log("  error loading blob %v: %v", blob.ID, err)
492			errs = append(errs, errors.Errorf("blob %v: %v", i, err))
493			continue
494		}
495
496		nonce, ciphertext := buf[:r.Key().NonceSize()], buf[r.Key().NonceSize():]
497		plaintext, err := r.Key().Open(ciphertext[:0], nonce, ciphertext, nil)
498		if err != nil {
499			debug.Log("  error decrypting blob %v: %v", blob.ID, err)
500			errs = append(errs, errors.Errorf("blob %v: %v", i, err))
501			continue
502		}
503
504		hash := restic.Hash(plaintext)
505		if !hash.Equal(blob.ID) {
506			debug.Log("  Blob ID does not match, want %v, got %v", blob.ID, hash)
507			errs = append(errs, errors.Errorf("Blob ID does not match, want %v, got %v", blob.ID.Str(), hash.Str()))
508			continue
509		}
510
511		// Check if blob is contained in index and position is correct
512		idxHas := false
513		for _, pb := range idx.Lookup(blob.BlobHandle) {
514			if pb.PackID == id && pb.Offset == blob.Offset && pb.Length == blob.Length {
515				idxHas = true
516				break
517			}
518		}
519		if !idxHas {
520			errs = append(errs, errors.Errorf("Blob %v is not contained in index or position is incorrect", blob.ID.Str()))
521			continue
522		}
523	}
524
525	if int64(sizeFromBlobs) != size {
526		debug.Log("Pack size does not match, want %v, got %v", size, sizeFromBlobs)
527		errs = append(errs, errors.Errorf("Pack size does not match, want %v, got %v", size, sizeFromBlobs))
528	}
529
530	if len(errs) > 0 {
531		return errors.Errorf("pack %v contains %v errors: %v", id.Str(), len(errs), errs)
532	}
533
534	return nil
535}
536
537// ReadData loads all data from the repository and checks the integrity.
538func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
539	c.ReadPacks(ctx, c.packs, nil, errChan)
540}
541
542// ReadPacks loads data from specified packs and checks the integrity.
543func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
544	defer close(errChan)
545
546	g, ctx := errgroup.WithContext(ctx)
547	type packsize struct {
548		id   restic.ID
549		size int64
550	}
551	ch := make(chan packsize)
552
553	// run workers
554	for i := 0; i < defaultParallelism; i++ {
555		g.Go(func() error {
556			for {
557				var ps packsize
558				var ok bool
559
560				select {
561				case <-ctx.Done():
562					return nil
563				case ps, ok = <-ch:
564					if !ok {
565						return nil
566					}
567				}
568				err := checkPack(ctx, c.repo, ps.id, ps.size)
569				p.Add(1)
570				if err == nil {
571					continue
572				}
573
574				select {
575				case <-ctx.Done():
576					return nil
577				case errChan <- err:
578				}
579			}
580		})
581	}
582
583	// push packs to ch
584	for pack, size := range packs {
585		select {
586		case ch <- packsize{id: pack, size: size}:
587		case <-ctx.Done():
588		}
589	}
590	close(ch)
591
592	err := g.Wait()
593	if err != nil {
594		select {
595		case <-ctx.Done():
596			return
597		case errChan <- err:
598		}
599	}
600}
601