1/* 2 Copyright The containerd Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15*/ 16 17package local 18 19import ( 20 "context" 21 "fmt" 22 "io" 23 "io/ioutil" 24 "math/rand" 25 "os" 26 "path/filepath" 27 "strconv" 28 "strings" 29 "sync" 30 "time" 31 32 "github.com/containerd/containerd/content" 33 "github.com/containerd/containerd/errdefs" 34 "github.com/containerd/containerd/filters" 35 "github.com/containerd/containerd/log" 36 "github.com/sirupsen/logrus" 37 38 digest "github.com/opencontainers/go-digest" 39 ocispec "github.com/opencontainers/image-spec/specs-go/v1" 40 "github.com/pkg/errors" 41) 42 43var bufPool = sync.Pool{ 44 New: func() interface{} { 45 buffer := make([]byte, 1<<20) 46 return &buffer 47 }, 48} 49 50// LabelStore is used to store mutable labels for digests 51type LabelStore interface { 52 // Get returns all the labels for the given digest 53 Get(digest.Digest) (map[string]string, error) 54 55 // Set sets all the labels for a given digest 56 Set(digest.Digest, map[string]string) error 57 58 // Update replaces the given labels for a digest, 59 // a key with an empty value removes a label. 60 Update(digest.Digest, map[string]string) (map[string]string, error) 61} 62 63// Store is digest-keyed store for content. All data written into the store is 64// stored under a verifiable digest. 65// 66// Store can generally support multi-reader, single-writer ingest of data, 67// including resumable ingest. 68type store struct { 69 root string 70 ls LabelStore 71} 72 73// NewStore returns a local content store 74func NewStore(root string) (content.Store, error) { 75 return NewLabeledStore(root, nil) 76} 77 78// NewLabeledStore returns a new content store using the provided label store 79// 80// Note: content stores which are used underneath a metadata store may not 81// require labels and should use `NewStore`. `NewLabeledStore` is primarily 82// useful for tests or standalone implementations. 83func NewLabeledStore(root string, ls LabelStore) (content.Store, error) { 84 if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil { 85 return nil, err 86 } 87 88 return &store{ 89 root: root, 90 ls: ls, 91 }, nil 92} 93 94func (s *store) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { 95 p, err := s.blobPath(dgst) 96 if err != nil { 97 return content.Info{}, errors.Wrapf(err, "calculating blob info path") 98 } 99 100 fi, err := os.Stat(p) 101 if err != nil { 102 if os.IsNotExist(err) { 103 err = errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst) 104 } 105 106 return content.Info{}, err 107 } 108 var labels map[string]string 109 if s.ls != nil { 110 labels, err = s.ls.Get(dgst) 111 if err != nil { 112 return content.Info{}, err 113 } 114 } 115 return s.info(dgst, fi, labels), nil 116} 117 118func (s *store) info(dgst digest.Digest, fi os.FileInfo, labels map[string]string) content.Info { 119 return content.Info{ 120 Digest: dgst, 121 Size: fi.Size(), 122 CreatedAt: fi.ModTime(), 123 UpdatedAt: getATime(fi), 124 Labels: labels, 125 } 126} 127 128// ReaderAt returns an io.ReaderAt for the blob. 129func (s *store) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) { 130 p, err := s.blobPath(desc.Digest) 131 if err != nil { 132 return nil, errors.Wrapf(err, "calculating blob path for ReaderAt") 133 } 134 135 reader, err := OpenReader(p) 136 if err != nil { 137 return nil, errors.Wrapf(err, "blob %s expected at %s", desc.Digest, p) 138 } 139 140 return reader, nil 141} 142 143// Delete removes a blob by its digest. 144// 145// While this is safe to do concurrently, safe exist-removal logic must hold 146// some global lock on the store. 147func (s *store) Delete(ctx context.Context, dgst digest.Digest) error { 148 bp, err := s.blobPath(dgst) 149 if err != nil { 150 return errors.Wrapf(err, "calculating blob path for delete") 151 } 152 153 if err := os.RemoveAll(bp); err != nil { 154 if !os.IsNotExist(err) { 155 return err 156 } 157 158 return errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst) 159 } 160 161 return nil 162} 163 164func (s *store) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { 165 if s.ls == nil { 166 return content.Info{}, errors.Wrapf(errdefs.ErrFailedPrecondition, "update not supported on immutable content store") 167 } 168 169 p, err := s.blobPath(info.Digest) 170 if err != nil { 171 return content.Info{}, errors.Wrapf(err, "calculating blob path for update") 172 } 173 174 fi, err := os.Stat(p) 175 if err != nil { 176 if os.IsNotExist(err) { 177 err = errors.Wrapf(errdefs.ErrNotFound, "content %v", info.Digest) 178 } 179 180 return content.Info{}, err 181 } 182 183 var ( 184 all bool 185 labels map[string]string 186 ) 187 if len(fieldpaths) > 0 { 188 for _, path := range fieldpaths { 189 if strings.HasPrefix(path, "labels.") { 190 if labels == nil { 191 labels = map[string]string{} 192 } 193 194 key := strings.TrimPrefix(path, "labels.") 195 labels[key] = info.Labels[key] 196 continue 197 } 198 199 switch path { 200 case "labels": 201 all = true 202 labels = info.Labels 203 default: 204 return content.Info{}, errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on content info %q", path, info.Digest) 205 } 206 } 207 } else { 208 all = true 209 labels = info.Labels 210 } 211 212 if all { 213 err = s.ls.Set(info.Digest, labels) 214 } else { 215 labels, err = s.ls.Update(info.Digest, labels) 216 } 217 if err != nil { 218 return content.Info{}, err 219 } 220 221 info = s.info(info.Digest, fi, labels) 222 info.UpdatedAt = time.Now() 223 224 if err := os.Chtimes(p, info.UpdatedAt, info.CreatedAt); err != nil { 225 log.G(ctx).WithError(err).Warnf("could not change access time for %s", info.Digest) 226 } 227 228 return info, nil 229} 230 231func (s *store) Walk(ctx context.Context, fn content.WalkFunc, fs ...string) error { 232 root := filepath.Join(s.root, "blobs") 233 234 filter, err := filters.ParseAll(fs...) 235 if err != nil { 236 return err 237 } 238 239 var alg digest.Algorithm 240 return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error { 241 if err != nil { 242 return err 243 } 244 if !fi.IsDir() && !alg.Available() { 245 return nil 246 } 247 248 // TODO(stevvooe): There are few more cases with subdirs that should be 249 // handled in case the layout gets corrupted. This isn't strict enough 250 // and may spew bad data. 251 252 if path == root { 253 return nil 254 } 255 if filepath.Dir(path) == root { 256 alg = digest.Algorithm(filepath.Base(path)) 257 258 if !alg.Available() { 259 alg = "" 260 return filepath.SkipDir 261 } 262 263 // descending into a hash directory 264 return nil 265 } 266 267 dgst := digest.NewDigestFromHex(alg.String(), filepath.Base(path)) 268 if err := dgst.Validate(); err != nil { 269 // log error but don't report 270 log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path") 271 // if we see this, it could mean some sort of corruption of the 272 // store or extra paths not expected previously. 273 } 274 275 var labels map[string]string 276 if s.ls != nil { 277 labels, err = s.ls.Get(dgst) 278 if err != nil { 279 return err 280 } 281 } 282 283 info := s.info(dgst, fi, labels) 284 if !filter.Match(content.AdaptInfo(info)) { 285 return nil 286 } 287 return fn(info) 288 }) 289} 290 291func (s *store) Status(ctx context.Context, ref string) (content.Status, error) { 292 return s.status(s.ingestRoot(ref)) 293} 294 295func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) { 296 fp, err := os.Open(filepath.Join(s.root, "ingest")) 297 if err != nil { 298 return nil, err 299 } 300 301 defer fp.Close() 302 303 fis, err := fp.Readdir(-1) 304 if err != nil { 305 return nil, err 306 } 307 308 filter, err := filters.ParseAll(fs...) 309 if err != nil { 310 return nil, err 311 } 312 313 var active []content.Status 314 for _, fi := range fis { 315 p := filepath.Join(s.root, "ingest", fi.Name()) 316 stat, err := s.status(p) 317 if err != nil { 318 if !os.IsNotExist(err) { 319 return nil, err 320 } 321 322 // TODO(stevvooe): This is a common error if uploads are being 323 // completed while making this listing. Need to consider taking a 324 // lock on the whole store to coordinate this aspect. 325 // 326 // Another option is to cleanup downloads asynchronously and 327 // coordinate this method with the cleanup process. 328 // 329 // For now, we just skip them, as they really don't exist. 330 continue 331 } 332 333 if filter.Match(adaptStatus(stat)) { 334 active = append(active, stat) 335 } 336 } 337 338 return active, nil 339} 340 341// WalkStatusRefs is used to walk all status references 342// Failed status reads will be logged and ignored, if 343// this function is called while references are being altered, 344// these error messages may be produced. 345func (s *store) WalkStatusRefs(ctx context.Context, fn func(string) error) error { 346 fp, err := os.Open(filepath.Join(s.root, "ingest")) 347 if err != nil { 348 return err 349 } 350 351 defer fp.Close() 352 353 fis, err := fp.Readdir(-1) 354 if err != nil { 355 return err 356 } 357 358 for _, fi := range fis { 359 rf := filepath.Join(s.root, "ingest", fi.Name(), "ref") 360 361 ref, err := readFileString(rf) 362 if err != nil { 363 log.G(ctx).WithError(err).WithField("path", rf).Error("failed to read ingest ref") 364 continue 365 } 366 367 if err := fn(ref); err != nil { 368 return err 369 } 370 } 371 372 return nil 373} 374 375// status works like stat above except uses the path to the ingest. 376func (s *store) status(ingestPath string) (content.Status, error) { 377 dp := filepath.Join(ingestPath, "data") 378 fi, err := os.Stat(dp) 379 if err != nil { 380 if os.IsNotExist(err) { 381 err = errors.Wrap(errdefs.ErrNotFound, err.Error()) 382 } 383 return content.Status{}, err 384 } 385 386 ref, err := readFileString(filepath.Join(ingestPath, "ref")) 387 if err != nil { 388 if os.IsNotExist(err) { 389 err = errors.Wrap(errdefs.ErrNotFound, err.Error()) 390 } 391 return content.Status{}, err 392 } 393 394 startedAt, err := readFileTimestamp(filepath.Join(ingestPath, "startedat")) 395 if err != nil { 396 return content.Status{}, errors.Wrapf(err, "could not read startedat") 397 } 398 399 updatedAt, err := readFileTimestamp(filepath.Join(ingestPath, "updatedat")) 400 if err != nil { 401 return content.Status{}, errors.Wrapf(err, "could not read updatedat") 402 } 403 404 // because we don't write updatedat on every write, the mod time may 405 // actually be more up to date. 406 if fi.ModTime().After(updatedAt) { 407 updatedAt = fi.ModTime() 408 } 409 410 return content.Status{ 411 Ref: ref, 412 Offset: fi.Size(), 413 Total: s.total(ingestPath), 414 UpdatedAt: updatedAt, 415 StartedAt: startedAt, 416 }, nil 417} 418 419func adaptStatus(status content.Status) filters.Adaptor { 420 return filters.AdapterFunc(func(fieldpath []string) (string, bool) { 421 if len(fieldpath) == 0 { 422 return "", false 423 } 424 switch fieldpath[0] { 425 case "ref": 426 return status.Ref, true 427 } 428 429 return "", false 430 }) 431} 432 433// total attempts to resolve the total expected size for the write. 434func (s *store) total(ingestPath string) int64 { 435 totalS, err := readFileString(filepath.Join(ingestPath, "total")) 436 if err != nil { 437 return 0 438 } 439 440 total, err := strconv.ParseInt(totalS, 10, 64) 441 if err != nil { 442 // represents a corrupted file, should probably remove. 443 return 0 444 } 445 446 return total 447} 448 449// Writer begins or resumes the active writer identified by ref. If the writer 450// is already in use, an error is returned. Only one writer may be in use per 451// ref at a time. 452// 453// The argument `ref` is used to uniquely identify a long-lived writer transaction. 454func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) { 455 var wOpts content.WriterOpts 456 for _, opt := range opts { 457 if err := opt(&wOpts); err != nil { 458 return nil, err 459 } 460 } 461 // TODO(AkihiroSuda): we could create a random string or one calculated based on the context 462 // https://github.com/containerd/containerd/issues/2129#issuecomment-380255019 463 if wOpts.Ref == "" { 464 return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty") 465 } 466 var lockErr error 467 for count := uint64(0); count < 10; count++ { 468 if err := tryLock(wOpts.Ref); err != nil { 469 if !errdefs.IsUnavailable(err) { 470 return nil, err 471 } 472 473 lockErr = err 474 } else { 475 lockErr = nil 476 break 477 } 478 time.Sleep(time.Millisecond * time.Duration(rand.Intn(1<<count))) 479 } 480 481 if lockErr != nil { 482 return nil, lockErr 483 } 484 485 w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest) 486 if err != nil { 487 unlock(wOpts.Ref) 488 return nil, err 489 } 490 491 return w, nil // lock is now held by w. 492} 493 494func (s *store) resumeStatus(ref string, total int64, digester digest.Digester) (content.Status, error) { 495 path, _, data := s.ingestPaths(ref) 496 status, err := s.status(path) 497 if err != nil { 498 return status, errors.Wrap(err, "failed reading status of resume write") 499 } 500 if ref != status.Ref { 501 // NOTE(stevvooe): This is fairly catastrophic. Either we have some 502 // layout corruption or a hash collision for the ref key. 503 return status, errors.Errorf("ref key does not match: %v != %v", ref, status.Ref) 504 } 505 506 if total > 0 && status.Total > 0 && total != status.Total { 507 return status, errors.Errorf("provided total differs from status: %v != %v", total, status.Total) 508 } 509 510 // TODO(stevvooe): slow slow slow!!, send to goroutine or use resumable hashes 511 fp, err := os.Open(data) 512 if err != nil { 513 return status, err 514 } 515 516 p := bufPool.Get().(*[]byte) 517 status.Offset, err = io.CopyBuffer(digester.Hash(), fp, *p) 518 bufPool.Put(p) 519 fp.Close() 520 return status, err 521} 522 523// writer provides the main implementation of the Writer method. The caller 524// must hold the lock correctly and release on error if there is a problem. 525func (s *store) writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) { 526 // TODO(stevvooe): Need to actually store expected here. We have 527 // code in the service that shouldn't be dealing with this. 528 if expected != "" { 529 p, err := s.blobPath(expected) 530 if err != nil { 531 return nil, errors.Wrap(err, "calculating expected blob path for writer") 532 } 533 if _, err := os.Stat(p); err == nil { 534 return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected) 535 } 536 } 537 538 path, refp, data := s.ingestPaths(ref) 539 540 var ( 541 digester = digest.Canonical.Digester() 542 offset int64 543 startedAt time.Time 544 updatedAt time.Time 545 ) 546 547 foundValidIngest := false 548 // ensure that the ingest path has been created. 549 if err := os.Mkdir(path, 0755); err != nil { 550 if !os.IsExist(err) { 551 return nil, err 552 } 553 status, err := s.resumeStatus(ref, total, digester) 554 if err == nil { 555 foundValidIngest = true 556 updatedAt = status.UpdatedAt 557 startedAt = status.StartedAt 558 total = status.Total 559 offset = status.Offset 560 } else { 561 logrus.Infof("failed to resume the status from path %s: %s. will recreate them", path, err.Error()) 562 } 563 } 564 565 if !foundValidIngest { 566 startedAt = time.Now() 567 updatedAt = startedAt 568 569 // the ingest is new, we need to setup the target location. 570 // write the ref to a file for later use 571 if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil { 572 return nil, err 573 } 574 575 if err := writeTimestampFile(filepath.Join(path, "startedat"), startedAt); err != nil { 576 return nil, err 577 } 578 579 if err := writeTimestampFile(filepath.Join(path, "updatedat"), startedAt); err != nil { 580 return nil, err 581 } 582 583 if total > 0 { 584 if err := ioutil.WriteFile(filepath.Join(path, "total"), []byte(fmt.Sprint(total)), 0666); err != nil { 585 return nil, err 586 } 587 } 588 } 589 590 fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666) 591 if err != nil { 592 return nil, errors.Wrap(err, "failed to open data file") 593 } 594 595 if _, err := fp.Seek(offset, io.SeekStart); err != nil { 596 return nil, errors.Wrap(err, "could not seek to current write offset") 597 } 598 599 return &writer{ 600 s: s, 601 fp: fp, 602 ref: ref, 603 path: path, 604 offset: offset, 605 total: total, 606 digester: digester, 607 startedAt: startedAt, 608 updatedAt: updatedAt, 609 }, nil 610} 611 612// Abort an active transaction keyed by ref. If the ingest is active, it will 613// be cancelled. Any resources associated with the ingest will be cleaned. 614func (s *store) Abort(ctx context.Context, ref string) error { 615 root := s.ingestRoot(ref) 616 if err := os.RemoveAll(root); err != nil { 617 if os.IsNotExist(err) { 618 return errors.Wrapf(errdefs.ErrNotFound, "ingest ref %q", ref) 619 } 620 621 return err 622 } 623 624 return nil 625} 626 627func (s *store) blobPath(dgst digest.Digest) (string, error) { 628 if err := dgst.Validate(); err != nil { 629 return "", errors.Wrapf(errdefs.ErrInvalidArgument, "cannot calculate blob path from invalid digest: %v", err) 630 } 631 632 return filepath.Join(s.root, "blobs", dgst.Algorithm().String(), dgst.Hex()), nil 633} 634 635func (s *store) ingestRoot(ref string) string { 636 // we take a digest of the ref to keep the ingest paths constant length. 637 // Note that this is not the current or potential digest of incoming content. 638 dgst := digest.FromString(ref) 639 return filepath.Join(s.root, "ingest", dgst.Hex()) 640} 641 642// ingestPaths are returned. The paths are the following: 643// 644// - root: entire ingest directory 645// - ref: name of the starting ref, must be unique 646// - data: file where data is written 647// 648func (s *store) ingestPaths(ref string) (string, string, string) { 649 var ( 650 fp = s.ingestRoot(ref) 651 rp = filepath.Join(fp, "ref") 652 dp = filepath.Join(fp, "data") 653 ) 654 655 return fp, rp, dp 656} 657 658func readFileString(path string) (string, error) { 659 p, err := ioutil.ReadFile(path) 660 return string(p), err 661} 662 663// readFileTimestamp reads a file with just a timestamp present. 664func readFileTimestamp(p string) (time.Time, error) { 665 b, err := ioutil.ReadFile(p) 666 if err != nil { 667 if os.IsNotExist(err) { 668 err = errors.Wrap(errdefs.ErrNotFound, err.Error()) 669 } 670 return time.Time{}, err 671 } 672 673 var t time.Time 674 if err := t.UnmarshalText(b); err != nil { 675 return time.Time{}, errors.Wrapf(err, "could not parse timestamp file %v", p) 676 } 677 678 return t, nil 679} 680 681func writeTimestampFile(p string, t time.Time) error { 682 b, err := t.MarshalText() 683 if err != nil { 684 return err 685 } 686 return atomicWrite(p, b, 0666) 687} 688 689func atomicWrite(path string, data []byte, mode os.FileMode) error { 690 tmp := fmt.Sprintf("%s.tmp", path) 691 f, err := os.OpenFile(tmp, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, mode) 692 if err != nil { 693 return errors.Wrap(err, "create tmp file") 694 } 695 _, err = f.Write(data) 696 f.Close() 697 if err != nil { 698 return errors.Wrap(err, "write atomic data") 699 } 700 return os.Rename(tmp, path) 701} 702