1package fscache // import "github.com/docker/docker/builder/fscache"
2
3import (
4	"archive/tar"
5	"context"
6	"crypto/sha256"
7	"encoding/json"
8	"hash"
9	"os"
10	"path/filepath"
11	"sort"
12	"sync"
13	"time"
14
15	"github.com/docker/docker/builder"
16	"github.com/docker/docker/builder/remotecontext"
17	"github.com/docker/docker/pkg/archive"
18	"github.com/docker/docker/pkg/directory"
19	"github.com/docker/docker/pkg/stringid"
20	"github.com/docker/docker/pkg/tarsum"
21	"github.com/moby/buildkit/session/filesync"
22	"github.com/pkg/errors"
23	"github.com/sirupsen/logrus"
24	"github.com/tonistiigi/fsutil"
25	fsutiltypes "github.com/tonistiigi/fsutil/types"
26	bolt "go.etcd.io/bbolt"
27	"golang.org/x/sync/singleflight"
28)
29
// Names used for the bolt database that persists cache metadata.
const (
	// dbFile is the database file name, created inside Opt.Root.
	dbFile = "fscache.db"
	// cacheKey is the per-snapshot bucket key holding the serialized
	// CachableSource.
	cacheKey = "cache"
	// metaKey is the per-snapshot bucket key holding the JSON-encoded
	// sourceMeta.
	metaKey = "meta"
)
33
// Backend is a backing implementation for FSCache. It maps a snapshot's
// backend ID to the directory holding the snapshot's files, and removes that
// storage once the snapshot is deleted from the cache.
type Backend interface {
	// Remove deletes the storage identified by backendID.
	Remove(backendID string) error
	// Get returns the directory path for backendID.
	Get(backendID string) (string, error)
}
39
40// FSCache allows syncing remote resources to cached snapshots
41type FSCache struct {
42	opt        Opt
43	transports map[string]Transport
44	mu         sync.Mutex
45	g          singleflight.Group
46	store      *fsCacheStore
47}
48
// Opt defines options for initializing FSCache.
type Opt struct {
	// Backend returns the directory for a snapshot's backend ID and removes
	// that storage when the snapshot is deleted.
	Backend Backend
	// Root is created (mode 0700) if missing and holds the fscache.db
	// metadata database.
	Root string // for storing local metadata
	// GCPolicy bounds what garbage collection keeps.
	GCPolicy GCPolicy
}
55
// GCPolicy defines policy for garbage collection. It only applies to
// snapshots that have no active references.
type GCPolicy struct {
	// MaxSize is the combined size (as reported by directory.Size —
	// presumably bytes) that retained snapshots may occupy; beyond it the
	// least recently used snapshots are deleted.
	MaxSize uint64
	// MaxKeepDuration is how long after CachePolicy.LastUsed a snapshot is
	// kept before it is deleted regardless of size. A zero value makes every
	// unreferenced snapshot eligible immediately.
	MaxKeepDuration time.Duration
}
61
62// NewFSCache returns new FSCache object
63func NewFSCache(opt Opt) (*FSCache, error) {
64	store, err := newFSCacheStore(opt)
65	if err != nil {
66		return nil, err
67	}
68	return &FSCache{
69		store:      store,
70		opt:        opt,
71		transports: make(map[string]Transport),
72	}, nil
73}
74
75// Transport defines a method for syncing remote data to FSCache
76type Transport interface {
77	Copy(ctx context.Context, id RemoteIdentifier, dest string, cs filesync.CacheUpdater) error
78}
79
// RemoteIdentifier identifies a transfer request.
type RemoteIdentifier interface {
	// Transport names the registered Transport that performs the transfer.
	Transport() string
	// Key names the snapshot for this request; concurrent requests with the
	// same key share one transfer.
	Key() string
	// SharedKey, when non-empty, lets an unused snapshot created under the
	// same shared key be reused for this request.
	SharedKey() string
}
86
87// RegisterTransport registers a new transport method
88func (fsc *FSCache) RegisterTransport(id string, transport Transport) error {
89	fsc.mu.Lock()
90	defer fsc.mu.Unlock()
91	if _, ok := fsc.transports[id]; ok {
92		return errors.Errorf("transport %v already exists", id)
93	}
94	fsc.transports[id] = transport
95	return nil
96}
97
98// SyncFrom returns a source based on a remote identifier
99func (fsc *FSCache) SyncFrom(ctx context.Context, id RemoteIdentifier) (builder.Source, error) { // cacheOpt
100	trasportID := id.Transport()
101	fsc.mu.Lock()
102	transport, ok := fsc.transports[id.Transport()]
103	if !ok {
104		fsc.mu.Unlock()
105		return nil, errors.Errorf("invalid transport %s", trasportID)
106	}
107
108	logrus.Debugf("SyncFrom %s %s", id.Key(), id.SharedKey())
109	fsc.mu.Unlock()
110	sourceRef, err, _ := fsc.g.Do(id.Key(), func() (interface{}, error) {
111		var sourceRef *cachedSourceRef
112		sourceRef, err := fsc.store.Get(id.Key())
113		if err == nil {
114			return sourceRef, nil
115		}
116
117		// check for unused shared cache
118		sharedKey := id.SharedKey()
119		if sharedKey != "" {
120			r, err := fsc.store.Rebase(sharedKey, id.Key())
121			if err == nil {
122				sourceRef = r
123			}
124		}
125
126		if sourceRef == nil {
127			var err error
128			sourceRef, err = fsc.store.New(id.Key(), sharedKey)
129			if err != nil {
130				return nil, errors.Wrap(err, "failed to create remote context")
131			}
132		}
133
134		if err := syncFrom(ctx, sourceRef, transport, id); err != nil {
135			sourceRef.Release()
136			return nil, err
137		}
138		if err := sourceRef.resetSize(-1); err != nil {
139			return nil, err
140		}
141		return sourceRef, nil
142	})
143	if err != nil {
144		return nil, err
145	}
146	ref := sourceRef.(*cachedSourceRef)
147	if ref.src == nil { // failsafe
148		return nil, errors.Errorf("invalid empty pull")
149	}
150	wc := &wrappedContext{Source: ref.src, closer: func() error {
151		ref.Release()
152		return nil
153	}}
154	return wc, nil
155}
156
157// DiskUsage reports how much data is allocated by the cache
158func (fsc *FSCache) DiskUsage(ctx context.Context) (int64, error) {
159	return fsc.store.DiskUsage(ctx)
160}
161
162// Prune allows manually cleaning up the cache
163func (fsc *FSCache) Prune(ctx context.Context) (uint64, error) {
164	return fsc.store.Prune(ctx)
165}
166
167// Close stops the gc and closes the persistent db
168func (fsc *FSCache) Close() error {
169	return fsc.store.Close()
170}
171
172func syncFrom(ctx context.Context, cs *cachedSourceRef, transport Transport, id RemoteIdentifier) (retErr error) {
173	src := cs.src
174	if src == nil {
175		src = remotecontext.NewCachableSource(cs.Dir())
176	}
177
178	if !cs.cached {
179		if err := cs.storage.db.View(func(tx *bolt.Tx) error {
180			b := tx.Bucket([]byte(id.Key()))
181			dt := b.Get([]byte(cacheKey))
182			if dt != nil {
183				if err := src.UnmarshalBinary(dt); err != nil {
184					return err
185				}
186			} else {
187				return errors.Wrap(src.Scan(), "failed to scan cache records")
188			}
189			return nil
190		}); err != nil {
191			return err
192		}
193	}
194
195	dc := &detectChanges{f: src.HandleChange}
196
197	// todo: probably send a bucket to `Copy` and let it return source
198	// but need to make sure that tx is safe
199	if err := transport.Copy(ctx, id, cs.Dir(), dc); err != nil {
200		return errors.Wrapf(err, "failed to copy to %s", cs.Dir())
201	}
202
203	if !dc.supported {
204		if err := src.Scan(); err != nil {
205			return errors.Wrap(err, "failed to scan cache records after transfer")
206		}
207	}
208	cs.cached = true
209	cs.src = src
210	return cs.storage.db.Update(func(tx *bolt.Tx) error {
211		dt, err := src.MarshalBinary()
212		if err != nil {
213			return err
214		}
215		b := tx.Bucket([]byte(id.Key()))
216		return b.Put([]byte(cacheKey), dt)
217	})
218}
219
220type fsCacheStore struct {
221	mu       sync.Mutex
222	sources  map[string]*cachedSource
223	db       *bolt.DB
224	fs       Backend
225	gcTimer  *time.Timer
226	gcPolicy GCPolicy
227}
228
// CachePolicy defines policy for keeping a resource in cache.
type CachePolicy struct {
	// Priority is persisted with the snapshot metadata.
	Priority int
	// LastUsed is compared against GCPolicy.MaxKeepDuration and orders
	// eviction candidates, oldest first.
	LastUsed time.Time
}

// defaultCachePolicy returns the policy given to new and rebased snapshots:
// priority 10, last used now.
func defaultCachePolicy() CachePolicy {
	var p CachePolicy
	p.Priority = 10
	p.LastUsed = time.Now()
	return p
}
238
239func newFSCacheStore(opt Opt) (*fsCacheStore, error) {
240	if err := os.MkdirAll(opt.Root, 0700); err != nil {
241		return nil, err
242	}
243	p := filepath.Join(opt.Root, dbFile)
244	db, err := bolt.Open(p, 0600, nil)
245	if err != nil {
246		return nil, errors.Wrap(err, "failed to open database file %s")
247	}
248	s := &fsCacheStore{db: db, sources: make(map[string]*cachedSource), fs: opt.Backend, gcPolicy: opt.GCPolicy}
249	db.View(func(tx *bolt.Tx) error {
250		return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
251			dt := b.Get([]byte(metaKey))
252			if dt == nil {
253				return nil
254			}
255			var sm sourceMeta
256			if err := json.Unmarshal(dt, &sm); err != nil {
257				return err
258			}
259			dir, err := s.fs.Get(sm.BackendID)
260			if err != nil {
261				return err // TODO: handle gracefully
262			}
263			source := &cachedSource{
264				refs:       make(map[*cachedSourceRef]struct{}),
265				id:         string(name),
266				dir:        dir,
267				sourceMeta: sm,
268				storage:    s,
269			}
270			s.sources[string(name)] = source
271			return nil
272		})
273	})
274
275	s.gcTimer = s.startPeriodicGC(5 * time.Minute)
276	return s, nil
277}
278
279func (s *fsCacheStore) startPeriodicGC(interval time.Duration) *time.Timer {
280	var t *time.Timer
281	t = time.AfterFunc(interval, func() {
282		if err := s.GC(); err != nil {
283			logrus.Errorf("build gc error: %v", err)
284		}
285		t.Reset(interval)
286	})
287	return t
288}
289
290func (s *fsCacheStore) Close() error {
291	s.gcTimer.Stop()
292	return s.db.Close()
293}
294
295func (s *fsCacheStore) New(id, sharedKey string) (*cachedSourceRef, error) {
296	s.mu.Lock()
297	defer s.mu.Unlock()
298	var ret *cachedSource
299	if err := s.db.Update(func(tx *bolt.Tx) error {
300		b, err := tx.CreateBucket([]byte(id))
301		if err != nil {
302			return err
303		}
304		backendID := stringid.GenerateRandomID()
305		dir, err := s.fs.Get(backendID)
306		if err != nil {
307			return err
308		}
309		source := &cachedSource{
310			refs: make(map[*cachedSourceRef]struct{}),
311			id:   id,
312			dir:  dir,
313			sourceMeta: sourceMeta{
314				BackendID:   backendID,
315				SharedKey:   sharedKey,
316				CachePolicy: defaultCachePolicy(),
317			},
318			storage: s,
319		}
320		dt, err := json.Marshal(source.sourceMeta)
321		if err != nil {
322			return err
323		}
324		if err := b.Put([]byte(metaKey), dt); err != nil {
325			return err
326		}
327		s.sources[id] = source
328		ret = source
329		return nil
330	}); err != nil {
331		return nil, err
332	}
333	return ret.getRef(), nil
334}
335
336func (s *fsCacheStore) Rebase(sharedKey, newid string) (*cachedSourceRef, error) {
337	s.mu.Lock()
338	defer s.mu.Unlock()
339	var ret *cachedSource
340	for id, snap := range s.sources {
341		if snap.SharedKey == sharedKey && len(snap.refs) == 0 {
342			if err := s.db.Update(func(tx *bolt.Tx) error {
343				if err := tx.DeleteBucket([]byte(id)); err != nil {
344					return err
345				}
346				b, err := tx.CreateBucket([]byte(newid))
347				if err != nil {
348					return err
349				}
350				snap.id = newid
351				snap.CachePolicy = defaultCachePolicy()
352				dt, err := json.Marshal(snap.sourceMeta)
353				if err != nil {
354					return err
355				}
356				if err := b.Put([]byte(metaKey), dt); err != nil {
357					return err
358				}
359				delete(s.sources, id)
360				s.sources[newid] = snap
361				return nil
362			}); err != nil {
363				return nil, err
364			}
365			ret = snap
366			break
367		}
368	}
369	if ret == nil {
370		return nil, errors.Errorf("no candidate for rebase")
371	}
372	return ret.getRef(), nil
373}
374
375func (s *fsCacheStore) Get(id string) (*cachedSourceRef, error) {
376	s.mu.Lock()
377	defer s.mu.Unlock()
378	src, ok := s.sources[id]
379	if !ok {
380		return nil, errors.Errorf("not found")
381	}
382	return src.getRef(), nil
383}
384
385// DiskUsage reports how much data is allocated by the cache
386func (s *fsCacheStore) DiskUsage(ctx context.Context) (int64, error) {
387	s.mu.Lock()
388	defer s.mu.Unlock()
389	var size int64
390
391	for _, snap := range s.sources {
392		if len(snap.refs) == 0 {
393			ss, err := snap.getSize(ctx)
394			if err != nil {
395				return 0, err
396			}
397			size += ss
398		}
399	}
400	return size, nil
401}
402
403// Prune allows manually cleaning up the cache
404func (s *fsCacheStore) Prune(ctx context.Context) (uint64, error) {
405	s.mu.Lock()
406	defer s.mu.Unlock()
407	var size uint64
408
409	for id, snap := range s.sources {
410		select {
411		case <-ctx.Done():
412			logrus.Debugf("Cache prune operation cancelled, pruned size: %d", size)
413			// when the context is cancelled, only return current size and nil
414			return size, nil
415		default:
416		}
417		if len(snap.refs) == 0 {
418			ss, err := snap.getSize(ctx)
419			if err != nil {
420				return size, err
421			}
422			if err := s.delete(id); err != nil {
423				return size, errors.Wrapf(err, "failed to delete %s", id)
424			}
425			size += uint64(ss)
426		}
427	}
428	return size, nil
429}
430
431// GC runs a garbage collector on FSCache
432func (s *fsCacheStore) GC() error {
433	s.mu.Lock()
434	defer s.mu.Unlock()
435	var size uint64
436
437	ctx := context.Background()
438	cutoff := time.Now().Add(-s.gcPolicy.MaxKeepDuration)
439	var blacklist []*cachedSource
440
441	for id, snap := range s.sources {
442		if len(snap.refs) == 0 {
443			if cutoff.After(snap.CachePolicy.LastUsed) {
444				if err := s.delete(id); err != nil {
445					return errors.Wrapf(err, "failed to delete %s", id)
446				}
447			} else {
448				ss, err := snap.getSize(ctx)
449				if err != nil {
450					return err
451				}
452				size += uint64(ss)
453				blacklist = append(blacklist, snap)
454			}
455		}
456	}
457
458	sort.Sort(sortableCacheSources(blacklist))
459	for _, snap := range blacklist {
460		if size <= s.gcPolicy.MaxSize {
461			break
462		}
463		ss, err := snap.getSize(ctx)
464		if err != nil {
465			return err
466		}
467		if err := s.delete(snap.id); err != nil {
468			return errors.Wrapf(err, "failed to delete %s", snap.id)
469		}
470		size -= uint64(ss)
471	}
472	return nil
473}
474
475// keep mu while calling this
476func (s *fsCacheStore) delete(id string) error {
477	src, ok := s.sources[id]
478	if !ok {
479		return nil
480	}
481	if len(src.refs) > 0 {
482		return errors.Errorf("can't delete %s because it has active references", id)
483	}
484	delete(s.sources, id)
485	if err := s.db.Update(func(tx *bolt.Tx) error {
486		return tx.DeleteBucket([]byte(id))
487	}); err != nil {
488		return err
489	}
490	return s.fs.Remove(src.BackendID)
491}
492
// sourceMeta is the per-snapshot metadata stored as JSON under metaKey in the
// snapshot's bucket.
type sourceMeta struct {
	// SharedKey lets an unreferenced snapshot be rebased onto a new request
	// that carries the same shared key.
	SharedKey string
	// BackendID identifies the snapshot's storage in the Backend.
	BackendID string
	// CachePolicy records the priority and last-use time used by GC.
	CachePolicy CachePolicy
	// Size is the remembered size of the snapshot directory; a negative value
	// means unknown, and getSize re-measures it.
	Size int64
}
499
500type cachedSource struct {
501	sourceMeta
502	refs    map[*cachedSourceRef]struct{}
503	id      string
504	dir     string
505	src     *remotecontext.CachableSource
506	storage *fsCacheStore
507	cached  bool // keep track if cache is up to date
508}
509
510type cachedSourceRef struct {
511	*cachedSource
512}
513
514func (cs *cachedSource) Dir() string {
515	return cs.dir
516}
517
518// hold storage lock before calling
519func (cs *cachedSource) getRef() *cachedSourceRef {
520	ref := &cachedSourceRef{cachedSource: cs}
521	cs.refs[ref] = struct{}{}
522	return ref
523}
524
525// hold storage lock before calling
526func (cs *cachedSource) getSize(ctx context.Context) (int64, error) {
527	if cs.sourceMeta.Size < 0 {
528		ss, err := directory.Size(ctx, cs.dir)
529		if err != nil {
530			return 0, err
531		}
532		if err := cs.resetSize(ss); err != nil {
533			return 0, err
534		}
535		return ss, nil
536	}
537	return cs.sourceMeta.Size, nil
538}
539
540func (cs *cachedSource) resetSize(val int64) error {
541	cs.sourceMeta.Size = val
542	return cs.saveMeta()
543}
544func (cs *cachedSource) saveMeta() error {
545	return cs.storage.db.Update(func(tx *bolt.Tx) error {
546		b := tx.Bucket([]byte(cs.id))
547		dt, err := json.Marshal(cs.sourceMeta)
548		if err != nil {
549			return err
550		}
551		return b.Put([]byte(metaKey), dt)
552	})
553}
554
555func (csr *cachedSourceRef) Release() error {
556	csr.cachedSource.storage.mu.Lock()
557	defer csr.cachedSource.storage.mu.Unlock()
558	delete(csr.cachedSource.refs, csr)
559	if len(csr.cachedSource.refs) == 0 {
560		go csr.cachedSource.storage.GC()
561	}
562	return nil
563}
564
565type detectChanges struct {
566	f         fsutil.ChangeFunc
567	supported bool
568}
569
570func (dc *detectChanges) HandleChange(kind fsutil.ChangeKind, path string, fi os.FileInfo, err error) error {
571	if dc == nil {
572		return nil
573	}
574	return dc.f(kind, path, fi, err)
575}
576
577func (dc *detectChanges) MarkSupported(v bool) {
578	if dc == nil {
579		return
580	}
581	dc.supported = v
582}
583
584func (dc *detectChanges) ContentHasher() fsutil.ContentHasher {
585	return newTarsumHash
586}
587
588type wrappedContext struct {
589	builder.Source
590	closer func() error
591}
592
593func (wc *wrappedContext) Close() error {
594	if err := wc.Source.Close(); err != nil {
595		return err
596	}
597	return wc.closer()
598}
599
600type sortableCacheSources []*cachedSource
601
602// Len is the number of elements in the collection.
603func (s sortableCacheSources) Len() int {
604	return len(s)
605}
606
607// Less reports whether the element with
608// index i should sort before the element with index j.
609func (s sortableCacheSources) Less(i, j int) bool {
610	return s[i].CachePolicy.LastUsed.Before(s[j].CachePolicy.LastUsed)
611}
612
613// Swap swaps the elements with indexes i and j.
614func (s sortableCacheSources) Swap(i, j int) {
615	s[i], s[j] = s[j], s[i]
616}
617
618func newTarsumHash(stat *fsutiltypes.Stat) (hash.Hash, error) {
619	fi := &fsutil.StatInfo{Stat: stat}
620	p := stat.Path
621	if fi.IsDir() {
622		p += string(os.PathSeparator)
623	}
624	h, err := archive.FileInfoHeader(p, fi, stat.Linkname)
625	if err != nil {
626		return nil, err
627	}
628	h.Name = p
629	h.Uid = int(stat.Uid)
630	h.Gid = int(stat.Gid)
631	h.Linkname = stat.Linkname
632	if stat.Xattrs != nil {
633		h.Xattrs = make(map[string]string)
634		for k, v := range stat.Xattrs {
635			h.Xattrs[k] = string(v)
636		}
637	}
638
639	tsh := &tarsumHash{h: h, Hash: sha256.New()}
640	tsh.Reset()
641	return tsh, nil
642}
643
644// Reset resets the Hash to its initial state.
645func (tsh *tarsumHash) Reset() {
646	tsh.Hash.Reset()
647	tarsum.WriteV1Header(tsh.h, tsh.Hash)
648}
649
650type tarsumHash struct {
651	hash.Hash
652	h *tar.Header
653}
654