1package fscache // import "github.com/docker/docker/builder/fscache" 2 3import ( 4 "archive/tar" 5 "context" 6 "crypto/sha256" 7 "encoding/json" 8 "hash" 9 "os" 10 "path/filepath" 11 "sort" 12 "sync" 13 "time" 14 15 "github.com/boltdb/bolt" 16 "github.com/docker/docker/builder" 17 "github.com/docker/docker/builder/remotecontext" 18 "github.com/docker/docker/pkg/archive" 19 "github.com/docker/docker/pkg/directory" 20 "github.com/docker/docker/pkg/stringid" 21 "github.com/docker/docker/pkg/tarsum" 22 "github.com/moby/buildkit/session/filesync" 23 "github.com/pkg/errors" 24 "github.com/sirupsen/logrus" 25 "github.com/tonistiigi/fsutil" 26 "golang.org/x/sync/singleflight" 27) 28 29const dbFile = "fscache.db" 30const cacheKey = "cache" 31const metaKey = "meta" 32 33// Backend is a backing implementation for FSCache 34type Backend interface { 35 Get(id string) (string, error) 36 Remove(id string) error 37} 38 39// FSCache allows syncing remote resources to cached snapshots 40type FSCache struct { 41 opt Opt 42 transports map[string]Transport 43 mu sync.Mutex 44 g singleflight.Group 45 store *fsCacheStore 46} 47 48// Opt defines options for initializing FSCache 49type Opt struct { 50 Backend Backend 51 Root string // for storing local metadata 52 GCPolicy GCPolicy 53} 54 55// GCPolicy defines policy for garbage collection 56type GCPolicy struct { 57 MaxSize uint64 58 MaxKeepDuration time.Duration 59} 60 61// NewFSCache returns new FSCache object 62func NewFSCache(opt Opt) (*FSCache, error) { 63 store, err := newFSCacheStore(opt) 64 if err != nil { 65 return nil, err 66 } 67 return &FSCache{ 68 store: store, 69 opt: opt, 70 transports: make(map[string]Transport), 71 }, nil 72} 73 74// Transport defines a method for syncing remote data to FSCache 75type Transport interface { 76 Copy(ctx context.Context, id RemoteIdentifier, dest string, cs filesync.CacheUpdater) error 77} 78 79// RemoteIdentifier identifies a transfer request 80type RemoteIdentifier interface { 81 Key() string 82 SharedKey() string 83 Transport() string 84} 85 86// RegisterTransport registers a new transport method 87func (fsc *FSCache) RegisterTransport(id string, transport Transport) error { 88 fsc.mu.Lock() 89 defer fsc.mu.Unlock() 90 if _, ok := fsc.transports[id]; ok { 91 return errors.Errorf("transport %v already exists", id) 92 } 93 fsc.transports[id] = transport 94 return nil 95} 96 97// SyncFrom returns a source based on a remote identifier 98func (fsc *FSCache) SyncFrom(ctx context.Context, id RemoteIdentifier) (builder.Source, error) { // cacheOpt 99 trasportID := id.Transport() 100 fsc.mu.Lock() 101 transport, ok := fsc.transports[id.Transport()] 102 if !ok { 103 fsc.mu.Unlock() 104 return nil, errors.Errorf("invalid transport %s", trasportID) 105 } 106 107 logrus.Debugf("SyncFrom %s %s", id.Key(), id.SharedKey()) 108 fsc.mu.Unlock() 109 sourceRef, err, _ := fsc.g.Do(id.Key(), func() (interface{}, error) { 110 var sourceRef *cachedSourceRef 111 sourceRef, err := fsc.store.Get(id.Key()) 112 if err == nil { 113 return sourceRef, nil 114 } 115 116 // check for unused shared cache 117 sharedKey := id.SharedKey() 118 if sharedKey != "" { 119 r, err := fsc.store.Rebase(sharedKey, id.Key()) 120 if err == nil { 121 sourceRef = r 122 } 123 } 124 125 if sourceRef == nil { 126 var err error 127 sourceRef, err = fsc.store.New(id.Key(), sharedKey) 128 if err != nil { 129 return nil, errors.Wrap(err, "failed to create remote context") 130 } 131 } 132 133 if err := syncFrom(ctx, sourceRef, transport, id); err != nil { 134 sourceRef.Release() 135 return nil, err 136 } 137 if err := sourceRef.resetSize(-1); err != nil { 138 return nil, err 139 } 140 return sourceRef, nil 141 }) 142 if err != nil { 143 return nil, err 144 } 145 ref := sourceRef.(*cachedSourceRef) 146 if ref.src == nil { // failsafe 147 return nil, errors.Errorf("invalid empty pull") 148 } 149 wc := &wrappedContext{Source: ref.src, closer: func() error { 150 ref.Release() 151 return nil 152 }} 153 return wc, nil 154} 155 156// DiskUsage reports how much data is allocated by the cache 157func (fsc *FSCache) DiskUsage(ctx context.Context) (int64, error) { 158 return fsc.store.DiskUsage(ctx) 159} 160 161// Prune allows manually cleaning up the cache 162func (fsc *FSCache) Prune(ctx context.Context) (uint64, error) { 163 return fsc.store.Prune(ctx) 164} 165 166// Close stops the gc and closes the persistent db 167func (fsc *FSCache) Close() error { 168 return fsc.store.Close() 169} 170 171func syncFrom(ctx context.Context, cs *cachedSourceRef, transport Transport, id RemoteIdentifier) (retErr error) { 172 src := cs.src 173 if src == nil { 174 src = remotecontext.NewCachableSource(cs.Dir()) 175 } 176 177 if !cs.cached { 178 if err := cs.storage.db.View(func(tx *bolt.Tx) error { 179 b := tx.Bucket([]byte(id.Key())) 180 dt := b.Get([]byte(cacheKey)) 181 if dt != nil { 182 if err := src.UnmarshalBinary(dt); err != nil { 183 return err 184 } 185 } else { 186 return errors.Wrap(src.Scan(), "failed to scan cache records") 187 } 188 return nil 189 }); err != nil { 190 return err 191 } 192 } 193 194 dc := &detectChanges{f: src.HandleChange} 195 196 // todo: probably send a bucket to `Copy` and let it return source 197 // but need to make sure that tx is safe 198 if err := transport.Copy(ctx, id, cs.Dir(), dc); err != nil { 199 return errors.Wrapf(err, "failed to copy to %s", cs.Dir()) 200 } 201 202 if !dc.supported { 203 if err := src.Scan(); err != nil { 204 return errors.Wrap(err, "failed to scan cache records after transfer") 205 } 206 } 207 cs.cached = true 208 cs.src = src 209 return cs.storage.db.Update(func(tx *bolt.Tx) error { 210 dt, err := src.MarshalBinary() 211 if err != nil { 212 return err 213 } 214 b := tx.Bucket([]byte(id.Key())) 215 return b.Put([]byte(cacheKey), dt) 216 }) 217} 218 219type fsCacheStore struct { 220 mu sync.Mutex 221 sources map[string]*cachedSource 222 db *bolt.DB 223 fs Backend 224 gcTimer *time.Timer 225 gcPolicy GCPolicy 226} 227 228// CachePolicy defines policy for keeping a resource in cache 229type CachePolicy struct { 230 Priority int 231 LastUsed time.Time 232} 233 234func defaultCachePolicy() CachePolicy { 235 return CachePolicy{Priority: 10, LastUsed: time.Now()} 236} 237 238func newFSCacheStore(opt Opt) (*fsCacheStore, error) { 239 if err := os.MkdirAll(opt.Root, 0700); err != nil { 240 return nil, err 241 } 242 p := filepath.Join(opt.Root, dbFile) 243 db, err := bolt.Open(p, 0600, nil) 244 if err != nil { 245 return nil, errors.Wrap(err, "failed to open database file %s") 246 } 247 s := &fsCacheStore{db: db, sources: make(map[string]*cachedSource), fs: opt.Backend, gcPolicy: opt.GCPolicy} 248 db.View(func(tx *bolt.Tx) error { 249 return tx.ForEach(func(name []byte, b *bolt.Bucket) error { 250 dt := b.Get([]byte(metaKey)) 251 if dt == nil { 252 return nil 253 } 254 var sm sourceMeta 255 if err := json.Unmarshal(dt, &sm); err != nil { 256 return err 257 } 258 dir, err := s.fs.Get(sm.BackendID) 259 if err != nil { 260 return err // TODO: handle gracefully 261 } 262 source := &cachedSource{ 263 refs: make(map[*cachedSourceRef]struct{}), 264 id: string(name), 265 dir: dir, 266 sourceMeta: sm, 267 storage: s, 268 } 269 s.sources[string(name)] = source 270 return nil 271 }) 272 }) 273 274 s.gcTimer = s.startPeriodicGC(5 * time.Minute) 275 return s, nil 276} 277 278func (s *fsCacheStore) startPeriodicGC(interval time.Duration) *time.Timer { 279 var t *time.Timer 280 t = time.AfterFunc(interval, func() { 281 if err := s.GC(); err != nil { 282 logrus.Errorf("build gc error: %v", err) 283 } 284 t.Reset(interval) 285 }) 286 return t 287} 288 289func (s *fsCacheStore) Close() error { 290 s.gcTimer.Stop() 291 return s.db.Close() 292} 293 294func (s *fsCacheStore) New(id, sharedKey string) (*cachedSourceRef, error) { 295 s.mu.Lock() 296 defer s.mu.Unlock() 297 var ret *cachedSource 298 if err := s.db.Update(func(tx *bolt.Tx) error { 299 b, err := tx.CreateBucket([]byte(id)) 300 if err != nil { 301 return err 302 } 303 backendID := stringid.GenerateRandomID() 304 dir, err := s.fs.Get(backendID) 305 if err != nil { 306 return err 307 } 308 source := &cachedSource{ 309 refs: make(map[*cachedSourceRef]struct{}), 310 id: id, 311 dir: dir, 312 sourceMeta: sourceMeta{ 313 BackendID: backendID, 314 SharedKey: sharedKey, 315 CachePolicy: defaultCachePolicy(), 316 }, 317 storage: s, 318 } 319 dt, err := json.Marshal(source.sourceMeta) 320 if err != nil { 321 return err 322 } 323 if err := b.Put([]byte(metaKey), dt); err != nil { 324 return err 325 } 326 s.sources[id] = source 327 ret = source 328 return nil 329 }); err != nil { 330 return nil, err 331 } 332 return ret.getRef(), nil 333} 334 335func (s *fsCacheStore) Rebase(sharedKey, newid string) (*cachedSourceRef, error) { 336 s.mu.Lock() 337 defer s.mu.Unlock() 338 var ret *cachedSource 339 for id, snap := range s.sources { 340 if snap.SharedKey == sharedKey && len(snap.refs) == 0 { 341 if err := s.db.Update(func(tx *bolt.Tx) error { 342 if err := tx.DeleteBucket([]byte(id)); err != nil { 343 return err 344 } 345 b, err := tx.CreateBucket([]byte(newid)) 346 if err != nil { 347 return err 348 } 349 snap.id = newid 350 snap.CachePolicy = defaultCachePolicy() 351 dt, err := json.Marshal(snap.sourceMeta) 352 if err != nil { 353 return err 354 } 355 if err := b.Put([]byte(metaKey), dt); err != nil { 356 return err 357 } 358 delete(s.sources, id) 359 s.sources[newid] = snap 360 return nil 361 }); err != nil { 362 return nil, err 363 } 364 ret = snap 365 break 366 } 367 } 368 if ret == nil { 369 return nil, errors.Errorf("no candidate for rebase") 370 } 371 return ret.getRef(), nil 372} 373 374func (s *fsCacheStore) Get(id string) (*cachedSourceRef, error) { 375 s.mu.Lock() 376 defer s.mu.Unlock() 377 src, ok := s.sources[id] 378 if !ok { 379 return nil, errors.Errorf("not found") 380 } 381 return src.getRef(), nil 382} 383 384// DiskUsage reports how much data is allocated by the cache 385func (s *fsCacheStore) DiskUsage(ctx context.Context) (int64, error) { 386 s.mu.Lock() 387 defer s.mu.Unlock() 388 var size int64 389 390 for _, snap := range s.sources { 391 if len(snap.refs) == 0 { 392 ss, err := snap.getSize(ctx) 393 if err != nil { 394 return 0, err 395 } 396 size += ss 397 } 398 } 399 return size, nil 400} 401 402// Prune allows manually cleaning up the cache 403func (s *fsCacheStore) Prune(ctx context.Context) (uint64, error) { 404 s.mu.Lock() 405 defer s.mu.Unlock() 406 var size uint64 407 408 for id, snap := range s.sources { 409 select { 410 case <-ctx.Done(): 411 logrus.Debugf("Cache prune operation cancelled, pruned size: %d", size) 412 // when the context is cancelled, only return current size and nil 413 return size, nil 414 default: 415 } 416 if len(snap.refs) == 0 { 417 ss, err := snap.getSize(ctx) 418 if err != nil { 419 return size, err 420 } 421 if err := s.delete(id); err != nil { 422 return size, errors.Wrapf(err, "failed to delete %s", id) 423 } 424 size += uint64(ss) 425 } 426 } 427 return size, nil 428} 429 430// GC runs a garbage collector on FSCache 431func (s *fsCacheStore) GC() error { 432 s.mu.Lock() 433 defer s.mu.Unlock() 434 var size uint64 435 436 ctx := context.Background() 437 cutoff := time.Now().Add(-s.gcPolicy.MaxKeepDuration) 438 var blacklist []*cachedSource 439 440 for id, snap := range s.sources { 441 if len(snap.refs) == 0 { 442 if cutoff.After(snap.CachePolicy.LastUsed) { 443 if err := s.delete(id); err != nil { 444 return errors.Wrapf(err, "failed to delete %s", id) 445 } 446 } else { 447 ss, err := snap.getSize(ctx) 448 if err != nil { 449 return err 450 } 451 size += uint64(ss) 452 blacklist = append(blacklist, snap) 453 } 454 } 455 } 456 457 sort.Sort(sortableCacheSources(blacklist)) 458 for _, snap := range blacklist { 459 if size <= s.gcPolicy.MaxSize { 460 break 461 } 462 ss, err := snap.getSize(ctx) 463 if err != nil { 464 return err 465 } 466 if err := s.delete(snap.id); err != nil { 467 return errors.Wrapf(err, "failed to delete %s", snap.id) 468 } 469 size -= uint64(ss) 470 } 471 return nil 472} 473 474// keep mu while calling this 475func (s *fsCacheStore) delete(id string) error { 476 src, ok := s.sources[id] 477 if !ok { 478 return nil 479 } 480 if len(src.refs) > 0 { 481 return errors.Errorf("can't delete %s because it has active references", id) 482 } 483 delete(s.sources, id) 484 if err := s.db.Update(func(tx *bolt.Tx) error { 485 return tx.DeleteBucket([]byte(id)) 486 }); err != nil { 487 return err 488 } 489 return s.fs.Remove(src.BackendID) 490} 491 492type sourceMeta struct { 493 SharedKey string 494 BackendID string 495 CachePolicy CachePolicy 496 Size int64 497} 498 499type cachedSource struct { 500 sourceMeta 501 refs map[*cachedSourceRef]struct{} 502 id string 503 dir string 504 src *remotecontext.CachableSource 505 storage *fsCacheStore 506 cached bool // keep track if cache is up to date 507} 508 509type cachedSourceRef struct { 510 *cachedSource 511} 512 513func (cs *cachedSource) Dir() string { 514 return cs.dir 515} 516 517// hold storage lock before calling 518func (cs *cachedSource) getRef() *cachedSourceRef { 519 ref := &cachedSourceRef{cachedSource: cs} 520 cs.refs[ref] = struct{}{} 521 return ref 522} 523 524// hold storage lock before calling 525func (cs *cachedSource) getSize(ctx context.Context) (int64, error) { 526 if cs.sourceMeta.Size < 0 { 527 ss, err := directory.Size(ctx, cs.dir) 528 if err != nil { 529 return 0, err 530 } 531 if err := cs.resetSize(ss); err != nil { 532 return 0, err 533 } 534 return ss, nil 535 } 536 return cs.sourceMeta.Size, nil 537} 538 539func (cs *cachedSource) resetSize(val int64) error { 540 cs.sourceMeta.Size = val 541 return cs.saveMeta() 542} 543func (cs *cachedSource) saveMeta() error { 544 return cs.storage.db.Update(func(tx *bolt.Tx) error { 545 b := tx.Bucket([]byte(cs.id)) 546 dt, err := json.Marshal(cs.sourceMeta) 547 if err != nil { 548 return err 549 } 550 return b.Put([]byte(metaKey), dt) 551 }) 552} 553 554func (csr *cachedSourceRef) Release() error { 555 csr.cachedSource.storage.mu.Lock() 556 defer csr.cachedSource.storage.mu.Unlock() 557 delete(csr.cachedSource.refs, csr) 558 if len(csr.cachedSource.refs) == 0 { 559 go csr.cachedSource.storage.GC() 560 } 561 return nil 562} 563 564type detectChanges struct { 565 f fsutil.ChangeFunc 566 supported bool 567} 568 569func (dc *detectChanges) HandleChange(kind fsutil.ChangeKind, path string, fi os.FileInfo, err error) error { 570 if dc == nil { 571 return nil 572 } 573 return dc.f(kind, path, fi, err) 574} 575 576func (dc *detectChanges) MarkSupported(v bool) { 577 if dc == nil { 578 return 579 } 580 dc.supported = v 581} 582 583func (dc *detectChanges) ContentHasher() fsutil.ContentHasher { 584 return newTarsumHash 585} 586 587type wrappedContext struct { 588 builder.Source 589 closer func() error 590} 591 592func (wc *wrappedContext) Close() error { 593 if err := wc.Source.Close(); err != nil { 594 return err 595 } 596 return wc.closer() 597} 598 599type sortableCacheSources []*cachedSource 600 601// Len is the number of elements in the collection. 602func (s sortableCacheSources) Len() int { 603 return len(s) 604} 605 606// Less reports whether the element with 607// index i should sort before the element with index j. 608func (s sortableCacheSources) Less(i, j int) bool { 609 return s[i].CachePolicy.LastUsed.Before(s[j].CachePolicy.LastUsed) 610} 611 612// Swap swaps the elements with indexes i and j. 613func (s sortableCacheSources) Swap(i, j int) { 614 s[i], s[j] = s[j], s[i] 615} 616 617func newTarsumHash(stat *fsutil.Stat) (hash.Hash, error) { 618 fi := &fsutil.StatInfo{Stat: stat} 619 p := stat.Path 620 if fi.IsDir() { 621 p += string(os.PathSeparator) 622 } 623 h, err := archive.FileInfoHeader(p, fi, stat.Linkname) 624 if err != nil { 625 return nil, err 626 } 627 h.Name = p 628 h.Uid = int(stat.Uid) 629 h.Gid = int(stat.Gid) 630 h.Linkname = stat.Linkname 631 if stat.Xattrs != nil { 632 h.Xattrs = make(map[string]string) 633 for k, v := range stat.Xattrs { 634 h.Xattrs[k] = string(v) 635 } 636 } 637 638 tsh := &tarsumHash{h: h, Hash: sha256.New()} 639 tsh.Reset() 640 return tsh, nil 641} 642 643// Reset resets the Hash to its initial state. 644func (tsh *tarsumHash) Reset() { 645 tsh.Hash.Reset() 646 tarsum.WriteV1Header(tsh.h, tsh.Hash) 647} 648 649type tarsumHash struct { 650 hash.Hash 651 h *tar.Header 652} 653