1package local 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "io/ioutil" 8 "math/rand" 9 "os" 10 "path/filepath" 11 "strconv" 12 "strings" 13 "sync" 14 "time" 15 16 "github.com/containerd/containerd/content" 17 "github.com/containerd/containerd/errdefs" 18 "github.com/containerd/containerd/filters" 19 "github.com/containerd/containerd/log" 20 digest "github.com/opencontainers/go-digest" 21 "github.com/pkg/errors" 22) 23 24var bufPool = sync.Pool{ 25 New: func() interface{} { 26 buffer := make([]byte, 1<<20) 27 return &buffer 28 }, 29} 30 31// LabelStore is used to store mutable labels for digests 32type LabelStore interface { 33 // Get returns all the labels for the given digest 34 Get(digest.Digest) (map[string]string, error) 35 36 // Set sets all the labels for a given digest 37 Set(digest.Digest, map[string]string) error 38 39 // Update replaces the given labels for a digest, 40 // a key with an empty value removes a label. 41 Update(digest.Digest, map[string]string) (map[string]string, error) 42} 43 44// Store is digest-keyed store for content. All data written into the store is 45// stored under a verifiable digest. 46// 47// Store can generally support multi-reader, single-writer ingest of data, 48// including resumable ingest. 49type store struct { 50 root string 51 ls LabelStore 52} 53 54// NewStore returns a local content store 55func NewStore(root string) (content.Store, error) { 56 return NewLabeledStore(root, nil) 57} 58 59// NewLabeledStore returns a new content store using the provided label store 60// 61// Note: content stores which are used underneath a metadata store may not 62// require labels and should use `NewStore`. `NewLabeledStore` is primarily 63// useful for tests or standalone implementations. 64func NewLabeledStore(root string, ls LabelStore) (content.Store, error) { 65 if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil { 66 return nil, err 67 } 68 69 return &store{ 70 root: root, 71 ls: ls, 72 }, nil 73} 74 75func (s *store) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { 76 p := s.blobPath(dgst) 77 fi, err := os.Stat(p) 78 if err != nil { 79 if os.IsNotExist(err) { 80 err = errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst) 81 } 82 83 return content.Info{}, err 84 } 85 var labels map[string]string 86 if s.ls != nil { 87 labels, err = s.ls.Get(dgst) 88 if err != nil { 89 return content.Info{}, err 90 } 91 } 92 return s.info(dgst, fi, labels), nil 93} 94 95func (s *store) info(dgst digest.Digest, fi os.FileInfo, labels map[string]string) content.Info { 96 return content.Info{ 97 Digest: dgst, 98 Size: fi.Size(), 99 CreatedAt: fi.ModTime(), 100 UpdatedAt: getATime(fi), 101 Labels: labels, 102 } 103} 104 105// ReaderAt returns an io.ReaderAt for the blob. 106func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { 107 p := s.blobPath(dgst) 108 fi, err := os.Stat(p) 109 if err != nil { 110 if !os.IsNotExist(err) { 111 return nil, err 112 } 113 114 return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p) 115 } 116 117 fp, err := os.Open(p) 118 if err != nil { 119 if !os.IsNotExist(err) { 120 return nil, err 121 } 122 123 return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p) 124 } 125 126 return sizeReaderAt{size: fi.Size(), fp: fp}, nil 127} 128 129// Delete removes a blob by its digest. 130// 131// While this is safe to do concurrently, safe exist-removal logic must hold 132// some global lock on the store. 133func (s *store) Delete(ctx context.Context, dgst digest.Digest) error { 134 if err := os.RemoveAll(s.blobPath(dgst)); err != nil { 135 if !os.IsNotExist(err) { 136 return err 137 } 138 139 return errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst) 140 } 141 142 return nil 143} 144 145func (s *store) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { 146 if s.ls == nil { 147 return content.Info{}, errors.Wrapf(errdefs.ErrFailedPrecondition, "update not supported on immutable content store") 148 } 149 150 p := s.blobPath(info.Digest) 151 fi, err := os.Stat(p) 152 if err != nil { 153 if os.IsNotExist(err) { 154 err = errors.Wrapf(errdefs.ErrNotFound, "content %v", info.Digest) 155 } 156 157 return content.Info{}, err 158 } 159 160 var ( 161 all bool 162 labels map[string]string 163 ) 164 if len(fieldpaths) > 0 { 165 for _, path := range fieldpaths { 166 if strings.HasPrefix(path, "labels.") { 167 if labels == nil { 168 labels = map[string]string{} 169 } 170 171 key := strings.TrimPrefix(path, "labels.") 172 labels[key] = info.Labels[key] 173 continue 174 } 175 176 switch path { 177 case "labels": 178 all = true 179 labels = info.Labels 180 default: 181 return content.Info{}, errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on content info %q", path, info.Digest) 182 } 183 } 184 } else { 185 all = true 186 labels = info.Labels 187 } 188 189 if all { 190 err = s.ls.Set(info.Digest, labels) 191 } else { 192 labels, err = s.ls.Update(info.Digest, labels) 193 } 194 if err != nil { 195 return content.Info{}, err 196 } 197 198 info = s.info(info.Digest, fi, labels) 199 info.UpdatedAt = time.Now() 200 201 if err := os.Chtimes(p, info.UpdatedAt, info.CreatedAt); err != nil { 202 log.G(ctx).WithError(err).Warnf("could not change access time for %s", info.Digest) 203 } 204 205 return info, nil 206} 207 208func (s *store) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error { 209 // TODO: Support filters 210 root := filepath.Join(s.root, "blobs") 211 var alg digest.Algorithm 212 return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error { 213 if err != nil { 214 return err 215 } 216 if !fi.IsDir() && !alg.Available() { 217 return nil 218 } 219 220 // TODO(stevvooe): There are few more cases with subdirs that should be 221 // handled in case the layout gets corrupted. This isn't strict enough 222 // and may spew bad data. 223 224 if path == root { 225 return nil 226 } 227 if filepath.Dir(path) == root { 228 alg = digest.Algorithm(filepath.Base(path)) 229 230 if !alg.Available() { 231 alg = "" 232 return filepath.SkipDir 233 } 234 235 // descending into a hash directory 236 return nil 237 } 238 239 dgst := digest.NewDigestFromHex(alg.String(), filepath.Base(path)) 240 if err := dgst.Validate(); err != nil { 241 // log error but don't report 242 log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path") 243 // if we see this, it could mean some sort of corruption of the 244 // store or extra paths not expected previously. 245 } 246 247 var labels map[string]string 248 if s.ls != nil { 249 labels, err = s.ls.Get(dgst) 250 if err != nil { 251 return err 252 } 253 } 254 return fn(s.info(dgst, fi, labels)) 255 }) 256} 257 258func (s *store) Status(ctx context.Context, ref string) (content.Status, error) { 259 return s.status(s.ingestRoot(ref)) 260} 261 262func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) { 263 fp, err := os.Open(filepath.Join(s.root, "ingest")) 264 if err != nil { 265 return nil, err 266 } 267 268 defer fp.Close() 269 270 fis, err := fp.Readdir(-1) 271 if err != nil { 272 return nil, err 273 } 274 275 filter, err := filters.ParseAll(fs...) 276 if err != nil { 277 return nil, err 278 } 279 280 var active []content.Status 281 for _, fi := range fis { 282 p := filepath.Join(s.root, "ingest", fi.Name()) 283 stat, err := s.status(p) 284 if err != nil { 285 if !os.IsNotExist(err) { 286 return nil, err 287 } 288 289 // TODO(stevvooe): This is a common error if uploads are being 290 // completed while making this listing. Need to consider taking a 291 // lock on the whole store to coordinate this aspect. 292 // 293 // Another option is to cleanup downloads asynchronously and 294 // coordinate this method with the cleanup process. 295 // 296 // For now, we just skip them, as they really don't exist. 297 continue 298 } 299 300 if filter.Match(adaptStatus(stat)) { 301 active = append(active, stat) 302 } 303 } 304 305 return active, nil 306} 307 308// status works like stat above except uses the path to the ingest. 309func (s *store) status(ingestPath string) (content.Status, error) { 310 dp := filepath.Join(ingestPath, "data") 311 fi, err := os.Stat(dp) 312 if err != nil { 313 if os.IsNotExist(err) { 314 err = errors.Wrap(errdefs.ErrNotFound, err.Error()) 315 } 316 return content.Status{}, err 317 } 318 319 ref, err := readFileString(filepath.Join(ingestPath, "ref")) 320 if err != nil { 321 if os.IsNotExist(err) { 322 err = errors.Wrap(errdefs.ErrNotFound, err.Error()) 323 } 324 return content.Status{}, err 325 } 326 327 startedAt, err := readFileTimestamp(filepath.Join(ingestPath, "startedat")) 328 if err != nil { 329 return content.Status{}, errors.Wrapf(err, "could not read startedat") 330 } 331 332 updatedAt, err := readFileTimestamp(filepath.Join(ingestPath, "updatedat")) 333 if err != nil { 334 return content.Status{}, errors.Wrapf(err, "could not read updatedat") 335 } 336 337 // because we don't write updatedat on every write, the mod time may 338 // actually be more up to date. 339 if fi.ModTime().After(updatedAt) { 340 updatedAt = fi.ModTime() 341 } 342 343 return content.Status{ 344 Ref: ref, 345 Offset: fi.Size(), 346 Total: s.total(ingestPath), 347 UpdatedAt: updatedAt, 348 StartedAt: startedAt, 349 }, nil 350} 351 352func adaptStatus(status content.Status) filters.Adaptor { 353 return filters.AdapterFunc(func(fieldpath []string) (string, bool) { 354 if len(fieldpath) == 0 { 355 return "", false 356 } 357 switch fieldpath[0] { 358 case "ref": 359 return status.Ref, true 360 } 361 362 return "", false 363 }) 364} 365 366// total attempts to resolve the total expected size for the write. 367func (s *store) total(ingestPath string) int64 { 368 totalS, err := readFileString(filepath.Join(ingestPath, "total")) 369 if err != nil { 370 return 0 371 } 372 373 total, err := strconv.ParseInt(totalS, 10, 64) 374 if err != nil { 375 // represents a corrupted file, should probably remove. 376 return 0 377 } 378 379 return total 380} 381 382// Writer begins or resumes the active writer identified by ref. If the writer 383// is already in use, an error is returned. Only one writer may be in use per 384// ref at a time. 385// 386// The argument `ref` is used to uniquely identify a long-lived writer transaction. 387func (s *store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) { 388 var lockErr error 389 for count := uint64(0); count < 10; count++ { 390 time.Sleep(time.Millisecond * time.Duration(rand.Intn(1<<count))) 391 if err := tryLock(ref); err != nil { 392 if !errdefs.IsUnavailable(err) { 393 return nil, err 394 } 395 396 lockErr = err 397 } else { 398 lockErr = nil 399 break 400 } 401 } 402 403 if lockErr != nil { 404 return nil, lockErr 405 } 406 407 w, err := s.writer(ctx, ref, total, expected) 408 if err != nil { 409 unlock(ref) 410 return nil, err 411 } 412 413 return w, nil // lock is now held by w. 414} 415 416// writer provides the main implementation of the Writer method. The caller 417// must hold the lock correctly and release on error if there is a problem. 418func (s *store) writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) { 419 // TODO(stevvooe): Need to actually store expected here. We have 420 // code in the service that shouldn't be dealing with this. 421 if expected != "" { 422 p := s.blobPath(expected) 423 if _, err := os.Stat(p); err == nil { 424 return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected) 425 } 426 } 427 428 path, refp, data := s.ingestPaths(ref) 429 430 var ( 431 digester = digest.Canonical.Digester() 432 offset int64 433 startedAt time.Time 434 updatedAt time.Time 435 ) 436 437 // ensure that the ingest path has been created. 438 if err := os.Mkdir(path, 0755); err != nil { 439 if !os.IsExist(err) { 440 return nil, err 441 } 442 443 status, err := s.status(path) 444 if err != nil { 445 return nil, errors.Wrap(err, "failed reading status of resume write") 446 } 447 448 if ref != status.Ref { 449 // NOTE(stevvooe): This is fairly catastrophic. Either we have some 450 // layout corruption or a hash collision for the ref key. 451 return nil, errors.Wrapf(err, "ref key does not match: %v != %v", ref, status.Ref) 452 } 453 454 if total > 0 && status.Total > 0 && total != status.Total { 455 return nil, errors.Errorf("provided total differs from status: %v != %v", total, status.Total) 456 } 457 458 // TODO(stevvooe): slow slow slow!!, send to goroutine or use resumable hashes 459 fp, err := os.Open(data) 460 if err != nil { 461 return nil, err 462 } 463 defer fp.Close() 464 465 p := bufPool.Get().(*[]byte) 466 defer bufPool.Put(p) 467 468 offset, err = io.CopyBuffer(digester.Hash(), fp, *p) 469 if err != nil { 470 return nil, err 471 } 472 473 updatedAt = status.UpdatedAt 474 startedAt = status.StartedAt 475 total = status.Total 476 } else { 477 startedAt = time.Now() 478 updatedAt = startedAt 479 480 // the ingest is new, we need to setup the target location. 481 // write the ref to a file for later use 482 if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil { 483 return nil, err 484 } 485 486 if writeTimestampFile(filepath.Join(path, "startedat"), startedAt); err != nil { 487 return nil, err 488 } 489 490 if writeTimestampFile(filepath.Join(path, "updatedat"), startedAt); err != nil { 491 return nil, err 492 } 493 494 if total > 0 { 495 if err := ioutil.WriteFile(filepath.Join(path, "total"), []byte(fmt.Sprint(total)), 0666); err != nil { 496 return nil, err 497 } 498 } 499 } 500 501 fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666) 502 if err != nil { 503 return nil, errors.Wrap(err, "failed to open data file") 504 } 505 506 if _, err := fp.Seek(offset, io.SeekStart); err != nil { 507 return nil, errors.Wrap(err, "could not seek to current write offset") 508 } 509 510 return &writer{ 511 s: s, 512 fp: fp, 513 ref: ref, 514 path: path, 515 offset: offset, 516 total: total, 517 digester: digester, 518 startedAt: startedAt, 519 updatedAt: updatedAt, 520 }, nil 521} 522 523// Abort an active transaction keyed by ref. If the ingest is active, it will 524// be cancelled. Any resources associated with the ingest will be cleaned. 525func (s *store) Abort(ctx context.Context, ref string) error { 526 root := s.ingestRoot(ref) 527 if err := os.RemoveAll(root); err != nil { 528 if os.IsNotExist(err) { 529 return errors.Wrapf(errdefs.ErrNotFound, "ingest ref %q", ref) 530 } 531 532 return err 533 } 534 535 return nil 536} 537 538func (s *store) blobPath(dgst digest.Digest) string { 539 return filepath.Join(s.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) 540} 541 542func (s *store) ingestRoot(ref string) string { 543 dgst := digest.FromString(ref) 544 return filepath.Join(s.root, "ingest", dgst.Hex()) 545} 546 547// ingestPaths are returned. The paths are the following: 548// 549// - root: entire ingest directory 550// - ref: name of the starting ref, must be unique 551// - data: file where data is written 552// 553func (s *store) ingestPaths(ref string) (string, string, string) { 554 var ( 555 fp = s.ingestRoot(ref) 556 rp = filepath.Join(fp, "ref") 557 dp = filepath.Join(fp, "data") 558 ) 559 560 return fp, rp, dp 561} 562 563func readFileString(path string) (string, error) { 564 p, err := ioutil.ReadFile(path) 565 return string(p), err 566} 567 568// readFileTimestamp reads a file with just a timestamp present. 569func readFileTimestamp(p string) (time.Time, error) { 570 b, err := ioutil.ReadFile(p) 571 if err != nil { 572 if os.IsNotExist(err) { 573 err = errors.Wrap(errdefs.ErrNotFound, err.Error()) 574 } 575 return time.Time{}, err 576 } 577 578 var t time.Time 579 if err := t.UnmarshalText(b); err != nil { 580 return time.Time{}, errors.Wrapf(err, "could not parse timestamp file %v", p) 581 } 582 583 return t, nil 584} 585 586func writeTimestampFile(p string, t time.Time) error { 587 b, err := t.MarshalText() 588 if err != nil { 589 return err 590 } 591 592 return ioutil.WriteFile(p, b, 0666) 593} 594