1package containerimage 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "io/ioutil" 9 "runtime" 10 "sync" 11 "time" 12 13 "github.com/containerd/containerd/content" 14 "github.com/containerd/containerd/errdefs" 15 "github.com/containerd/containerd/images" 16 "github.com/containerd/containerd/platforms" 17 ctdreference "github.com/containerd/containerd/reference" 18 "github.com/containerd/containerd/remotes" 19 "github.com/containerd/containerd/remotes/docker" 20 "github.com/containerd/containerd/remotes/docker/schema1" 21 distreference "github.com/docker/distribution/reference" 22 "github.com/docker/docker/distribution" 23 "github.com/docker/docker/distribution/metadata" 24 "github.com/docker/docker/distribution/xfer" 25 "github.com/docker/docker/image" 26 "github.com/docker/docker/layer" 27 pkgprogress "github.com/docker/docker/pkg/progress" 28 "github.com/docker/docker/reference" 29 "github.com/moby/buildkit/cache" 30 "github.com/moby/buildkit/session" 31 "github.com/moby/buildkit/session/auth" 32 "github.com/moby/buildkit/source" 33 "github.com/moby/buildkit/util/flightcontrol" 34 "github.com/moby/buildkit/util/imageutil" 35 "github.com/moby/buildkit/util/progress" 36 "github.com/moby/buildkit/util/tracing" 37 digest "github.com/opencontainers/go-digest" 38 "github.com/opencontainers/image-spec/identity" 39 ocispec "github.com/opencontainers/image-spec/specs-go/v1" 40 "github.com/pkg/errors" 41 "golang.org/x/time/rate" 42) 43 44const preferLocal = true // FIXME: make this optional from the op 45 46// SourceOpt is options for creating the image source 47type SourceOpt struct { 48 SessionManager *session.Manager 49 ContentStore content.Store 50 CacheAccessor cache.Accessor 51 ReferenceStore reference.Store 52 DownloadManager distribution.RootFSDownloadManager 53 MetadataStore metadata.V2MetadataService 54 ImageStore image.Store 55} 56 57type imageSource struct { 58 SourceOpt 59 g flightcontrol.Group 60} 61 62// NewSource creates a new image source 63func NewSource(opt SourceOpt) (source.Source, error) { 64 is := &imageSource{ 65 SourceOpt: opt, 66 } 67 68 return is, nil 69} 70 71func (is *imageSource) ID() string { 72 return source.DockerImageScheme 73} 74 75func (is *imageSource) getResolver(ctx context.Context) remotes.Resolver { 76 return docker.NewResolver(docker.ResolverOptions{ 77 Client: tracing.DefaultClient, 78 Credentials: is.getCredentialsFromSession(ctx), 79 }) 80} 81 82func (is *imageSource) getCredentialsFromSession(ctx context.Context) func(string) (string, string, error) { 83 id := session.FromContext(ctx) 84 if id == "" { 85 return nil 86 } 87 return func(host string) (string, string, error) { 88 timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 89 defer cancel() 90 91 caller, err := is.SessionManager.Get(timeoutCtx, id) 92 if err != nil { 93 return "", "", err 94 } 95 96 return auth.CredentialsFunc(tracing.ContextWithSpanFromContext(context.TODO(), ctx), caller)(host) 97 } 98} 99 100func (is *imageSource) resolveLocal(refStr string) ([]byte, error) { 101 ref, err := distreference.ParseNormalizedNamed(refStr) 102 if err != nil { 103 return nil, err 104 } 105 dgst, err := is.ReferenceStore.Get(ref) 106 if err != nil { 107 return nil, err 108 } 109 img, err := is.ImageStore.Get(image.ID(dgst)) 110 if err != nil { 111 return nil, err 112 } 113 return img.RawJSON(), nil 114} 115 116func (is *imageSource) ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) { 117 if preferLocal { 118 dt, err := is.resolveLocal(ref) 119 if err == nil { 120 return "", dt, nil 121 } 122 } 123 124 type t struct { 125 dgst digest.Digest 126 dt []byte 127 } 128 res, err := is.g.Do(ctx, ref, func(ctx context.Context) (interface{}, error) { 129 dgst, dt, err := imageutil.Config(ctx, ref, is.getResolver(ctx), is.ContentStore, "") 130 if err != nil { 131 return nil, err 132 } 133 return &t{dgst: dgst, dt: dt}, nil 134 }) 135 if err != nil { 136 return "", nil, err 137 } 138 typed := res.(*t) 139 return typed.dgst, typed.dt, nil 140} 141 142func (is *imageSource) Resolve(ctx context.Context, id source.Identifier) (source.SourceInstance, error) { 143 imageIdentifier, ok := id.(*source.ImageIdentifier) 144 if !ok { 145 return nil, errors.Errorf("invalid image identifier %v", id) 146 } 147 148 p := &puller{ 149 src: imageIdentifier, 150 is: is, 151 resolver: is.getResolver(ctx), 152 } 153 return p, nil 154} 155 156type puller struct { 157 is *imageSource 158 resolveOnce sync.Once 159 resolveLocalOnce sync.Once 160 src *source.ImageIdentifier 161 desc ocispec.Descriptor 162 ref string 163 resolveErr error 164 resolver remotes.Resolver 165 config []byte 166} 167 168func (p *puller) mainManifestKey(dgst digest.Digest) (digest.Digest, error) { 169 dt, err := json.Marshal(struct { 170 Digest digest.Digest 171 OS string 172 Arch string 173 }{ 174 Digest: p.desc.Digest, 175 OS: runtime.GOOS, 176 Arch: runtime.GOARCH, 177 }) 178 if err != nil { 179 return "", err 180 } 181 return digest.FromBytes(dt), nil 182} 183 184func (p *puller) resolveLocal() { 185 p.resolveLocalOnce.Do(func() { 186 dgst := p.src.Reference.Digest() 187 if dgst != "" { 188 info, err := p.is.ContentStore.Info(context.TODO(), dgst) 189 if err == nil { 190 p.ref = p.src.Reference.String() 191 desc := ocispec.Descriptor{ 192 Size: info.Size, 193 Digest: dgst, 194 } 195 ra, err := p.is.ContentStore.ReaderAt(context.TODO(), desc) 196 if err == nil { 197 mt, err := imageutil.DetectManifestMediaType(ra) 198 if err == nil { 199 desc.MediaType = mt 200 p.desc = desc 201 } 202 } 203 } 204 } 205 206 if preferLocal { 207 dt, err := p.is.resolveLocal(p.src.Reference.String()) 208 if err == nil { 209 p.config = dt 210 } 211 } 212 }) 213} 214 215func (p *puller) resolve(ctx context.Context) error { 216 p.resolveOnce.Do(func() { 217 resolveProgressDone := oneOffProgress(ctx, "resolve "+p.src.Reference.String()) 218 219 ref, err := distreference.ParseNormalizedNamed(p.src.Reference.String()) 220 if err != nil { 221 p.resolveErr = err 222 resolveProgressDone(err) 223 return 224 } 225 226 if p.desc.Digest == "" && p.config == nil { 227 origRef, desc, err := p.resolver.Resolve(ctx, ref.String()) 228 if err != nil { 229 p.resolveErr = err 230 resolveProgressDone(err) 231 return 232 } 233 234 p.desc = desc 235 p.ref = origRef 236 } 237 238 // Schema 1 manifests cannot be resolved to an image config 239 // since the conversion must take place after all the content 240 // has been read. 241 // It may be possible to have a mapping between schema 1 manifests 242 // and the schema 2 manifests they are converted to. 243 if p.config == nil && p.desc.MediaType != images.MediaTypeDockerSchema1Manifest { 244 ref, err := distreference.WithDigest(ref, p.desc.Digest) 245 if err != nil { 246 p.resolveErr = err 247 resolveProgressDone(err) 248 return 249 } 250 251 _, dt, err := p.is.ResolveImageConfig(ctx, ref.String()) 252 if err != nil { 253 p.resolveErr = err 254 resolveProgressDone(err) 255 return 256 } 257 258 p.config = dt 259 } 260 resolveProgressDone(nil) 261 }) 262 return p.resolveErr 263} 264 265func (p *puller) CacheKey(ctx context.Context, index int) (string, bool, error) { 266 p.resolveLocal() 267 268 if p.desc.Digest != "" && index == 0 { 269 dgst, err := p.mainManifestKey(p.desc.Digest) 270 if err != nil { 271 return "", false, err 272 } 273 return dgst.String(), false, nil 274 } 275 276 if p.config != nil { 277 return cacheKeyFromConfig(p.config).String(), true, nil 278 } 279 280 if err := p.resolve(ctx); err != nil { 281 return "", false, err 282 } 283 284 if p.desc.Digest != "" && index == 0 { 285 dgst, err := p.mainManifestKey(p.desc.Digest) 286 if err != nil { 287 return "", false, err 288 } 289 return dgst.String(), false, nil 290 } 291 292 return cacheKeyFromConfig(p.config).String(), true, nil 293} 294 295func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) { 296 p.resolveLocal() 297 if err := p.resolve(ctx); err != nil { 298 return nil, err 299 } 300 301 if p.config != nil { 302 img, err := p.is.ImageStore.Get(image.ID(digest.FromBytes(p.config))) 303 if err == nil { 304 if len(img.RootFS.DiffIDs) == 0 { 305 return nil, nil 306 } 307 ref, err := p.is.CacheAccessor.GetFromSnapshotter(ctx, string(img.RootFS.ChainID()), cache.WithDescription(fmt.Sprintf("from local %s", p.ref))) 308 if err != nil { 309 return nil, err 310 } 311 return ref, nil 312 } 313 } 314 315 ongoing := newJobs(p.ref) 316 317 pctx, stopProgress := context.WithCancel(ctx) 318 319 pw, _, ctx := progress.FromContext(ctx) 320 defer pw.Close() 321 322 progressDone := make(chan struct{}) 323 go func() { 324 showProgress(pctx, ongoing, p.is.ContentStore, pw) 325 close(progressDone) 326 }() 327 defer func() { 328 <-progressDone 329 }() 330 331 fetcher, err := p.resolver.Fetcher(ctx, p.ref) 332 if err != nil { 333 stopProgress() 334 return nil, err 335 } 336 337 var ( 338 schema1Converter *schema1.Converter 339 handlers []images.Handler 340 ) 341 if p.desc.MediaType == images.MediaTypeDockerSchema1Manifest { 342 schema1Converter = schema1.NewConverter(p.is.ContentStore, fetcher) 343 handlers = append(handlers, schema1Converter) 344 345 // TODO: Optimize to do dispatch and integrate pulling with download manager, 346 // leverage existing blob mapping and layer storage 347 } else { 348 349 // TODO: need a wrapper snapshot interface that combines content 350 // and snapshots as 1) buildkit shouldn't have a dependency on contentstore 351 // or 2) cachemanager should manage the contentstore 352 handlers = append(handlers, images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { 353 switch desc.MediaType { 354 case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest, 355 images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex, 356 images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig: 357 default: 358 return nil, images.ErrSkipDesc 359 } 360 ongoing.add(desc) 361 return nil, nil 362 })) 363 364 // Get all the children for a descriptor 365 childrenHandler := images.ChildrenHandler(p.is.ContentStore) 366 // Set any children labels for that content 367 childrenHandler = images.SetChildrenLabels(p.is.ContentStore, childrenHandler) 368 // Filter the childen by the platform 369 childrenHandler = images.FilterPlatforms(childrenHandler, platforms.Default()) 370 371 handlers = append(handlers, 372 remotes.FetchHandler(p.is.ContentStore, fetcher), 373 childrenHandler, 374 ) 375 } 376 377 if err := images.Dispatch(ctx, images.Handlers(handlers...), p.desc); err != nil { 378 stopProgress() 379 return nil, err 380 } 381 defer stopProgress() 382 383 if schema1Converter != nil { 384 p.desc, err = schema1Converter.Convert(ctx) 385 if err != nil { 386 return nil, err 387 } 388 } 389 390 mfst, err := images.Manifest(ctx, p.is.ContentStore, p.desc, platforms.Default()) 391 if err != nil { 392 return nil, err 393 } 394 395 config, err := images.Config(ctx, p.is.ContentStore, p.desc, platforms.Default()) 396 if err != nil { 397 return nil, err 398 } 399 400 dt, err := content.ReadBlob(ctx, p.is.ContentStore, config) 401 if err != nil { 402 return nil, err 403 } 404 405 var img ocispec.Image 406 if err := json.Unmarshal(dt, &img); err != nil { 407 return nil, err 408 } 409 410 if len(mfst.Layers) != len(img.RootFS.DiffIDs) { 411 return nil, errors.Errorf("invalid config for manifest") 412 } 413 414 pchan := make(chan pkgprogress.Progress, 10) 415 defer close(pchan) 416 417 go func() { 418 m := map[string]struct { 419 st time.Time 420 limiter *rate.Limiter 421 }{} 422 for p := range pchan { 423 if p.Action == "Extracting" { 424 st, ok := m[p.ID] 425 if !ok { 426 st.st = time.Now() 427 st.limiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 1) 428 m[p.ID] = st 429 } 430 var end *time.Time 431 if p.LastUpdate || st.limiter.Allow() { 432 if p.LastUpdate { 433 tm := time.Now() 434 end = &tm 435 } 436 pw.Write("extracting "+p.ID, progress.Status{ 437 Action: "extract", 438 Started: &st.st, 439 Completed: end, 440 }) 441 } 442 } 443 } 444 }() 445 446 if len(mfst.Layers) == 0 { 447 return nil, nil 448 } 449 450 layers := make([]xfer.DownloadDescriptor, 0, len(mfst.Layers)) 451 452 for i, desc := range mfst.Layers { 453 ongoing.add(desc) 454 layers = append(layers, &layerDescriptor{ 455 desc: desc, 456 diffID: layer.DiffID(img.RootFS.DiffIDs[i]), 457 fetcher: fetcher, 458 ref: p.src.Reference, 459 is: p.is, 460 }) 461 } 462 463 defer func() { 464 <-progressDone 465 for _, desc := range mfst.Layers { 466 p.is.ContentStore.Delete(context.TODO(), desc.Digest) 467 } 468 }() 469 470 r := image.NewRootFS() 471 rootFS, release, err := p.is.DownloadManager.Download(ctx, *r, runtime.GOOS, layers, pkgprogress.ChanOutput(pchan)) 472 if err != nil { 473 return nil, err 474 } 475 stopProgress() 476 477 ref, err := p.is.CacheAccessor.GetFromSnapshotter(ctx, string(rootFS.ChainID()), cache.WithDescription(fmt.Sprintf("pulled from %s", p.ref))) 478 release() 479 if err != nil { 480 return nil, err 481 } 482 483 return ref, nil 484} 485 486// Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) 487type layerDescriptor struct { 488 is *imageSource 489 fetcher remotes.Fetcher 490 desc ocispec.Descriptor 491 diffID layer.DiffID 492 ref ctdreference.Spec 493} 494 495func (ld *layerDescriptor) Key() string { 496 return "v2:" + ld.desc.Digest.String() 497} 498 499func (ld *layerDescriptor) ID() string { 500 return ld.desc.Digest.String() 501} 502 503func (ld *layerDescriptor) DiffID() (layer.DiffID, error) { 504 return ld.diffID, nil 505} 506 507func (ld *layerDescriptor) Download(ctx context.Context, progressOutput pkgprogress.Output) (io.ReadCloser, int64, error) { 508 rc, err := ld.fetcher.Fetch(ctx, ld.desc) 509 if err != nil { 510 return nil, 0, err 511 } 512 defer rc.Close() 513 514 refKey := remotes.MakeRefKey(ctx, ld.desc) 515 516 ld.is.ContentStore.Abort(ctx, refKey) 517 518 if err := content.WriteBlob(ctx, ld.is.ContentStore, refKey, rc, ld.desc); err != nil { 519 ld.is.ContentStore.Abort(ctx, refKey) 520 return nil, 0, err 521 } 522 523 ra, err := ld.is.ContentStore.ReaderAt(ctx, ld.desc) 524 if err != nil { 525 return nil, 0, err 526 } 527 528 return ioutil.NopCloser(content.NewReader(ra)), ld.desc.Size, nil 529} 530 531func (ld *layerDescriptor) Close() { 532 // ld.is.ContentStore.Delete(context.TODO(), ld.desc.Digest)) 533} 534 535func (ld *layerDescriptor) Registered(diffID layer.DiffID) { 536 // Cache mapping from this layer's DiffID to the blobsum 537 ld.is.MetadataStore.Add(diffID, metadata.V2Metadata{Digest: ld.desc.Digest, SourceRepository: ld.ref.Locator}) 538} 539 540func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, pw progress.Writer) { 541 var ( 542 ticker = time.NewTicker(100 * time.Millisecond) 543 statuses = map[string]statusInfo{} 544 done bool 545 ) 546 defer ticker.Stop() 547 548 for { 549 select { 550 case <-ticker.C: 551 case <-ctx.Done(): 552 done = true 553 } 554 555 resolved := "resolved" 556 if !ongoing.isResolved() { 557 resolved = "resolving" 558 } 559 statuses[ongoing.name] = statusInfo{ 560 Ref: ongoing.name, 561 Status: resolved, 562 } 563 564 actives := make(map[string]statusInfo) 565 566 if !done { 567 active, err := cs.ListStatuses(ctx) 568 if err != nil { 569 // log.G(ctx).WithError(err).Error("active check failed") 570 continue 571 } 572 // update status of active entries! 573 for _, active := range active { 574 actives[active.Ref] = statusInfo{ 575 Ref: active.Ref, 576 Status: "downloading", 577 Offset: active.Offset, 578 Total: active.Total, 579 StartedAt: active.StartedAt, 580 UpdatedAt: active.UpdatedAt, 581 } 582 } 583 } 584 585 // now, update the items in jobs that are not in active 586 for _, j := range ongoing.jobs() { 587 refKey := remotes.MakeRefKey(ctx, j.Descriptor) 588 if a, ok := actives[refKey]; ok { 589 started := j.started 590 pw.Write(j.Digest.String(), progress.Status{ 591 Action: a.Status, 592 Total: int(a.Total), 593 Current: int(a.Offset), 594 Started: &started, 595 }) 596 continue 597 } 598 599 if !j.done { 600 info, err := cs.Info(context.TODO(), j.Digest) 601 if err != nil { 602 if errdefs.IsNotFound(err) { 603 // pw.Write(j.Digest.String(), progress.Status{ 604 // Action: "waiting", 605 // }) 606 continue 607 } 608 } else { 609 j.done = true 610 } 611 612 if done || j.done { 613 started := j.started 614 createdAt := info.CreatedAt 615 pw.Write(j.Digest.String(), progress.Status{ 616 Action: "done", 617 Current: int(info.Size), 618 Total: int(info.Size), 619 Completed: &createdAt, 620 Started: &started, 621 }) 622 } 623 } 624 } 625 if done { 626 return 627 } 628 } 629} 630 631// jobs provides a way of identifying the download keys for a particular task 632// encountering during the pull walk. 633// 634// This is very minimal and will probably be replaced with something more 635// featured. 636type jobs struct { 637 name string 638 added map[digest.Digest]job 639 mu sync.Mutex 640 resolved bool 641} 642 643type job struct { 644 ocispec.Descriptor 645 done bool 646 started time.Time 647} 648 649func newJobs(name string) *jobs { 650 return &jobs{ 651 name: name, 652 added: make(map[digest.Digest]job), 653 } 654} 655 656func (j *jobs) add(desc ocispec.Descriptor) { 657 j.mu.Lock() 658 defer j.mu.Unlock() 659 660 if _, ok := j.added[desc.Digest]; ok { 661 return 662 } 663 j.added[desc.Digest] = job{ 664 Descriptor: desc, 665 started: time.Now(), 666 } 667} 668 669func (j *jobs) jobs() []job { 670 j.mu.Lock() 671 defer j.mu.Unlock() 672 673 descs := make([]job, 0, len(j.added)) 674 for _, j := range j.added { 675 descs = append(descs, j) 676 } 677 return descs 678} 679 680func (j *jobs) isResolved() bool { 681 j.mu.Lock() 682 defer j.mu.Unlock() 683 return j.resolved 684} 685 686type statusInfo struct { 687 Ref string 688 Status string 689 Offset int64 690 Total int64 691 StartedAt time.Time 692 UpdatedAt time.Time 693} 694 695func oneOffProgress(ctx context.Context, id string) func(err error) error { 696 pw, _, _ := progress.FromContext(ctx) 697 now := time.Now() 698 st := progress.Status{ 699 Started: &now, 700 } 701 pw.Write(id, st) 702 return func(err error) error { 703 // TODO: set error on status 704 now := time.Now() 705 st.Completed = &now 706 pw.Write(id, st) 707 pw.Close() 708 return err 709 } 710} 711 712// cacheKeyFromConfig returns a stable digest from image config. If image config 713// is a known oci image we will use chainID of layers. 714func cacheKeyFromConfig(dt []byte) digest.Digest { 715 var img ocispec.Image 716 err := json.Unmarshal(dt, &img) 717 if err != nil { 718 return digest.FromBytes(dt) 719 } 720 if img.RootFS.Type != "layers" { 721 return digest.FromBytes(dt) 722 } 723 return identity.ChainID(img.RootFS.DiffIDs) 724} 725