package fscache // import "github.com/docker/docker/builder/fscache"

import (
	"archive/tar"
	"context"
	"crypto/sha256"
	"encoding/json"
	"hash"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"time"

	"github.com/docker/docker/builder"
	"github.com/docker/docker/builder/remotecontext"
	"github.com/docker/docker/pkg/archive"
	"github.com/docker/docker/pkg/directory"
	"github.com/docker/docker/pkg/stringid"
	"github.com/docker/docker/pkg/tarsum"
	"github.com/moby/buildkit/session/filesync"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"github.com/tonistiigi/fsutil"
	fsutiltypes "github.com/tonistiigi/fsutil/types"
	bolt "go.etcd.io/bbolt"
	"golang.org/x/sync/singleflight"
)

const dbFile = "fscache.db"
const cacheKey = "cache"
const metaKey = "meta"

// Backend is a backing implementation for FSCache
type Backend interface {
	Get(id string) (string, error)
	Remove(id string) error
}

// FSCache allows syncing remote resources to cached snapshots
type FSCache struct {
	opt        Opt
	transports map[string]Transport
	mu         sync.Mutex
	g          singleflight.Group
	store      *fsCacheStore
}

// Opt defines options for initializing FSCache
type Opt struct {
	Backend  Backend
	Root     string // for storing local metadata
	GCPolicy GCPolicy
}

// GCPolicy defines policy for garbage collection
type GCPolicy struct {
	MaxSize         uint64
	MaxKeepDuration time.Duration
}

// NewFSCache returns a new FSCache object
func NewFSCache(opt Opt) (*FSCache, error) {
	store, err := newFSCacheStore(opt)
	if err != nil {
		return nil, err
	}
	return &FSCache{
		store:      store,
		opt:        opt,
		transports: make(map[string]Transport),
	}, nil
}

// Transport defines a method for syncing remote data to FSCache
type Transport interface {
	Copy(ctx context.Context, id RemoteIdentifier, dest string, cs filesync.CacheUpdater) error
}

// RemoteIdentifier identifies a transfer request
type RemoteIdentifier interface {
	Key() string
	SharedKey() string
	Transport() string
}

// RegisterTransport registers a new transport method
func (fsc *FSCache) RegisterTransport(id string, transport Transport) error {
	fsc.mu.Lock()
	defer fsc.mu.Unlock()
	if _, ok := fsc.transports[id]; ok {
		return errors.Errorf("transport %v already exists", id)
	}
	fsc.transports[id] = transport
	return nil
}

// SyncFrom returns a source based on a remote identifier
func (fsc *FSCache) SyncFrom(ctx context.Context, id RemoteIdentifier) (builder.Source, error) { // cacheOpt
	transportID := id.Transport()
	fsc.mu.Lock()
	transport, ok := fsc.transports[transportID]
	if !ok {
		fsc.mu.Unlock()
		return nil, errors.Errorf("invalid transport %s", transportID)
	}

	logrus.Debugf("SyncFrom %s %s", id.Key(), id.SharedKey())
	fsc.mu.Unlock()
	sourceRef, err, _ := fsc.g.Do(id.Key(), func() (interface{}, error) {
		var sourceRef *cachedSourceRef
		sourceRef, err := fsc.store.Get(id.Key())
		if err == nil {
			return sourceRef, nil
		}

		// check for unused shared cache
		sharedKey := id.SharedKey()
		if sharedKey != "" {
			r, err := fsc.store.Rebase(sharedKey, id.Key())
			if err == nil {
				sourceRef = r
			}
		}

		if sourceRef == nil {
			var err error
			sourceRef, err = fsc.store.New(id.Key(), sharedKey)
			if err != nil {
				return nil, errors.Wrap(err, "failed to create remote context")
			}
		}

		if err := syncFrom(ctx, sourceRef, transport, id); err != nil {
			sourceRef.Release()
			return nil, err
		}
		if err := sourceRef.resetSize(-1); err != nil {
			return nil, err
		}
		return sourceRef, nil
	})
	if err != nil {
		return nil, err
	}
	ref := sourceRef.(*cachedSourceRef)
	if ref.src == nil { // failsafe
		return nil, errors.Errorf("invalid empty pull")
	}
	wc := &wrappedContext{Source: ref.src, closer: func() error {
		ref.Release()
		return nil
	}}
	return wc, nil
}
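// Wiring sketch (illustrative only): a typical caller constructs the cache,
// registers a transport, then syncs by identifier. "backend",
// "clientTransport", and "remoteID" are hypothetical stand-ins for
// implementations of Backend, Transport, and RemoteIdentifier; they are not
// part of this package.
//
//	fsc, err := NewFSCache(Opt{
//		Backend: backend,
//		Root:    "/var/lib/docker/builder/fscache",
//		GCPolicy: GCPolicy{
//			MaxSize:         512 * 1024 * 1024, // keep at most ~512MB of idle snapshots
//			MaxKeepDuration: 48 * time.Hour,    // drop anything idle for two days
//		},
//	})
//	if err != nil {
//		return err
//	}
//	defer fsc.Close()
//	if err := fsc.RegisterTransport("client-session", clientTransport); err != nil {
//		return err
//	}
//	src, err := fsc.SyncFrom(ctx, remoteID) // src implements builder.Source
//	if err != nil {
//		return err
//	}
//	defer src.Close()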
// DiskUsage reports how much data is allocated by the cache
func (fsc *FSCache) DiskUsage(ctx context.Context) (int64, error) {
	return fsc.store.DiskUsage(ctx)
}

// Prune allows manually cleaning up the cache
func (fsc *FSCache) Prune(ctx context.Context) (uint64, error) {
	return fsc.store.Prune(ctx)
}

// Close stops the gc and closes the persistent db
func (fsc *FSCache) Close() error {
	return fsc.store.Close()
}

func syncFrom(ctx context.Context, cs *cachedSourceRef, transport Transport, id RemoteIdentifier) (retErr error) {
	src := cs.src
	if src == nil {
		src = remotecontext.NewCachableSource(cs.Dir())
	}

	if !cs.cached {
		if err := cs.storage.db.View(func(tx *bolt.Tx) error {
			b := tx.Bucket([]byte(id.Key()))
			dt := b.Get([]byte(cacheKey))
			if dt != nil {
				if err := src.UnmarshalBinary(dt); err != nil {
					return err
				}
			} else {
				return errors.Wrap(src.Scan(), "failed to scan cache records")
			}
			return nil
		}); err != nil {
			return err
		}
	}

	dc := &detectChanges{f: src.HandleChange}

	// todo: probably send a bucket to `Copy` and let it return source
	// but need to make sure that tx is safe
	if err := transport.Copy(ctx, id, cs.Dir(), dc); err != nil {
		return errors.Wrapf(err, "failed to copy to %s", cs.Dir())
	}

	if !dc.supported {
		if err := src.Scan(); err != nil {
			return errors.Wrap(err, "failed to scan cache records after transfer")
		}
	}
	cs.cached = true
	cs.src = src
	return cs.storage.db.Update(func(tx *bolt.Tx) error {
		dt, err := src.MarshalBinary()
		if err != nil {
			return err
		}
		b := tx.Bucket([]byte(id.Key()))
		return b.Put([]byte(cacheKey), dt)
	})
}
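// On-disk layout sketch, as implied by syncFrom above and the store code
// below: each context gets its own bolt bucket named after id.Key(), holding
// two entries. Values are abbreviated here for illustration.
//
//	fscache.db
//	└── bucket "<id.Key()>"
//	    ├── "meta"  -> JSON-encoded sourceMeta (SharedKey, BackendID, CachePolicy, Size)
//	    └── "cache" -> binary-marshaled remotecontext.CachableSource file records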
type fsCacheStore struct {
	mu       sync.Mutex
	sources  map[string]*cachedSource
	db       *bolt.DB
	fs       Backend
	gcTimer  *time.Timer
	gcPolicy GCPolicy
}

// CachePolicy defines policy for keeping a resource in cache
type CachePolicy struct {
	Priority int
	LastUsed time.Time
}

func defaultCachePolicy() CachePolicy {
	return CachePolicy{Priority: 10, LastUsed: time.Now()}
}

func newFSCacheStore(opt Opt) (*fsCacheStore, error) {
	if err := os.MkdirAll(opt.Root, 0700); err != nil {
		return nil, err
	}
	p := filepath.Join(opt.Root, dbFile)
	db, err := bolt.Open(p, 0600, nil)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to open database file %s", p)
	}
	s := &fsCacheStore{db: db, sources: make(map[string]*cachedSource), fs: opt.Backend, gcPolicy: opt.GCPolicy}
	db.View(func(tx *bolt.Tx) error {
		return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
			dt := b.Get([]byte(metaKey))
			if dt == nil {
				return nil
			}
			var sm sourceMeta
			if err := json.Unmarshal(dt, &sm); err != nil {
				return err
			}
			dir, err := s.fs.Get(sm.BackendID)
			if err != nil {
				return err // TODO: handle gracefully
			}
			source := &cachedSource{
				refs:       make(map[*cachedSourceRef]struct{}),
				id:         string(name),
				dir:        dir,
				sourceMeta: sm,
				storage:    s,
			}
			s.sources[string(name)] = source
			return nil
		})
	})

	s.gcTimer = s.startPeriodicGC(5 * time.Minute)
	return s, nil
}

func (s *fsCacheStore) startPeriodicGC(interval time.Duration) *time.Timer {
	var t *time.Timer
	t = time.AfterFunc(interval, func() {
		if err := s.GC(); err != nil {
			logrus.Errorf("build gc error: %v", err)
		}
		t.Reset(interval)
	})
	return t
}

func (s *fsCacheStore) Close() error {
	s.gcTimer.Stop()
	return s.db.Close()
}

func (s *fsCacheStore) New(id, sharedKey string) (*cachedSourceRef, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var ret *cachedSource
	if err := s.db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucket([]byte(id))
		if err != nil {
			return err
		}
		backendID := stringid.GenerateRandomID()
		dir, err := s.fs.Get(backendID)
		if err != nil {
			return err
		}
		source := &cachedSource{
			refs: make(map[*cachedSourceRef]struct{}),
			id:   id,
			dir:  dir,
			sourceMeta: sourceMeta{
				BackendID:   backendID,
				SharedKey:   sharedKey,
				CachePolicy: defaultCachePolicy(),
			},
			storage: s,
		}
		dt, err := json.Marshal(source.sourceMeta)
		if err != nil {
			return err
		}
		if err := b.Put([]byte(metaKey), dt); err != nil {
			return err
		}
		s.sources[id] = source
		ret = source
		return nil
	}); err != nil {
		return nil, err
	}
	return ret.getRef(), nil
}

func (s *fsCacheStore) Rebase(sharedKey, newid string) (*cachedSourceRef, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var ret *cachedSource
	for id, snap := range s.sources {
		if snap.SharedKey == sharedKey && len(snap.refs) == 0 {
			if err := s.db.Update(func(tx *bolt.Tx) error {
				if err := tx.DeleteBucket([]byte(id)); err != nil {
					return err
				}
				b, err := tx.CreateBucket([]byte(newid))
				if err != nil {
					return err
				}
				snap.id = newid
				snap.CachePolicy = defaultCachePolicy()
				dt, err := json.Marshal(snap.sourceMeta)
				if err != nil {
					return err
				}
				if err := b.Put([]byte(metaKey), dt); err != nil {
					return err
				}
				delete(s.sources, id)
				s.sources[newid] = snap
				return nil
			}); err != nil {
				return nil, err
			}
			ret = snap
			break
		}
	}
	if ret == nil {
		return nil, errors.Errorf("no candidate for rebase")
	}
	return ret.getRef(), nil
}
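// Shared-key reuse sketch (behavior of Rebase above): two contexts with the
// same SharedKey but different keys can take over one idle snapshot instead
// of syncing from scratch. Hypothetical sequence, keys illustrative:
//
//	ref1, _ := s.New("key1", "shared-A")    // fresh snapshot for key1
//	ref1.Release()                          // snapshot becomes idle (no refs)
//	ref2, _ := s.Rebase("shared-A", "key2") // re-keys the idle snapshot to key2;
//	                                        // the next sync only transfers changes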
func (s *fsCacheStore) Get(id string) (*cachedSourceRef, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	src, ok := s.sources[id]
	if !ok {
		return nil, errors.Errorf("not found")
	}
	return src.getRef(), nil
}

// DiskUsage reports how much data is allocated by the cache
func (s *fsCacheStore) DiskUsage(ctx context.Context) (int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var size int64

	for _, snap := range s.sources {
		if len(snap.refs) == 0 {
			ss, err := snap.getSize(ctx)
			if err != nil {
				return 0, err
			}
			size += ss
		}
	}
	return size, nil
}

// Prune allows manually cleaning up the cache
func (s *fsCacheStore) Prune(ctx context.Context) (uint64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var size uint64

	for id, snap := range s.sources {
		select {
		case <-ctx.Done():
			logrus.Debugf("Cache prune operation cancelled, pruned size: %d", size)
			// when the context is cancelled, return only the size pruned so far and a nil error
			return size, nil
		default:
		}
		if len(snap.refs) == 0 {
			ss, err := snap.getSize(ctx)
			if err != nil {
				return size, err
			}
			if err := s.delete(id); err != nil {
				return size, errors.Wrapf(err, "failed to delete %s", id)
			}
			size += uint64(ss)
		}
	}
	return size, nil
}

// GC runs a garbage collector on FSCache
func (s *fsCacheStore) GC() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	var size uint64

	ctx := context.Background()
	cutoff := time.Now().Add(-s.gcPolicy.MaxKeepDuration)
	var blacklist []*cachedSource

	for id, snap := range s.sources {
		if len(snap.refs) == 0 {
			if cutoff.After(snap.CachePolicy.LastUsed) {
				if err := s.delete(id); err != nil {
					return errors.Wrapf(err, "failed to delete %s", id)
				}
			} else {
				ss, err := snap.getSize(ctx)
				if err != nil {
					return err
				}
				size += uint64(ss)
				blacklist = append(blacklist, snap)
			}
		}
	}

	sort.Sort(sortableCacheSources(blacklist))
	for _, snap := range blacklist {
		if size <= s.gcPolicy.MaxSize {
			break
		}
		ss, err := snap.getSize(ctx)
		if err != nil {
			return err
		}
		if err := s.delete(snap.id); err != nil {
			return errors.Wrapf(err, "failed to delete %s", snap.id)
		}
		size -= uint64(ss)
	}
	return nil
}

// hold s.mu while calling this
func (s *fsCacheStore) delete(id string) error {
	src, ok := s.sources[id]
	if !ok {
		return nil
	}
	if len(src.refs) > 0 {
		return errors.Errorf("can't delete %s because it has active references", id)
	}
	delete(s.sources, id)
	if err := s.db.Update(func(tx *bolt.Tx) error {
		return tx.DeleteBucket([]byte(id))
	}); err != nil {
		return err
	}
	return s.fs.Remove(src.BackendID)
}

type sourceMeta struct {
	SharedKey   string
	BackendID   string
	CachePolicy CachePolicy
	Size        int64
}

type cachedSource struct {
	sourceMeta
	refs    map[*cachedSourceRef]struct{}
	id      string
	dir     string
	src     *remotecontext.CachableSource
	storage *fsCacheStore
	cached  bool // keep track if cache is up to date
}

type cachedSourceRef struct {
	*cachedSource
}

func (cs *cachedSource) Dir() string {
	return cs.dir
}

// hold storage lock before calling
func (cs *cachedSource) getRef() *cachedSourceRef {
	ref := &cachedSourceRef{cachedSource: cs}
	cs.refs[ref] = struct{}{}
	return ref
}

// hold storage lock before calling
func (cs *cachedSource) getSize(ctx context.Context) (int64, error) {
	if cs.sourceMeta.Size < 0 {
		ss, err := directory.Size(ctx, cs.dir)
		if err != nil {
			return 0, err
		}
		if err := cs.resetSize(ss); err != nil {
			return 0, err
		}
		return ss, nil
	}
	return cs.sourceMeta.Size, nil
}

func (cs *cachedSource) resetSize(val int64) error {
	cs.sourceMeta.Size = val
	return cs.saveMeta()
}

func (cs *cachedSource) saveMeta() error {
	return cs.storage.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte(cs.id))
		dt, err := json.Marshal(cs.sourceMeta)
		if err != nil {
			return err
		}
		return b.Put([]byte(metaKey), dt)
	})
}

func (csr *cachedSourceRef) Release() error {
	csr.cachedSource.storage.mu.Lock()
	defer csr.cachedSource.storage.mu.Unlock()
	delete(csr.cachedSource.refs, csr)
	if len(csr.cachedSource.refs) == 0 {
		go csr.cachedSource.storage.GC()
	}
	return nil
}
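// GC policy worked example (derived from GC above; sizes hypothetical): with
// GCPolicy{MaxSize: 100 << 20, MaxKeepDuration: 48 * time.Hour} and three
// idle sources
//
//	a: LastUsed 72h ago, 30MB -> deleted outright (older than MaxKeepDuration)
//	b: LastUsed 10h ago, 80MB -> evicted first (oldest remaining LastUsed); 80+40 > 100
//	c: LastUsed  1h ago, 40MB -> kept; the remaining 40MB is within MaxSize
//
// Sources with active references are never touched by GC or Prune.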
type detectChanges struct {
	f         fsutil.ChangeFunc
	supported bool
}

func (dc *detectChanges) HandleChange(kind fsutil.ChangeKind, path string, fi os.FileInfo, err error) error {
	if dc == nil {
		return nil
	}
	return dc.f(kind, path, fi, err)
}

func (dc *detectChanges) MarkSupported(v bool) {
	if dc == nil {
		return
	}
	dc.supported = v
}

func (dc *detectChanges) ContentHasher() fsutil.ContentHasher {
	return newTarsumHash
}

type wrappedContext struct {
	builder.Source
	closer func() error
}

func (wc *wrappedContext) Close() error {
	if err := wc.Source.Close(); err != nil {
		return err
	}
	return wc.closer()
}

type sortableCacheSources []*cachedSource

// Len is the number of elements in the collection.
func (s sortableCacheSources) Len() int {
	return len(s)
}

// Less reports whether the element with
// index i should sort before the element with index j.
func (s sortableCacheSources) Less(i, j int) bool {
	return s[i].CachePolicy.LastUsed.Before(s[j].CachePolicy.LastUsed)
}

// Swap swaps the elements with indexes i and j.
func (s sortableCacheSources) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

func newTarsumHash(stat *fsutiltypes.Stat) (hash.Hash, error) {
	fi := &fsutil.StatInfo{Stat: stat}
	p := stat.Path
	if fi.IsDir() {
		p += string(os.PathSeparator)
	}
	h, err := archive.FileInfoHeader(p, fi, stat.Linkname)
	if err != nil {
		return nil, err
	}
	h.Name = p
	h.Uid = int(stat.Uid)
	h.Gid = int(stat.Gid)
	h.Linkname = stat.Linkname
	if stat.Xattrs != nil {
		h.Xattrs = make(map[string]string)
		for k, v := range stat.Xattrs {
			h.Xattrs[k] = string(v)
		}
	}

	tsh := &tarsumHash{h: h, Hash: sha256.New()}
	tsh.Reset()
	return tsh, nil
}

type tarsumHash struct {
	hash.Hash
	h *tar.Header
}

// Reset resets the Hash to its initial state.
func (tsh *tarsumHash) Reset() {
	tsh.Hash.Reset()
	tarsum.WriteV1Header(tsh.h, tsh.Hash)
}
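// Content-hash sketch (how ContentHasher is consumed, per the types above):
// fsutil asks for one hash per changed file; newTarsumHash seeds it with the
// tarsum v1 header built from the file's metadata, so permission or owner
// changes alter the digest even when the file contents stay identical. The
// calls below are illustrative; fsutil normally drives them internally.
//
//	h, err := newTarsumHash(stat) // stat is a *fsutiltypes.Stat for the file
//	if err != nil { ... }
//	_, _ = io.Copy(h, fileReader) // hash the contents after the header
//	digest := h.Sum(nil)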