1package fscache // import "github.com/docker/docker/builder/fscache"
2
3import (
4	"archive/tar"
5	"context"
6	"crypto/sha256"
7	"encoding/json"
8	"hash"
9	"os"
10	"path/filepath"
11	"sort"
12	"sync"
13	"time"
14
15	"github.com/boltdb/bolt"
16	"github.com/docker/docker/builder"
17	"github.com/docker/docker/builder/remotecontext"
18	"github.com/docker/docker/pkg/archive"
19	"github.com/docker/docker/pkg/directory"
20	"github.com/docker/docker/pkg/stringid"
21	"github.com/docker/docker/pkg/tarsum"
22	"github.com/moby/buildkit/session/filesync"
23	"github.com/pkg/errors"
24	"github.com/sirupsen/logrus"
25	"github.com/tonistiigi/fsutil"
26	"golang.org/x/sync/singleflight"
27)
28
29const dbFile = "fscache.db"
30const cacheKey = "cache"
31const metaKey = "meta"
32
// Backend is a backing implementation for FSCache
type Backend interface {
	// Get returns the local directory for the snapshot with the given id.
	// NOTE(review): callers here treat it as create-if-missing — confirm
	// against implementations.
	Get(id string) (string, error)
	// Remove deletes the backing data for the snapshot with the given id.
	Remove(id string) error
}
38
// FSCache allows syncing remote resources to cached snapshots
type FSCache struct {
	opt        Opt
	transports map[string]Transport // registered transfer methods, keyed by id
	mu         sync.Mutex           // protects transports
	g          singleflight.Group   // deduplicates concurrent SyncFrom calls per key
	store      *fsCacheStore        // persistent state for cached sources
}
47
// Opt defines options for initializing FSCache
type Opt struct {
	Backend  Backend  // storage backend providing snapshot directories
	Root     string   // for storing local metadata
	GCPolicy GCPolicy // limits applied by the periodic garbage collector
}
54
// GCPolicy defines policy for garbage collection
type GCPolicy struct {
	// MaxSize is the total size (bytes) of unreferenced sources that GC
	// tries to stay under, deleting the least recently used first.
	MaxSize uint64
	// MaxKeepDuration is the age after which an unreferenced source is
	// always removed, regardless of MaxSize.
	MaxKeepDuration time.Duration
}
60
61// NewFSCache returns new FSCache object
62func NewFSCache(opt Opt) (*FSCache, error) {
63	store, err := newFSCacheStore(opt)
64	if err != nil {
65		return nil, err
66	}
67	return &FSCache{
68		store:      store,
69		opt:        opt,
70		transports: make(map[string]Transport),
71	}, nil
72}
73
// Transport defines a method for syncing remote data to FSCache
type Transport interface {
	// Copy transfers the data identified by id into dest, reporting
	// per-file changes to cs when supported.
	Copy(ctx context.Context, id RemoteIdentifier, dest string, cs filesync.CacheUpdater) error
}
78
// RemoteIdentifier identifies a transfer request
type RemoteIdentifier interface {
	// Key uniquely identifies the cached source for this request.
	Key() string
	// SharedKey groups requests that may rebase onto each other's
	// unused cached data ("" means no sharing).
	SharedKey() string
	// Transport names the registered Transport to use for the copy.
	Transport() string
}
85
86// RegisterTransport registers a new transport method
87func (fsc *FSCache) RegisterTransport(id string, transport Transport) error {
88	fsc.mu.Lock()
89	defer fsc.mu.Unlock()
90	if _, ok := fsc.transports[id]; ok {
91		return errors.Errorf("transport %v already exists", id)
92	}
93	fsc.transports[id] = transport
94	return nil
95}
96
97// SyncFrom returns a source based on a remote identifier
98func (fsc *FSCache) SyncFrom(ctx context.Context, id RemoteIdentifier) (builder.Source, error) { // cacheOpt
99	trasportID := id.Transport()
100	fsc.mu.Lock()
101	transport, ok := fsc.transports[id.Transport()]
102	if !ok {
103		fsc.mu.Unlock()
104		return nil, errors.Errorf("invalid transport %s", trasportID)
105	}
106
107	logrus.Debugf("SyncFrom %s %s", id.Key(), id.SharedKey())
108	fsc.mu.Unlock()
109	sourceRef, err, _ := fsc.g.Do(id.Key(), func() (interface{}, error) {
110		var sourceRef *cachedSourceRef
111		sourceRef, err := fsc.store.Get(id.Key())
112		if err == nil {
113			return sourceRef, nil
114		}
115
116		// check for unused shared cache
117		sharedKey := id.SharedKey()
118		if sharedKey != "" {
119			r, err := fsc.store.Rebase(sharedKey, id.Key())
120			if err == nil {
121				sourceRef = r
122			}
123		}
124
125		if sourceRef == nil {
126			var err error
127			sourceRef, err = fsc.store.New(id.Key(), sharedKey)
128			if err != nil {
129				return nil, errors.Wrap(err, "failed to create remote context")
130			}
131		}
132
133		if err := syncFrom(ctx, sourceRef, transport, id); err != nil {
134			sourceRef.Release()
135			return nil, err
136		}
137		if err := sourceRef.resetSize(-1); err != nil {
138			return nil, err
139		}
140		return sourceRef, nil
141	})
142	if err != nil {
143		return nil, err
144	}
145	ref := sourceRef.(*cachedSourceRef)
146	if ref.src == nil { // failsafe
147		return nil, errors.Errorf("invalid empty pull")
148	}
149	wc := &wrappedContext{Source: ref.src, closer: func() error {
150		ref.Release()
151		return nil
152	}}
153	return wc, nil
154}
155
// DiskUsage reports how much data is allocated by the cache
func (fsc *FSCache) DiskUsage(ctx context.Context) (int64, error) {
	// Delegates to the store; only sources without active references are counted.
	return fsc.store.DiskUsage(ctx)
}
160
// Prune allows manually cleaning up the cache
func (fsc *FSCache) Prune(ctx context.Context) (uint64, error) {
	// Delegates to the store; returns the number of bytes freed.
	return fsc.store.Prune(ctx)
}
165
// Close stops the gc and closes the persistent db
func (fsc *FSCache) Close() error {
	return fsc.store.Close()
}
170
// syncFrom transfers the remote content identified by id into the cached
// source's directory using the given transport, loading and persisting
// the serialized cache records in the bolt database so later transfers
// can be incremental.
func syncFrom(ctx context.Context, cs *cachedSourceRef, transport Transport, id RemoteIdentifier) (retErr error) {
	src := cs.src
	if src == nil {
		// First sync for this source: create a fresh cachable source
		// over its snapshot directory.
		src = remotecontext.NewCachableSource(cs.Dir())
	}

	if !cs.cached {
		// Restore previously persisted cache records, or scan the
		// directory when none were stored yet.
		if err := cs.storage.db.View(func(tx *bolt.Tx) error {
			b := tx.Bucket([]byte(id.Key()))
			dt := b.Get([]byte(cacheKey))
			if dt != nil {
				if err := src.UnmarshalBinary(dt); err != nil {
					return err
				}
			} else {
				return errors.Wrap(src.Scan(), "failed to scan cache records")
			}
			return nil
		}); err != nil {
			return err
		}
	}

	dc := &detectChanges{f: src.HandleChange}

	// todo: probably send a bucket to `Copy` and let it return source
	// but need to make sure that tx is safe
	if err := transport.Copy(ctx, id, cs.Dir(), dc); err != nil {
		return errors.Wrapf(err, "failed to copy to %s", cs.Dir())
	}

	if !dc.supported {
		// The transport did not report per-file changes; rescan the
		// whole tree to rebuild the records.
		if err := src.Scan(); err != nil {
			return errors.Wrap(err, "failed to scan cache records after transfer")
		}
	}
	cs.cached = true
	cs.src = src
	// Persist the updated cache records for the next sync.
	return cs.storage.db.Update(func(tx *bolt.Tx) error {
		dt, err := src.MarshalBinary()
		if err != nil {
			return err
		}
		b := tx.Bucket([]byte(id.Key()))
		return b.Put([]byte(cacheKey), dt)
	})
}
218
// fsCacheStore holds the persistent state behind FSCache: the set of
// known cached sources, the metadata database and the GC schedule.
type fsCacheStore struct {
	mu       sync.Mutex // protects sources; held across delete/GC/Prune
	sources  map[string]*cachedSource
	db       *bolt.DB    // one bucket per source id (meta + cache records)
	fs       Backend     // backend providing the snapshot directories
	gcTimer  *time.Timer // drives periodic GC; stopped on Close
	gcPolicy GCPolicy
}
227
// CachePolicy defines policy for keeping a resource in cache
type CachePolicy struct {
	// Priority is a relative retention weight. NOTE(review): not
	// consulted by GC in this file — confirm intended use.
	Priority int
	// LastUsed orders GC eviction (least recently used goes first) and
	// decides expiry against GCPolicy.MaxKeepDuration.
	LastUsed time.Time
}
233
234func defaultCachePolicy() CachePolicy {
235	return CachePolicy{Priority: 10, LastUsed: time.Now()}
236}
237
238func newFSCacheStore(opt Opt) (*fsCacheStore, error) {
239	if err := os.MkdirAll(opt.Root, 0700); err != nil {
240		return nil, err
241	}
242	p := filepath.Join(opt.Root, dbFile)
243	db, err := bolt.Open(p, 0600, nil)
244	if err != nil {
245		return nil, errors.Wrap(err, "failed to open database file %s")
246	}
247	s := &fsCacheStore{db: db, sources: make(map[string]*cachedSource), fs: opt.Backend, gcPolicy: opt.GCPolicy}
248	db.View(func(tx *bolt.Tx) error {
249		return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
250			dt := b.Get([]byte(metaKey))
251			if dt == nil {
252				return nil
253			}
254			var sm sourceMeta
255			if err := json.Unmarshal(dt, &sm); err != nil {
256				return err
257			}
258			dir, err := s.fs.Get(sm.BackendID)
259			if err != nil {
260				return err // TODO: handle gracefully
261			}
262			source := &cachedSource{
263				refs:       make(map[*cachedSourceRef]struct{}),
264				id:         string(name),
265				dir:        dir,
266				sourceMeta: sm,
267				storage:    s,
268			}
269			s.sources[string(name)] = source
270			return nil
271		})
272	})
273
274	s.gcTimer = s.startPeriodicGC(5 * time.Minute)
275	return s, nil
276}
277
278func (s *fsCacheStore) startPeriodicGC(interval time.Duration) *time.Timer {
279	var t *time.Timer
280	t = time.AfterFunc(interval, func() {
281		if err := s.GC(); err != nil {
282			logrus.Errorf("build gc error: %v", err)
283		}
284		t.Reset(interval)
285	})
286	return t
287}
288
// Close stops the periodic garbage collector and closes the database.
func (s *fsCacheStore) Close() error {
	s.gcTimer.Stop()
	return s.db.Close()
}
293
// New creates a fresh cached source registered under id with its own
// database bucket and backend directory, and returns an initial
// reference to it. sharedKey may be "" when the source is not shareable.
func (s *fsCacheStore) New(id, sharedKey string) (*cachedSourceRef, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var ret *cachedSource
	if err := s.db.Update(func(tx *bolt.Tx) error {
		// CreateBucket fails if the id already exists, preventing duplicates.
		b, err := tx.CreateBucket([]byte(id))
		if err != nil {
			return err
		}
		// Allocate a random backend id and materialize its directory.
		backendID := stringid.GenerateRandomID()
		dir, err := s.fs.Get(backendID)
		if err != nil {
			return err
		}
		source := &cachedSource{
			refs: make(map[*cachedSourceRef]struct{}),
			id:   id,
			dir:  dir,
			sourceMeta: sourceMeta{
				BackendID:   backendID,
				SharedKey:   sharedKey,
				CachePolicy: defaultCachePolicy(),
			},
			storage: s,
		}
		dt, err := json.Marshal(source.sourceMeta)
		if err != nil {
			return err
		}
		// Persist metadata inside the same transaction as bucket creation.
		if err := b.Put([]byte(metaKey), dt); err != nil {
			return err
		}
		s.sources[id] = source
		ret = source
		return nil
	}); err != nil {
		return nil, err
	}
	return ret.getRef(), nil
}
334
// Rebase repurposes an unused source that shares sharedKey: it is
// re-registered under newid (old bucket deleted, new one created with a
// reset cache policy) and a reference to it is returned. Errors with
// "no candidate for rebase" when no unreferenced source matches.
func (s *fsCacheStore) Rebase(sharedKey, newid string) (*cachedSourceRef, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var ret *cachedSource
	for id, snap := range s.sources {
		// Only sources with no live references may be taken over.
		if snap.SharedKey == sharedKey && len(snap.refs) == 0 {
			if err := s.db.Update(func(tx *bolt.Tx) error {
				if err := tx.DeleteBucket([]byte(id)); err != nil {
					return err
				}
				b, err := tx.CreateBucket([]byte(newid))
				if err != nil {
					return err
				}
				snap.id = newid
				// Treat the rebased source as freshly used.
				snap.CachePolicy = defaultCachePolicy()
				dt, err := json.Marshal(snap.sourceMeta)
				if err != nil {
					return err
				}
				if err := b.Put([]byte(metaKey), dt); err != nil {
					return err
				}
				delete(s.sources, id)
				s.sources[newid] = snap
				return nil
			}); err != nil {
				return nil, err
			}
			ret = snap
			break
		}
	}
	if ret == nil {
		return nil, errors.Errorf("no candidate for rebase")
	}
	return ret.getRef(), nil
}
373
374func (s *fsCacheStore) Get(id string) (*cachedSourceRef, error) {
375	s.mu.Lock()
376	defer s.mu.Unlock()
377	src, ok := s.sources[id]
378	if !ok {
379		return nil, errors.Errorf("not found")
380	}
381	return src.getRef(), nil
382}
383
384// DiskUsage reports how much data is allocated by the cache
385func (s *fsCacheStore) DiskUsage(ctx context.Context) (int64, error) {
386	s.mu.Lock()
387	defer s.mu.Unlock()
388	var size int64
389
390	for _, snap := range s.sources {
391		if len(snap.refs) == 0 {
392			ss, err := snap.getSize(ctx)
393			if err != nil {
394				return 0, err
395			}
396			size += ss
397		}
398	}
399	return size, nil
400}
401
402// Prune allows manually cleaning up the cache
403func (s *fsCacheStore) Prune(ctx context.Context) (uint64, error) {
404	s.mu.Lock()
405	defer s.mu.Unlock()
406	var size uint64
407
408	for id, snap := range s.sources {
409		select {
410		case <-ctx.Done():
411			logrus.Debugf("Cache prune operation cancelled, pruned size: %d", size)
412			// when the context is cancelled, only return current size and nil
413			return size, nil
414		default:
415		}
416		if len(snap.refs) == 0 {
417			ss, err := snap.getSize(ctx)
418			if err != nil {
419				return size, err
420			}
421			if err := s.delete(id); err != nil {
422				return size, errors.Wrapf(err, "failed to delete %s", id)
423			}
424			size += uint64(ss)
425		}
426	}
427	return size, nil
428}
429
// GC runs a garbage collector on FSCache. It works in two phases over
// unreferenced sources: first it deletes everything older than
// gcPolicy.MaxKeepDuration; then, oldest-first, it deletes remaining
// candidates until their total size is at most gcPolicy.MaxSize.
func (s *fsCacheStore) GC() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	var size uint64

	ctx := context.Background()
	cutoff := time.Now().Add(-s.gcPolicy.MaxKeepDuration)
	var blacklist []*cachedSource

	for id, snap := range s.sources {
		if len(snap.refs) == 0 {
			if cutoff.After(snap.CachePolicy.LastUsed) {
				// Expired: remove unconditionally.
				if err := s.delete(id); err != nil {
					return errors.Wrapf(err, "failed to delete %s", id)
				}
			} else {
				// Still fresh: account its size and keep it as an
				// eviction candidate for the size-limit phase.
				ss, err := snap.getSize(ctx)
				if err != nil {
					return err
				}
				size += uint64(ss)
				blacklist = append(blacklist, snap)
			}
		}
	}

	// Evict least-recently-used candidates until under the size budget.
	sort.Sort(sortableCacheSources(blacklist))
	for _, snap := range blacklist {
		if size <= s.gcPolicy.MaxSize {
			break
		}
		ss, err := snap.getSize(ctx)
		if err != nil {
			return err
		}
		if err := s.delete(snap.id); err != nil {
			return errors.Wrapf(err, "failed to delete %s", snap.id)
		}
		size -= uint64(ss)
	}
	return nil
}
473
474// keep mu while calling this
475func (s *fsCacheStore) delete(id string) error {
476	src, ok := s.sources[id]
477	if !ok {
478		return nil
479	}
480	if len(src.refs) > 0 {
481		return errors.Errorf("can't delete %s because it has active references", id)
482	}
483	delete(s.sources, id)
484	if err := s.db.Update(func(tx *bolt.Tx) error {
485		return tx.DeleteBucket([]byte(id))
486	}); err != nil {
487		return err
488	}
489	return s.fs.Remove(src.BackendID)
490}
491
// sourceMeta is the per-source metadata persisted as JSON under metaKey
// in the source's database bucket.
type sourceMeta struct {
	SharedKey   string // grouping key for Rebase candidates ("" = none)
	BackendID   string // id of the snapshot directory in the Backend
	CachePolicy CachePolicy
	Size        int64 // cached directory size in bytes; negative means unknown
}
498
// cachedSource is one synced snapshot tracked by the store, combining
// its persisted metadata with in-memory state.
type cachedSource struct {
	sourceMeta
	refs    map[*cachedSourceRef]struct{} // live references; empty means GC-eligible
	id      string                        // cache key (also the database bucket name)
	dir     string                        // local snapshot directory from the backend
	src     *remotecontext.CachableSource // lazily created on first sync
	storage *fsCacheStore
	cached  bool // keep track if cache is up to date
}
508
// cachedSourceRef is a counted handle to a cachedSource; dropping the
// last one (via Release) makes the source eligible for GC.
type cachedSourceRef struct {
	*cachedSource
}
512
// Dir returns the local directory holding the cached snapshot data.
func (cs *cachedSource) Dir() string {
	return cs.dir
}
516
517// hold storage lock before calling
518func (cs *cachedSource) getRef() *cachedSourceRef {
519	ref := &cachedSourceRef{cachedSource: cs}
520	cs.refs[ref] = struct{}{}
521	return ref
522}
523
524// hold storage lock before calling
525func (cs *cachedSource) getSize(ctx context.Context) (int64, error) {
526	if cs.sourceMeta.Size < 0 {
527		ss, err := directory.Size(ctx, cs.dir)
528		if err != nil {
529			return 0, err
530		}
531		if err := cs.resetSize(ss); err != nil {
532			return 0, err
533		}
534		return ss, nil
535	}
536	return cs.sourceMeta.Size, nil
537}
538
// resetSize updates the stored size (pass -1 to mark it unknown so
// getSize recomputes it) and persists the metadata.
func (cs *cachedSource) resetSize(val int64) error {
	cs.sourceMeta.Size = val
	return cs.saveMeta()
}
543func (cs *cachedSource) saveMeta() error {
544	return cs.storage.db.Update(func(tx *bolt.Tx) error {
545		b := tx.Bucket([]byte(cs.id))
546		dt, err := json.Marshal(cs.sourceMeta)
547		if err != nil {
548			return err
549		}
550		return b.Put([]byte(metaKey), dt)
551	})
552}
553
554func (csr *cachedSourceRef) Release() error {
555	csr.cachedSource.storage.mu.Lock()
556	defer csr.cachedSource.storage.mu.Unlock()
557	delete(csr.cachedSource.refs, csr)
558	if len(csr.cachedSource.refs) == 0 {
559		go csr.cachedSource.storage.GC()
560	}
561	return nil
562}
563
// detectChanges adapts a fsutil change callback into a
// filesync.CacheUpdater, recording whether the transport actually
// reported incremental changes.
type detectChanges struct {
	f         fsutil.ChangeFunc
	supported bool // set via MarkSupported when incremental updates were delivered
}
568
569func (dc *detectChanges) HandleChange(kind fsutil.ChangeKind, path string, fi os.FileInfo, err error) error {
570	if dc == nil {
571		return nil
572	}
573	return dc.f(kind, path, fi, err)
574}
575
576func (dc *detectChanges) MarkSupported(v bool) {
577	if dc == nil {
578		return
579	}
580	dc.supported = v
581}
582
// ContentHasher returns the hash constructor used to checksum file
// content during transfer.
func (dc *detectChanges) ContentHasher() fsutil.ContentHasher {
	return newTarsumHash
}
586
// wrappedContext decorates a builder.Source so that an extra cleanup
// function runs when the source is closed.
type wrappedContext struct {
	builder.Source
	closer func() error // runs after the wrapped Source closes successfully
}
591
592func (wc *wrappedContext) Close() error {
593	if err := wc.Source.Close(); err != nil {
594		return err
595	}
596	return wc.closer()
597}
598
// sortableCacheSources implements sort.Interface, ordering sources by
// LastUsed time, oldest first (used by GC to evict LRU entries).
type sortableCacheSources []*cachedSource

// Len is the number of elements in the collection.
func (s sortableCacheSources) Len() int {
	return len(s)
}

// Less reports whether the element with
// index i should sort before the element with index j.
func (s sortableCacheSources) Less(i, j int) bool {
	return s[i].CachePolicy.LastUsed.Before(s[j].CachePolicy.LastUsed)
}

// Swap swaps the elements with indexes i and j.
func (s sortableCacheSources) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
616
// newTarsumHash builds the content hasher for one file entry: a sha256
// digest seeded with the entry's tar header, so the hash covers file
// metadata as well as content.
func newTarsumHash(stat *fsutil.Stat) (hash.Hash, error) {
	fi := &fsutil.StatInfo{Stat: stat}
	p := stat.Path
	if fi.IsDir() {
		// Directories are recorded with a trailing path separator.
		p += string(os.PathSeparator)
	}
	h, err := archive.FileInfoHeader(p, fi, stat.Linkname)
	if err != nil {
		return nil, err
	}
	// Override header fields with the values from the transfer stat.
	h.Name = p
	h.Uid = int(stat.Uid)
	h.Gid = int(stat.Gid)
	h.Linkname = stat.Linkname
	if stat.Xattrs != nil {
		h.Xattrs = make(map[string]string)
		for k, v := range stat.Xattrs {
			h.Xattrs[k] = string(v)
		}
	}

	tsh := &tarsumHash{h: h, Hash: sha256.New()}
	// Reset writes the header into the digest before any content bytes.
	tsh.Reset()
	return tsh, nil
}
642
// Reset resets the Hash to its initial state.
func (tsh *tarsumHash) Reset() {
	// Re-seed the digest with the tar header so the resulting sum
	// depends on file metadata, per the tarsum v1 format.
	tsh.Hash.Reset()
	tarsum.WriteV1Header(tsh.h, tsh.Hash)
}
648
// tarsumHash is a sha256 hash whose state is pre-seeded with a tar
// header (see Reset), producing tarsum-style content checksums.
type tarsumHash struct {
	hash.Hash
	h *tar.Header // header written into the digest on Reset
}
653