1package containerimage
2
3import (
4	"context"
5	"encoding/json"
6	"fmt"
7	"io"
8	"io/ioutil"
9	"path"
10	"runtime"
11	"sync"
12	"time"
13
14	"github.com/containerd/containerd/content"
15	containerderrors "github.com/containerd/containerd/errdefs"
16	"github.com/containerd/containerd/gc"
17	"github.com/containerd/containerd/images"
18	"github.com/containerd/containerd/leases"
19	"github.com/containerd/containerd/platforms"
20	ctdreference "github.com/containerd/containerd/reference"
21	"github.com/containerd/containerd/remotes"
22	"github.com/containerd/containerd/remotes/docker"
23	"github.com/containerd/containerd/remotes/docker/schema1"
24	distreference "github.com/docker/distribution/reference"
25	"github.com/docker/docker/distribution"
26	"github.com/docker/docker/distribution/metadata"
27	"github.com/docker/docker/distribution/xfer"
28	"github.com/docker/docker/image"
29	"github.com/docker/docker/layer"
30	pkgprogress "github.com/docker/docker/pkg/progress"
31	"github.com/docker/docker/reference"
32	"github.com/moby/buildkit/cache"
33	"github.com/moby/buildkit/client/llb"
34	"github.com/moby/buildkit/session"
35	"github.com/moby/buildkit/solver"
36	"github.com/moby/buildkit/source"
37	"github.com/moby/buildkit/util/flightcontrol"
38	"github.com/moby/buildkit/util/imageutil"
39	"github.com/moby/buildkit/util/leaseutil"
40	"github.com/moby/buildkit/util/progress"
41	"github.com/moby/buildkit/util/resolver"
42	digest "github.com/opencontainers/go-digest"
43	"github.com/opencontainers/image-spec/identity"
44	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
45	"github.com/pkg/errors"
46	"github.com/sirupsen/logrus"
47	"golang.org/x/time/rate"
48)
49
// SourceOpt is options for creating the image source.
//
// Within this file the fields are used as follows:
//   - ContentStore: read and written for manifest, config and layer blobs.
//   - CacheAccessor: turns layer diff IDs into cache refs (GetByBlob).
//   - ReferenceStore: maps a parsed image reference to a local digest.
//   - DownloadManager: downloads the layers of a pulled image.
//   - MetadataStore: records which digest/repository a registered diff ID came from.
//   - ImageStore: looks up locally known images by ID.
//   - RegistryHosts: handed to the resolver pool when creating remote resolvers.
//   - LayerStore: checked for an already present layer chain before pulling.
//   - LeaseManager: used for the temporary pull lease and for attaching
//     manifest blobs to the resulting ref.
//   - GarbageCollect: optional; when non-nil it is invoked in a goroutine
//     after the pull lease is done.
type SourceOpt struct {
	ContentStore    content.Store
	CacheAccessor   cache.Accessor
	ReferenceStore  reference.Store
	DownloadManager distribution.RootFSDownloadManager
	MetadataStore   metadata.V2MetadataService
	ImageStore      image.Store
	RegistryHosts   docker.RegistryHosts
	LayerStore      layer.Store
	LeaseManager    leases.Manager
	GarbageCollect  func(ctx context.Context) (gc.Stats, error)
}
63
// Source is the source implementation for accessing container images.
//
// It embeds SourceOpt, so all stores and managers are reachable directly on
// the Source. The flightcontrol group g is used by resolveRemote to share one
// in-flight config resolution between callers that ask for the same
// reference/platform key.
type Source struct {
	SourceOpt
	g flightcontrol.Group
}
69
70// NewSource creates a new image source
71func NewSource(opt SourceOpt) (*Source, error) {
72	return &Source{SourceOpt: opt}, nil
73}
74
75// ID returns image scheme identifier
76func (is *Source) ID() string {
77	return source.DockerImageScheme
78}
79
80func (is *Source) resolveLocal(refStr string) (*image.Image, error) {
81	ref, err := distreference.ParseNormalizedNamed(refStr)
82	if err != nil {
83		return nil, err
84	}
85	dgst, err := is.ReferenceStore.Get(ref)
86	if err != nil {
87		return nil, err
88	}
89	img, err := is.ImageStore.Get(image.ID(dgst))
90	if err != nil {
91		return nil, err
92	}
93	return img, nil
94}
95
96func (is *Source) resolveRemote(ctx context.Context, ref string, platform *ocispec.Platform, sm *session.Manager, g session.Group) (digest.Digest, []byte, error) {
97	type t struct {
98		dgst digest.Digest
99		dt   []byte
100	}
101	p := platforms.DefaultSpec()
102	if platform != nil {
103		p = *platform
104	}
105	// key is used to synchronize resolutions that can happen in parallel when doing multi-stage.
106	key := "getconfig::" + ref + "::" + platforms.Format(p)
107	res, err := is.g.Do(ctx, key, func(ctx context.Context) (interface{}, error) {
108		res := resolver.DefaultPool.GetResolver(is.RegistryHosts, ref, "pull", sm, g)
109		dgst, dt, err := imageutil.Config(ctx, ref, res, is.ContentStore, is.LeaseManager, platform)
110		if err != nil {
111			return nil, err
112		}
113		return &t{dgst: dgst, dt: dt}, nil
114	})
115	var typed *t
116	if err != nil {
117		return "", nil, err
118	}
119	typed = res.(*t)
120	return typed.dgst, typed.dt, nil
121}
122
123// ResolveImageConfig returns image config for an image
124func (is *Source) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager, g session.Group) (digest.Digest, []byte, error) {
125	resolveMode, err := source.ParseImageResolveMode(opt.ResolveMode)
126	if err != nil {
127		return "", nil, err
128	}
129	switch resolveMode {
130	case source.ResolveModeForcePull:
131		dgst, dt, err := is.resolveRemote(ctx, ref, opt.Platform, sm, g)
132		// TODO: pull should fallback to local in case of failure to allow offline behavior
133		// the fallback doesn't work currently
134		return dgst, dt, err
135		/*
136			if err == nil {
137				return dgst, dt, err
138			}
139			// fallback to local
140			dt, err = is.resolveLocal(ref)
141			return "", dt, err
142		*/
143
144	case source.ResolveModeDefault:
145		// default == prefer local, but in the future could be smarter
146		fallthrough
147	case source.ResolveModePreferLocal:
148		img, err := is.resolveLocal(ref)
149		if err == nil {
150			if opt.Platform != nil && !platformMatches(img, opt.Platform) {
151				logrus.WithField("ref", ref).Debugf("Requested build platform %s does not match local image platform %s, checking remote",
152					path.Join(opt.Platform.OS, opt.Platform.Architecture, opt.Platform.Variant),
153					path.Join(img.OS, img.Architecture, img.Variant),
154				)
155			} else {
156				return "", img.RawJSON(), err
157			}
158		}
159		// fallback to remote
160		return is.resolveRemote(ctx, ref, opt.Platform, sm, g)
161	}
162	// should never happen
163	return "", nil, fmt.Errorf("builder cannot resolve image %s: invalid mode %q", ref, opt.ResolveMode)
164}
165
166// Resolve returns access to pulling for an identifier
167func (is *Source) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager, vtx solver.Vertex) (source.SourceInstance, error) {
168	imageIdentifier, ok := id.(*source.ImageIdentifier)
169	if !ok {
170		return nil, errors.Errorf("invalid image identifier %v", id)
171	}
172
173	platform := platforms.DefaultSpec()
174	if imageIdentifier.Platform != nil {
175		platform = *imageIdentifier.Platform
176	}
177
178	p := &puller{
179		src: imageIdentifier,
180		is:  is,
181		//resolver: is.getResolver(is.RegistryHosts, imageIdentifier.Reference.String(), sm, g),
182		platform: platform,
183		sm:       sm,
184	}
185	return p, nil
186}
187
// puller is the per-identifier state created by Source.Resolve.
//
// resolveLocalOnce makes sure the local lookup in resolveLocal runs a single
// time, and g is the flightcontrol group used by resolve. desc, ref and
// config are filled in lazily by resolveLocal and resolve: desc is the
// manifest descriptor, ref the reference string it was resolved from and
// config the raw image config JSON. platform is the target platform and sm
// the session manager passed on to the registry resolver.
type puller struct {
	is               *Source
	resolveLocalOnce sync.Once
	g                flightcontrol.Group
	src              *source.ImageIdentifier
	desc             ocispec.Descriptor
	ref              string
	config           []byte
	platform         ocispec.Platform
	sm               *session.Manager
}
199
200func (p *puller) resolver(g session.Group) remotes.Resolver {
201	return resolver.DefaultPool.GetResolver(p.is.RegistryHosts, p.src.Reference.String(), "pull", p.sm, g)
202}
203
204func (p *puller) mainManifestKey(platform ocispec.Platform) (digest.Digest, error) {
205	dt, err := json.Marshal(struct {
206		Digest  digest.Digest
207		OS      string
208		Arch    string
209		Variant string `json:",omitempty"`
210	}{
211		Digest:  p.desc.Digest,
212		OS:      platform.OS,
213		Arch:    platform.Architecture,
214		Variant: platform.Variant,
215	})
216	if err != nil {
217		return "", err
218	}
219	return digest.FromBytes(dt), nil
220}
221
222func (p *puller) resolveLocal() {
223	p.resolveLocalOnce.Do(func() {
224		dgst := p.src.Reference.Digest()
225		if dgst != "" {
226			info, err := p.is.ContentStore.Info(context.TODO(), dgst)
227			if err == nil {
228				p.ref = p.src.Reference.String()
229				desc := ocispec.Descriptor{
230					Size:   info.Size,
231					Digest: dgst,
232				}
233				ra, err := p.is.ContentStore.ReaderAt(context.TODO(), desc)
234				if err == nil {
235					mt, err := imageutil.DetectManifestMediaType(ra)
236					if err == nil {
237						desc.MediaType = mt
238						p.desc = desc
239					}
240				}
241			}
242		}
243
244		if p.src.ResolveMode == source.ResolveModeDefault || p.src.ResolveMode == source.ResolveModePreferLocal {
245			ref := p.src.Reference.String()
246			img, err := p.is.resolveLocal(ref)
247			if err == nil {
248				if !platformMatches(img, &p.platform) {
249					logrus.WithField("ref", ref).Debugf("Requested build platform %s does not match local image platform %s, not resolving",
250						path.Join(p.platform.OS, p.platform.Architecture, p.platform.Variant),
251						path.Join(img.OS, img.Architecture, img.Variant),
252					)
253				} else {
254					p.config = img.RawJSON()
255				}
256			}
257		}
258	})
259}
260
261func (p *puller) resolve(ctx context.Context, g session.Group) error {
262	_, err := p.g.Do(ctx, "", func(ctx context.Context) (_ interface{}, err error) {
263		resolveProgressDone := oneOffProgress(ctx, "resolve "+p.src.Reference.String())
264		defer func() {
265			resolveProgressDone(err)
266		}()
267
268		ref, err := distreference.ParseNormalizedNamed(p.src.Reference.String())
269		if err != nil {
270			return nil, err
271		}
272
273		if p.desc.Digest == "" && p.config == nil {
274			origRef, desc, err := p.resolver(g).Resolve(ctx, ref.String())
275			if err != nil {
276				return nil, err
277			}
278
279			p.desc = desc
280			p.ref = origRef
281		}
282
283		// Schema 1 manifests cannot be resolved to an image config
284		// since the conversion must take place after all the content
285		// has been read.
286		// It may be possible to have a mapping between schema 1 manifests
287		// and the schema 2 manifests they are converted to.
288		if p.config == nil && p.desc.MediaType != images.MediaTypeDockerSchema1Manifest {
289			ref, err := distreference.WithDigest(ref, p.desc.Digest)
290			if err != nil {
291				return nil, err
292			}
293			_, dt, err := p.is.ResolveImageConfig(ctx, ref.String(), llb.ResolveImageConfigOpt{Platform: &p.platform, ResolveMode: resolveModeToString(p.src.ResolveMode)}, p.sm, g)
294			if err != nil {
295				return nil, err
296			}
297
298			p.config = dt
299		}
300		return nil, nil
301	})
302	return err
303}
304
305func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (string, solver.CacheOpts, bool, error) {
306	p.resolveLocal()
307
308	if p.desc.Digest != "" && index == 0 {
309		dgst, err := p.mainManifestKey(p.platform)
310		if err != nil {
311			return "", nil, false, err
312		}
313		return dgst.String(), nil, false, nil
314	}
315
316	if p.config != nil {
317		k := cacheKeyFromConfig(p.config).String()
318		if k == "" {
319			return digest.FromBytes(p.config).String(), nil, true, nil
320		}
321		return k, nil, true, nil
322	}
323
324	if err := p.resolve(ctx, g); err != nil {
325		return "", nil, false, err
326	}
327
328	if p.desc.Digest != "" && index == 0 {
329		dgst, err := p.mainManifestKey(p.platform)
330		if err != nil {
331			return "", nil, false, err
332		}
333		return dgst.String(), nil, false, nil
334	}
335
336	if len(p.config) == 0 && p.desc.MediaType != images.MediaTypeDockerSchema1Manifest {
337		return "", nil, false, errors.Errorf("invalid empty config file resolved for %s", p.src.Reference.String())
338	}
339
340	k := cacheKeyFromConfig(p.config).String()
341	if k == "" || p.desc.MediaType == images.MediaTypeDockerSchema1Manifest {
342		dgst, err := p.mainManifestKey(p.platform)
343		if err != nil {
344			return "", nil, false, err
345		}
346		return dgst.String(), nil, true, nil
347	}
348
349	return k, nil, true, nil
350}
351
352func (p *puller) getRef(ctx context.Context, diffIDs []layer.DiffID, opts ...cache.RefOption) (cache.ImmutableRef, error) {
353	var parent cache.ImmutableRef
354	if len(diffIDs) > 1 {
355		var err error
356		parent, err = p.getRef(ctx, diffIDs[:len(diffIDs)-1], opts...)
357		if err != nil {
358			return nil, err
359		}
360		defer parent.Release(context.TODO())
361	}
362	return p.is.CacheAccessor.GetByBlob(ctx, ocispec.Descriptor{
363		Annotations: map[string]string{
364			"containerd.io/uncompressed": diffIDs[len(diffIDs)-1].String(),
365		},
366	}, parent, opts...)
367}
368
369func (p *puller) Snapshot(ctx context.Context, g session.Group) (cache.ImmutableRef, error) {
370	p.resolveLocal()
371	if len(p.config) == 0 {
372		if err := p.resolve(ctx, g); err != nil {
373			return nil, err
374		}
375	}
376
377	if p.config != nil {
378		img, err := p.is.ImageStore.Get(image.ID(digest.FromBytes(p.config)))
379		if err == nil {
380			if len(img.RootFS.DiffIDs) == 0 {
381				return nil, nil
382			}
383			l, err := p.is.LayerStore.Get(img.RootFS.ChainID())
384			if err == nil {
385				layer.ReleaseAndLog(p.is.LayerStore, l)
386				ref, err := p.getRef(ctx, img.RootFS.DiffIDs, cache.WithDescription(fmt.Sprintf("from local %s", p.ref)))
387				if err != nil {
388					return nil, err
389				}
390				return ref, nil
391			}
392		}
393	}
394
395	ongoing := newJobs(p.ref)
396
397	ctx, done, err := leaseutil.WithLease(ctx, p.is.LeaseManager, leases.WithExpiration(5*time.Minute), leaseutil.MakeTemporary)
398	if err != nil {
399		return nil, err
400	}
401	defer func() {
402		done(context.TODO())
403		if p.is.GarbageCollect != nil {
404			go p.is.GarbageCollect(context.TODO())
405		}
406	}()
407
408	pctx, stopProgress := context.WithCancel(ctx)
409
410	pw, _, ctx := progress.FromContext(ctx)
411	defer pw.Close()
412
413	progressDone := make(chan struct{})
414	go func() {
415		showProgress(pctx, ongoing, p.is.ContentStore, pw)
416		close(progressDone)
417	}()
418	defer func() {
419		<-progressDone
420	}()
421
422	fetcher, err := p.resolver(g).Fetcher(ctx, p.ref)
423	if err != nil {
424		stopProgress()
425		return nil, err
426	}
427
428	platform := platforms.Only(p.platform)
429
430	var nonLayers []digest.Digest
431
432	var (
433		schema1Converter *schema1.Converter
434		handlers         []images.Handler
435	)
436	if p.desc.MediaType == images.MediaTypeDockerSchema1Manifest {
437		schema1Converter = schema1.NewConverter(p.is.ContentStore, fetcher)
438		handlers = append(handlers, schema1Converter)
439
440		// TODO: Optimize to do dispatch and integrate pulling with download manager,
441		// leverage existing blob mapping and layer storage
442	} else {
443
444		// TODO: need a wrapper snapshot interface that combines content
445		// and snapshots as 1) buildkit shouldn't have a dependency on contentstore
446		// or 2) cachemanager should manage the contentstore
447		handlers = append(handlers, images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
448			switch desc.MediaType {
449			case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest,
450				images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex,
451				images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig:
452				nonLayers = append(nonLayers, desc.Digest)
453			default:
454				return nil, images.ErrSkipDesc
455			}
456			ongoing.add(desc)
457			return nil, nil
458		}))
459
460		// Get all the children for a descriptor
461		childrenHandler := images.ChildrenHandler(p.is.ContentStore)
462		// Filter the children by the platform
463		childrenHandler = images.FilterPlatforms(childrenHandler, platform)
464		// Limit manifests pulled to the best match in an index
465		childrenHandler = images.LimitManifests(childrenHandler, platform, 1)
466
467		handlers = append(handlers,
468			remotes.FetchHandler(p.is.ContentStore, fetcher),
469			childrenHandler,
470		)
471	}
472
473	if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, p.desc); err != nil {
474		stopProgress()
475		return nil, err
476	}
477	defer stopProgress()
478
479	if schema1Converter != nil {
480		p.desc, err = schema1Converter.Convert(ctx)
481		if err != nil {
482			return nil, err
483		}
484	}
485
486	mfst, err := images.Manifest(ctx, p.is.ContentStore, p.desc, platform)
487	if err != nil {
488		return nil, err
489	}
490
491	config, err := images.Config(ctx, p.is.ContentStore, p.desc, platform)
492	if err != nil {
493		return nil, err
494	}
495
496	dt, err := content.ReadBlob(ctx, p.is.ContentStore, config)
497	if err != nil {
498		return nil, err
499	}
500
501	var img ocispec.Image
502	if err := json.Unmarshal(dt, &img); err != nil {
503		return nil, err
504	}
505
506	if len(mfst.Layers) != len(img.RootFS.DiffIDs) {
507		return nil, errors.Errorf("invalid config for manifest")
508	}
509
510	pchan := make(chan pkgprogress.Progress, 10)
511	defer close(pchan)
512
513	go func() {
514		m := map[string]struct {
515			st      time.Time
516			limiter *rate.Limiter
517		}{}
518		for p := range pchan {
519			if p.Action == "Extracting" {
520				st, ok := m[p.ID]
521				if !ok {
522					st.st = time.Now()
523					st.limiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
524					m[p.ID] = st
525				}
526				var end *time.Time
527				if p.LastUpdate || st.limiter.Allow() {
528					if p.LastUpdate {
529						tm := time.Now()
530						end = &tm
531					}
532					_ = pw.Write("extracting "+p.ID, progress.Status{
533						Action:    "extract",
534						Started:   &st.st,
535						Completed: end,
536					})
537				}
538			}
539		}
540	}()
541
542	if len(mfst.Layers) == 0 {
543		return nil, nil
544	}
545
546	layers := make([]xfer.DownloadDescriptor, 0, len(mfst.Layers))
547
548	for i, desc := range mfst.Layers {
549		if err := desc.Digest.Validate(); err != nil {
550			return nil, errors.Wrap(err, "layer digest could not be validated")
551		}
552		ongoing.add(desc)
553		layers = append(layers, &layerDescriptor{
554			desc:    desc,
555			diffID:  layer.DiffID(img.RootFS.DiffIDs[i]),
556			fetcher: fetcher,
557			ref:     p.src.Reference,
558			is:      p.is,
559		})
560	}
561
562	defer func() {
563		<-progressDone
564	}()
565
566	r := image.NewRootFS()
567	rootFS, release, err := p.is.DownloadManager.Download(ctx, *r, runtime.GOOS, layers, pkgprogress.ChanOutput(pchan))
568	stopProgress()
569	if err != nil {
570		return nil, err
571	}
572
573	ref, err := p.getRef(ctx, rootFS.DiffIDs, cache.WithDescription(fmt.Sprintf("pulled from %s", p.ref)))
574	release()
575	if err != nil {
576		return nil, err
577	}
578
579	// keep manifest blobs until ref is alive for cache
580	for _, nl := range nonLayers {
581		if err := p.is.LeaseManager.AddResource(ctx, leases.Lease{ID: ref.ID()}, leases.Resource{
582			ID:   nl.String(),
583			Type: "content",
584		}); err != nil {
585			return nil, err
586		}
587	}
588
589	// TODO: handle windows layers for cross platform builds
590
591	if p.src.RecordType != "" && cache.GetRecordType(ref) == "" {
592		if err := cache.SetRecordType(ref, p.src.RecordType); err != nil {
593			ref.Release(context.TODO())
594			return nil, err
595		}
596	}
597
598	return ref, nil
599}
600
// layerDescriptor describes a single image layer to download. Snapshot puts
// it into a slice of xfer.DownloadDescriptor for the download manager.
//
// fetcher is used to fetch the blob described by desc
// (Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error)),
// diffID is the layer's diff ID taken from the image config, ref is the image
// reference the layer belongs to and is gives access to the content and
// metadata stores.
type layerDescriptor struct {
	is      *Source
	fetcher remotes.Fetcher
	desc    ocispec.Descriptor
	diffID  layer.DiffID
	ref     ctdreference.Spec
}
609
610func (ld *layerDescriptor) Key() string {
611	return "v2:" + ld.desc.Digest.String()
612}
613
614func (ld *layerDescriptor) ID() string {
615	return ld.desc.Digest.String()
616}
617
618func (ld *layerDescriptor) DiffID() (layer.DiffID, error) {
619	return ld.diffID, nil
620}
621
622func (ld *layerDescriptor) Download(ctx context.Context, progressOutput pkgprogress.Output) (io.ReadCloser, int64, error) {
623	rc, err := ld.fetcher.Fetch(ctx, ld.desc)
624	if err != nil {
625		return nil, 0, err
626	}
627	defer rc.Close()
628
629	refKey := remotes.MakeRefKey(ctx, ld.desc)
630
631	ld.is.ContentStore.Abort(ctx, refKey)
632
633	if err := content.WriteBlob(ctx, ld.is.ContentStore, refKey, rc, ld.desc); err != nil {
634		ld.is.ContentStore.Abort(ctx, refKey)
635		return nil, 0, err
636	}
637
638	ra, err := ld.is.ContentStore.ReaderAt(ctx, ld.desc)
639	if err != nil {
640		return nil, 0, err
641	}
642
643	return ioutil.NopCloser(content.NewReader(ra)), ld.desc.Size, nil
644}
645
// Close is intentionally a no-op: the downloaded blob is left in the content
// store. The deletion call below is kept commented out for reference.
func (ld *layerDescriptor) Close() {
	// ld.is.ContentStore.Delete(context.TODO(), ld.desc.Digest))
}
649
650func (ld *layerDescriptor) Registered(diffID layer.DiffID) {
651	// Cache mapping from this layer's DiffID to the blobsum
652	ld.is.MetadataStore.Add(diffID, metadata.V2Metadata{Digest: ld.desc.Digest, SourceRepository: ld.ref.Locator})
653}
654
655func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, pw progress.Writer) {
656	var (
657		ticker   = time.NewTicker(100 * time.Millisecond)
658		statuses = map[string]statusInfo{}
659		done     bool
660	)
661	defer ticker.Stop()
662
663	for {
664		select {
665		case <-ticker.C:
666		case <-ctx.Done():
667			done = true
668		}
669
670		resolved := "resolved"
671		if !ongoing.isResolved() {
672			resolved = "resolving"
673		}
674		statuses[ongoing.name] = statusInfo{
675			Ref:    ongoing.name,
676			Status: resolved,
677		}
678
679		actives := make(map[string]statusInfo)
680
681		if !done {
682			active, err := cs.ListStatuses(ctx)
683			if err != nil {
684				// log.G(ctx).WithError(err).Error("active check failed")
685				continue
686			}
687			// update status of active entries!
688			for _, active := range active {
689				actives[active.Ref] = statusInfo{
690					Ref:       active.Ref,
691					Status:    "downloading",
692					Offset:    active.Offset,
693					Total:     active.Total,
694					StartedAt: active.StartedAt,
695					UpdatedAt: active.UpdatedAt,
696				}
697			}
698		}
699
700		// now, update the items in jobs that are not in active
701		for _, j := range ongoing.jobs() {
702			refKey := remotes.MakeRefKey(ctx, j.Descriptor)
703			if a, ok := actives[refKey]; ok {
704				started := j.started
705				_ = pw.Write(j.Digest.String(), progress.Status{
706					Action:  a.Status,
707					Total:   int(a.Total),
708					Current: int(a.Offset),
709					Started: &started,
710				})
711				continue
712			}
713
714			if !j.done {
715				info, err := cs.Info(context.TODO(), j.Digest)
716				if err != nil {
717					if containerderrors.IsNotFound(err) {
718						// _ = pw.Write(j.Digest.String(), progress.Status{
719						// 	Action: "waiting",
720						// })
721						continue
722					}
723				} else {
724					j.done = true
725				}
726
727				if done || j.done {
728					started := j.started
729					createdAt := info.CreatedAt
730					_ = pw.Write(j.Digest.String(), progress.Status{
731						Action:    "done",
732						Current:   int(info.Size),
733						Total:     int(info.Size),
734						Completed: &createdAt,
735						Started:   &started,
736					})
737				}
738			}
739		}
740		if done {
741			return
742		}
743	}
744}
745
// jobs provides a way of identifying the download keys for a particular task
// encountered during the pull walk.
//
// This is very minimal and will probably be replaced with something more
// featured.
//
// name is the reference the jobs belong to, added holds one job per
// descriptor digest, and mu guards added and resolved in the methods below.
type jobs struct {
	name     string
	added    map[digest.Digest]*job
	mu       sync.Mutex
	resolved bool
}
757
// job is a single tracked descriptor: done is set by showProgress once the
// blob's info could be read from the content store, and started is the time
// the job was added.
type job struct {
	ocispec.Descriptor
	done    bool
	started time.Time
}
763
764func newJobs(name string) *jobs {
765	return &jobs{
766		name:  name,
767		added: make(map[digest.Digest]*job),
768	}
769}
770
771func (j *jobs) add(desc ocispec.Descriptor) {
772	j.mu.Lock()
773	defer j.mu.Unlock()
774
775	if _, ok := j.added[desc.Digest]; ok {
776		return
777	}
778	j.added[desc.Digest] = &job{
779		Descriptor: desc,
780		started:    time.Now(),
781	}
782}
783
784func (j *jobs) jobs() []*job {
785	j.mu.Lock()
786	defer j.mu.Unlock()
787
788	descs := make([]*job, 0, len(j.added))
789	for _, j := range j.added {
790		descs = append(descs, j)
791	}
792	return descs
793}
794
795func (j *jobs) isResolved() bool {
796	j.mu.Lock()
797	defer j.mu.Unlock()
798	return j.resolved
799}
800
// statusInfo is the progress snapshot showProgress builds for a reference or
// an active ingest: the ref it belongs to, a status string (for example
// "resolving", "resolved" or "downloading"), the transfer offset and total,
// and the start/update timestamps copied from the content store status.
type statusInfo struct {
	Ref       string
	Status    string
	Offset    int64
	Total     int64
	StartedAt time.Time
	UpdatedAt time.Time
}
809
810func oneOffProgress(ctx context.Context, id string) func(err error) error {
811	pw, _, _ := progress.FromContext(ctx)
812	now := time.Now()
813	st := progress.Status{
814		Started: &now,
815	}
816	_ = pw.Write(id, st)
817	return func(err error) error {
818		// TODO: set error on status
819		now := time.Now()
820		st.Completed = &now
821		_ = pw.Write(id, st)
822		_ = pw.Close()
823		return err
824	}
825}
826
827// cacheKeyFromConfig returns a stable digest from image config. If image config
828// is a known oci image we will use chainID of layers.
829func cacheKeyFromConfig(dt []byte) digest.Digest {
830	var img ocispec.Image
831	err := json.Unmarshal(dt, &img)
832	if err != nil {
833		logrus.WithError(err).Errorf("failed to unmarshal image config for cache key %v", err)
834		return digest.FromBytes(dt)
835	}
836	if img.RootFS.Type != "layers" || len(img.RootFS.DiffIDs) == 0 {
837		return ""
838	}
839	return identity.ChainID(img.RootFS.DiffIDs)
840}
841
842// resolveModeToString is the equivalent of github.com/moby/buildkit/solver/llb.ResolveMode.String()
843// FIXME: add String method on source.ResolveMode
844func resolveModeToString(rm source.ResolveMode) string {
845	switch rm {
846	case source.ResolveModeDefault:
847		return "default"
848	case source.ResolveModeForcePull:
849		return "pull"
850	case source.ResolveModePreferLocal:
851		return "local"
852	}
853	return ""
854}
855
856func platformMatches(img *image.Image, p *ocispec.Platform) bool {
857	if img.Architecture != p.Architecture {
858		return false
859	}
860	if img.Variant != "" && img.Variant != p.Variant {
861		return false
862	}
863	return img.OS == p.OS
864}
865