/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package local

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"math/rand"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/containerd/containerd/content"
	"github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/filters"
	"github.com/containerd/containerd/log"

	"github.com/containerd/continuity"
	digest "github.com/opencontainers/go-digest"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
	"github.com/pkg/errors"
)

// bufPool recycles 1MiB scratch buffers used with io.CopyBuffer when
// re-hashing resumed ingest data, avoiding a large allocation per copy.
// Pointers to slices are pooled (rather than slices) to avoid an extra
// allocation when storing the value in the pool's interface slot.
var bufPool = sync.Pool{
	New: func() interface{} {
		buffer := make([]byte, 1<<20)
		return &buffer
	},
}

// LabelStore is used to store mutable labels for digests
type LabelStore interface {
	// Get returns all the labels for the given digest
	Get(digest.Digest) (map[string]string, error)

	// Set sets all the labels for a given digest
	Set(digest.Digest, map[string]string) error

	// Update replaces the given labels for a digest,
	// a key with an empty value removes a label.
	Update(digest.Digest, map[string]string) (map[string]string, error)
}

// Store is digest-keyed store for content. All data written into the store is
// stored under a verifiable digest.
//
// Store can generally support multi-reader, single-writer ingest of data,
// including resumable ingest.
68type store struct { 69 root string 70 ls LabelStore 71} 72 73// NewStore returns a local content store 74func NewStore(root string) (content.Store, error) { 75 return NewLabeledStore(root, nil) 76} 77 78// NewLabeledStore returns a new content store using the provided label store 79// 80// Note: content stores which are used underneath a metadata store may not 81// require labels and should use `NewStore`. `NewLabeledStore` is primarily 82// useful for tests or standalone implementations. 83func NewLabeledStore(root string, ls LabelStore) (content.Store, error) { 84 if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil { 85 return nil, err 86 } 87 88 return &store{ 89 root: root, 90 ls: ls, 91 }, nil 92} 93 94func (s *store) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { 95 p := s.blobPath(dgst) 96 fi, err := os.Stat(p) 97 if err != nil { 98 if os.IsNotExist(err) { 99 err = errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst) 100 } 101 102 return content.Info{}, err 103 } 104 var labels map[string]string 105 if s.ls != nil { 106 labels, err = s.ls.Get(dgst) 107 if err != nil { 108 return content.Info{}, err 109 } 110 } 111 return s.info(dgst, fi, labels), nil 112} 113 114func (s *store) info(dgst digest.Digest, fi os.FileInfo, labels map[string]string) content.Info { 115 return content.Info{ 116 Digest: dgst, 117 Size: fi.Size(), 118 CreatedAt: fi.ModTime(), 119 UpdatedAt: getATime(fi), 120 Labels: labels, 121 } 122} 123 124// ReaderAt returns an io.ReaderAt for the blob. 
125func (s *store) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) { 126 p := s.blobPath(desc.Digest) 127 fi, err := os.Stat(p) 128 if err != nil { 129 if !os.IsNotExist(err) { 130 return nil, err 131 } 132 133 return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p) 134 } 135 136 fp, err := os.Open(p) 137 if err != nil { 138 if !os.IsNotExist(err) { 139 return nil, err 140 } 141 142 return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p) 143 } 144 145 return sizeReaderAt{size: fi.Size(), fp: fp}, nil 146} 147 148// Delete removes a blob by its digest. 149// 150// While this is safe to do concurrently, safe exist-removal logic must hold 151// some global lock on the store. 152func (s *store) Delete(ctx context.Context, dgst digest.Digest) error { 153 if err := os.RemoveAll(s.blobPath(dgst)); err != nil { 154 if !os.IsNotExist(err) { 155 return err 156 } 157 158 return errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst) 159 } 160 161 return nil 162} 163 164func (s *store) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { 165 if s.ls == nil { 166 return content.Info{}, errors.Wrapf(errdefs.ErrFailedPrecondition, "update not supported on immutable content store") 167 } 168 169 p := s.blobPath(info.Digest) 170 fi, err := os.Stat(p) 171 if err != nil { 172 if os.IsNotExist(err) { 173 err = errors.Wrapf(errdefs.ErrNotFound, "content %v", info.Digest) 174 } 175 176 return content.Info{}, err 177 } 178 179 var ( 180 all bool 181 labels map[string]string 182 ) 183 if len(fieldpaths) > 0 { 184 for _, path := range fieldpaths { 185 if strings.HasPrefix(path, "labels.") { 186 if labels == nil { 187 labels = map[string]string{} 188 } 189 190 key := strings.TrimPrefix(path, "labels.") 191 labels[key] = info.Labels[key] 192 continue 193 } 194 195 switch path { 196 case "labels": 197 all = true 198 labels = info.Labels 199 default: 200 return 
content.Info{}, errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on content info %q", path, info.Digest) 201 } 202 } 203 } else { 204 all = true 205 labels = info.Labels 206 } 207 208 if all { 209 err = s.ls.Set(info.Digest, labels) 210 } else { 211 labels, err = s.ls.Update(info.Digest, labels) 212 } 213 if err != nil { 214 return content.Info{}, err 215 } 216 217 info = s.info(info.Digest, fi, labels) 218 info.UpdatedAt = time.Now() 219 220 if err := os.Chtimes(p, info.UpdatedAt, info.CreatedAt); err != nil { 221 log.G(ctx).WithError(err).Warnf("could not change access time for %s", info.Digest) 222 } 223 224 return info, nil 225} 226 227func (s *store) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error { 228 // TODO: Support filters 229 root := filepath.Join(s.root, "blobs") 230 var alg digest.Algorithm 231 return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error { 232 if err != nil { 233 return err 234 } 235 if !fi.IsDir() && !alg.Available() { 236 return nil 237 } 238 239 // TODO(stevvooe): There are few more cases with subdirs that should be 240 // handled in case the layout gets corrupted. This isn't strict enough 241 // and may spew bad data. 242 243 if path == root { 244 return nil 245 } 246 if filepath.Dir(path) == root { 247 alg = digest.Algorithm(filepath.Base(path)) 248 249 if !alg.Available() { 250 alg = "" 251 return filepath.SkipDir 252 } 253 254 // descending into a hash directory 255 return nil 256 } 257 258 dgst := digest.NewDigestFromHex(alg.String(), filepath.Base(path)) 259 if err := dgst.Validate(); err != nil { 260 // log error but don't report 261 log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path") 262 // if we see this, it could mean some sort of corruption of the 263 // store or extra paths not expected previously. 
264 } 265 266 var labels map[string]string 267 if s.ls != nil { 268 labels, err = s.ls.Get(dgst) 269 if err != nil { 270 return err 271 } 272 } 273 return fn(s.info(dgst, fi, labels)) 274 }) 275} 276 277func (s *store) Status(ctx context.Context, ref string) (content.Status, error) { 278 return s.status(s.ingestRoot(ref)) 279} 280 281func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) { 282 fp, err := os.Open(filepath.Join(s.root, "ingest")) 283 if err != nil { 284 return nil, err 285 } 286 287 defer fp.Close() 288 289 fis, err := fp.Readdir(-1) 290 if err != nil { 291 return nil, err 292 } 293 294 filter, err := filters.ParseAll(fs...) 295 if err != nil { 296 return nil, err 297 } 298 299 var active []content.Status 300 for _, fi := range fis { 301 p := filepath.Join(s.root, "ingest", fi.Name()) 302 stat, err := s.status(p) 303 if err != nil { 304 if !os.IsNotExist(err) { 305 return nil, err 306 } 307 308 // TODO(stevvooe): This is a common error if uploads are being 309 // completed while making this listing. Need to consider taking a 310 // lock on the whole store to coordinate this aspect. 311 // 312 // Another option is to cleanup downloads asynchronously and 313 // coordinate this method with the cleanup process. 314 // 315 // For now, we just skip them, as they really don't exist. 316 continue 317 } 318 319 if filter.Match(adaptStatus(stat)) { 320 active = append(active, stat) 321 } 322 } 323 324 return active, nil 325} 326 327// WalkStatusRefs is used to walk all status references 328// Failed status reads will be logged and ignored, if 329// this function is called while references are being altered, 330// these error messages may be produced. 
331func (s *store) WalkStatusRefs(ctx context.Context, fn func(string) error) error { 332 fp, err := os.Open(filepath.Join(s.root, "ingest")) 333 if err != nil { 334 return err 335 } 336 337 defer fp.Close() 338 339 fis, err := fp.Readdir(-1) 340 if err != nil { 341 return err 342 } 343 344 for _, fi := range fis { 345 rf := filepath.Join(s.root, "ingest", fi.Name(), "ref") 346 347 ref, err := readFileString(rf) 348 if err != nil { 349 log.G(ctx).WithError(err).WithField("path", rf).Error("failed to read ingest ref") 350 continue 351 } 352 353 if err := fn(ref); err != nil { 354 return err 355 } 356 } 357 358 return nil 359} 360 361// status works like stat above except uses the path to the ingest. 362func (s *store) status(ingestPath string) (content.Status, error) { 363 dp := filepath.Join(ingestPath, "data") 364 fi, err := os.Stat(dp) 365 if err != nil { 366 if os.IsNotExist(err) { 367 err = errors.Wrap(errdefs.ErrNotFound, err.Error()) 368 } 369 return content.Status{}, err 370 } 371 372 ref, err := readFileString(filepath.Join(ingestPath, "ref")) 373 if err != nil { 374 if os.IsNotExist(err) { 375 err = errors.Wrap(errdefs.ErrNotFound, err.Error()) 376 } 377 return content.Status{}, err 378 } 379 380 startedAt, err := readFileTimestamp(filepath.Join(ingestPath, "startedat")) 381 if err != nil { 382 return content.Status{}, errors.Wrapf(err, "could not read startedat") 383 } 384 385 updatedAt, err := readFileTimestamp(filepath.Join(ingestPath, "updatedat")) 386 if err != nil { 387 return content.Status{}, errors.Wrapf(err, "could not read updatedat") 388 } 389 390 // because we don't write updatedat on every write, the mod time may 391 // actually be more up to date. 
392 if fi.ModTime().After(updatedAt) { 393 updatedAt = fi.ModTime() 394 } 395 396 return content.Status{ 397 Ref: ref, 398 Offset: fi.Size(), 399 Total: s.total(ingestPath), 400 UpdatedAt: updatedAt, 401 StartedAt: startedAt, 402 }, nil 403} 404 405func adaptStatus(status content.Status) filters.Adaptor { 406 return filters.AdapterFunc(func(fieldpath []string) (string, bool) { 407 if len(fieldpath) == 0 { 408 return "", false 409 } 410 switch fieldpath[0] { 411 case "ref": 412 return status.Ref, true 413 } 414 415 return "", false 416 }) 417} 418 419// total attempts to resolve the total expected size for the write. 420func (s *store) total(ingestPath string) int64 { 421 totalS, err := readFileString(filepath.Join(ingestPath, "total")) 422 if err != nil { 423 return 0 424 } 425 426 total, err := strconv.ParseInt(totalS, 10, 64) 427 if err != nil { 428 // represents a corrupted file, should probably remove. 429 return 0 430 } 431 432 return total 433} 434 435// Writer begins or resumes the active writer identified by ref. If the writer 436// is already in use, an error is returned. Only one writer may be in use per 437// ref at a time. 438// 439// The argument `ref` is used to uniquely identify a long-lived writer transaction. 
440func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) { 441 var wOpts content.WriterOpts 442 for _, opt := range opts { 443 if err := opt(&wOpts); err != nil { 444 return nil, err 445 } 446 } 447 // TODO(AkihiroSuda): we could create a random string or one calculated based on the context 448 // https://github.com/containerd/containerd/issues/2129#issuecomment-380255019 449 if wOpts.Ref == "" { 450 return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty") 451 } 452 var lockErr error 453 for count := uint64(0); count < 10; count++ { 454 time.Sleep(time.Millisecond * time.Duration(rand.Intn(1<<count))) 455 if err := tryLock(wOpts.Ref); err != nil { 456 if !errdefs.IsUnavailable(err) { 457 return nil, err 458 } 459 460 lockErr = err 461 } else { 462 lockErr = nil 463 break 464 } 465 } 466 467 if lockErr != nil { 468 return nil, lockErr 469 } 470 471 w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest) 472 if err != nil { 473 unlock(wOpts.Ref) 474 return nil, err 475 } 476 477 return w, nil // lock is now held by w. 478} 479 480// writer provides the main implementation of the Writer method. The caller 481// must hold the lock correctly and release on error if there is a problem. 482func (s *store) writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) { 483 // TODO(stevvooe): Need to actually store expected here. We have 484 // code in the service that shouldn't be dealing with this. 485 if expected != "" { 486 p := s.blobPath(expected) 487 if _, err := os.Stat(p); err == nil { 488 return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected) 489 } 490 } 491 492 path, refp, data := s.ingestPaths(ref) 493 494 var ( 495 digester = digest.Canonical.Digester() 496 offset int64 497 startedAt time.Time 498 updatedAt time.Time 499 ) 500 501 // ensure that the ingest path has been created. 
502 if err := os.Mkdir(path, 0755); err != nil { 503 if !os.IsExist(err) { 504 return nil, err 505 } 506 507 status, err := s.status(path) 508 if err != nil { 509 return nil, errors.Wrap(err, "failed reading status of resume write") 510 } 511 512 if ref != status.Ref { 513 // NOTE(stevvooe): This is fairly catastrophic. Either we have some 514 // layout corruption or a hash collision for the ref key. 515 return nil, errors.Wrapf(err, "ref key does not match: %v != %v", ref, status.Ref) 516 } 517 518 if total > 0 && status.Total > 0 && total != status.Total { 519 return nil, errors.Errorf("provided total differs from status: %v != %v", total, status.Total) 520 } 521 522 // TODO(stevvooe): slow slow slow!!, send to goroutine or use resumable hashes 523 fp, err := os.Open(data) 524 if err != nil { 525 return nil, err 526 } 527 528 p := bufPool.Get().(*[]byte) 529 offset, err = io.CopyBuffer(digester.Hash(), fp, *p) 530 bufPool.Put(p) 531 fp.Close() 532 if err != nil { 533 return nil, err 534 } 535 536 updatedAt = status.UpdatedAt 537 startedAt = status.StartedAt 538 total = status.Total 539 } else { 540 startedAt = time.Now() 541 updatedAt = startedAt 542 543 // the ingest is new, we need to setup the target location. 
544 // write the ref to a file for later use 545 if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil { 546 return nil, err 547 } 548 549 if writeTimestampFile(filepath.Join(path, "startedat"), startedAt); err != nil { 550 return nil, err 551 } 552 553 if writeTimestampFile(filepath.Join(path, "updatedat"), startedAt); err != nil { 554 return nil, err 555 } 556 557 if total > 0 { 558 if err := ioutil.WriteFile(filepath.Join(path, "total"), []byte(fmt.Sprint(total)), 0666); err != nil { 559 return nil, err 560 } 561 } 562 } 563 564 fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666) 565 if err != nil { 566 return nil, errors.Wrap(err, "failed to open data file") 567 } 568 569 if _, err := fp.Seek(offset, io.SeekStart); err != nil { 570 return nil, errors.Wrap(err, "could not seek to current write offset") 571 } 572 573 return &writer{ 574 s: s, 575 fp: fp, 576 ref: ref, 577 path: path, 578 offset: offset, 579 total: total, 580 digester: digester, 581 startedAt: startedAt, 582 updatedAt: updatedAt, 583 }, nil 584} 585 586// Abort an active transaction keyed by ref. If the ingest is active, it will 587// be cancelled. Any resources associated with the ingest will be cleaned. 588func (s *store) Abort(ctx context.Context, ref string) error { 589 root := s.ingestRoot(ref) 590 if err := os.RemoveAll(root); err != nil { 591 if os.IsNotExist(err) { 592 return errors.Wrapf(errdefs.ErrNotFound, "ingest ref %q", ref) 593 } 594 595 return err 596 } 597 598 return nil 599} 600 601func (s *store) blobPath(dgst digest.Digest) string { 602 return filepath.Join(s.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) 603} 604 605func (s *store) ingestRoot(ref string) string { 606 dgst := digest.FromString(ref) 607 return filepath.Join(s.root, "ingest", dgst.Hex()) 608} 609 610// ingestPaths are returned. 
The paths are the following: 611// 612// - root: entire ingest directory 613// - ref: name of the starting ref, must be unique 614// - data: file where data is written 615// 616func (s *store) ingestPaths(ref string) (string, string, string) { 617 var ( 618 fp = s.ingestRoot(ref) 619 rp = filepath.Join(fp, "ref") 620 dp = filepath.Join(fp, "data") 621 ) 622 623 return fp, rp, dp 624} 625 626func readFileString(path string) (string, error) { 627 p, err := ioutil.ReadFile(path) 628 return string(p), err 629} 630 631// readFileTimestamp reads a file with just a timestamp present. 632func readFileTimestamp(p string) (time.Time, error) { 633 b, err := ioutil.ReadFile(p) 634 if err != nil { 635 if os.IsNotExist(err) { 636 err = errors.Wrap(errdefs.ErrNotFound, err.Error()) 637 } 638 return time.Time{}, err 639 } 640 641 var t time.Time 642 if err := t.UnmarshalText(b); err != nil { 643 return time.Time{}, errors.Wrapf(err, "could not parse timestamp file %v", p) 644 } 645 646 return t, nil 647} 648 649func writeTimestampFile(p string, t time.Time) error { 650 b, err := t.MarshalText() 651 if err != nil { 652 return err 653 } 654 655 return continuity.AtomicWriteFile(p, b, 0666) 656} 657