1package containerimage
2
3import (
4	"context"
5	"encoding/json"
6	"fmt"
7	"io"
8	"io/ioutil"
9	"runtime"
10	"sync"
11	"time"
12
13	"github.com/containerd/containerd/content"
14	"github.com/containerd/containerd/errdefs"
15	"github.com/containerd/containerd/images"
16	"github.com/containerd/containerd/platforms"
17	ctdreference "github.com/containerd/containerd/reference"
18	"github.com/containerd/containerd/remotes"
19	"github.com/containerd/containerd/remotes/docker"
20	"github.com/containerd/containerd/remotes/docker/schema1"
21	distreference "github.com/docker/distribution/reference"
22	"github.com/docker/docker/distribution"
23	"github.com/docker/docker/distribution/metadata"
24	"github.com/docker/docker/distribution/xfer"
25	"github.com/docker/docker/image"
26	"github.com/docker/docker/layer"
27	pkgprogress "github.com/docker/docker/pkg/progress"
28	"github.com/docker/docker/reference"
29	"github.com/moby/buildkit/cache"
30	"github.com/moby/buildkit/session"
31	"github.com/moby/buildkit/session/auth"
32	"github.com/moby/buildkit/source"
33	"github.com/moby/buildkit/util/flightcontrol"
34	"github.com/moby/buildkit/util/imageutil"
35	"github.com/moby/buildkit/util/progress"
36	"github.com/moby/buildkit/util/tracing"
37	digest "github.com/opencontainers/go-digest"
38	"github.com/opencontainers/image-spec/identity"
39	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
40	"github.com/pkg/errors"
41	"golang.org/x/time/rate"
42)
43
// preferLocal makes image resolution consult the local reference and image
// stores before contacting a registry (see ResolveImageConfig and
// puller.resolveLocal).
//
// FIXME: make this optional from the op
const preferLocal = true
45
46// SourceOpt is options for creating the image source
47type SourceOpt struct {
48	SessionManager  *session.Manager
49	ContentStore    content.Store
50	CacheAccessor   cache.Accessor
51	ReferenceStore  reference.Store
52	DownloadManager distribution.RootFSDownloadManager
53	MetadataStore   metadata.V2MetadataService
54	ImageStore      image.Store
55}
56
57type imageSource struct {
58	SourceOpt
59	g flightcontrol.Group
60}
61
62// NewSource creates a new image source
63func NewSource(opt SourceOpt) (source.Source, error) {
64	is := &imageSource{
65		SourceOpt: opt,
66	}
67
68	return is, nil
69}
70
71func (is *imageSource) ID() string {
72	return source.DockerImageScheme
73}
74
75func (is *imageSource) getResolver(ctx context.Context) remotes.Resolver {
76	return docker.NewResolver(docker.ResolverOptions{
77		Client:      tracing.DefaultClient,
78		Credentials: is.getCredentialsFromSession(ctx),
79	})
80}
81
82func (is *imageSource) getCredentialsFromSession(ctx context.Context) func(string) (string, string, error) {
83	id := session.FromContext(ctx)
84	if id == "" {
85		return nil
86	}
87	return func(host string) (string, string, error) {
88		timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
89		defer cancel()
90
91		caller, err := is.SessionManager.Get(timeoutCtx, id)
92		if err != nil {
93			return "", "", err
94		}
95
96		return auth.CredentialsFunc(tracing.ContextWithSpanFromContext(context.TODO(), ctx), caller)(host)
97	}
98}
99
100func (is *imageSource) resolveLocal(refStr string) ([]byte, error) {
101	ref, err := distreference.ParseNormalizedNamed(refStr)
102	if err != nil {
103		return nil, err
104	}
105	dgst, err := is.ReferenceStore.Get(ref)
106	if err != nil {
107		return nil, err
108	}
109	img, err := is.ImageStore.Get(image.ID(dgst))
110	if err != nil {
111		return nil, err
112	}
113	return img.RawJSON(), nil
114}
115
116func (is *imageSource) ResolveImageConfig(ctx context.Context, ref string) (digest.Digest, []byte, error) {
117	if preferLocal {
118		dt, err := is.resolveLocal(ref)
119		if err == nil {
120			return "", dt, nil
121		}
122	}
123
124	type t struct {
125		dgst digest.Digest
126		dt   []byte
127	}
128	res, err := is.g.Do(ctx, ref, func(ctx context.Context) (interface{}, error) {
129		dgst, dt, err := imageutil.Config(ctx, ref, is.getResolver(ctx), is.ContentStore, "")
130		if err != nil {
131			return nil, err
132		}
133		return &t{dgst: dgst, dt: dt}, nil
134	})
135	if err != nil {
136		return "", nil, err
137	}
138	typed := res.(*t)
139	return typed.dgst, typed.dt, nil
140}
141
142func (is *imageSource) Resolve(ctx context.Context, id source.Identifier) (source.SourceInstance, error) {
143	imageIdentifier, ok := id.(*source.ImageIdentifier)
144	if !ok {
145		return nil, errors.Errorf("invalid image identifier %v", id)
146	}
147
148	p := &puller{
149		src:      imageIdentifier,
150		is:       is,
151		resolver: is.getResolver(ctx),
152	}
153	return p, nil
154}
155
156type puller struct {
157	is               *imageSource
158	resolveOnce      sync.Once
159	resolveLocalOnce sync.Once
160	src              *source.ImageIdentifier
161	desc             ocispec.Descriptor
162	ref              string
163	resolveErr       error
164	resolver         remotes.Resolver
165	config           []byte
166}
167
168func (p *puller) mainManifestKey(dgst digest.Digest) (digest.Digest, error) {
169	dt, err := json.Marshal(struct {
170		Digest digest.Digest
171		OS     string
172		Arch   string
173	}{
174		Digest: p.desc.Digest,
175		OS:     runtime.GOOS,
176		Arch:   runtime.GOARCH,
177	})
178	if err != nil {
179		return "", err
180	}
181	return digest.FromBytes(dt), nil
182}
183
184func (p *puller) resolveLocal() {
185	p.resolveLocalOnce.Do(func() {
186		dgst := p.src.Reference.Digest()
187		if dgst != "" {
188			info, err := p.is.ContentStore.Info(context.TODO(), dgst)
189			if err == nil {
190				p.ref = p.src.Reference.String()
191				desc := ocispec.Descriptor{
192					Size:   info.Size,
193					Digest: dgst,
194				}
195				ra, err := p.is.ContentStore.ReaderAt(context.TODO(), desc)
196				if err == nil {
197					mt, err := imageutil.DetectManifestMediaType(ra)
198					if err == nil {
199						desc.MediaType = mt
200						p.desc = desc
201					}
202				}
203			}
204		}
205
206		if preferLocal {
207			dt, err := p.is.resolveLocal(p.src.Reference.String())
208			if err == nil {
209				p.config = dt
210			}
211		}
212	})
213}
214
215func (p *puller) resolve(ctx context.Context) error {
216	p.resolveOnce.Do(func() {
217		resolveProgressDone := oneOffProgress(ctx, "resolve "+p.src.Reference.String())
218
219		ref, err := distreference.ParseNormalizedNamed(p.src.Reference.String())
220		if err != nil {
221			p.resolveErr = err
222			resolveProgressDone(err)
223			return
224		}
225
226		if p.desc.Digest == "" && p.config == nil {
227			origRef, desc, err := p.resolver.Resolve(ctx, ref.String())
228			if err != nil {
229				p.resolveErr = err
230				resolveProgressDone(err)
231				return
232			}
233
234			p.desc = desc
235			p.ref = origRef
236		}
237
238		// Schema 1 manifests cannot be resolved to an image config
239		// since the conversion must take place after all the content
240		// has been read.
241		// It may be possible to have a mapping between schema 1 manifests
242		// and the schema 2 manifests they are converted to.
243		if p.config == nil && p.desc.MediaType != images.MediaTypeDockerSchema1Manifest {
244			ref, err := distreference.WithDigest(ref, p.desc.Digest)
245			if err != nil {
246				p.resolveErr = err
247				resolveProgressDone(err)
248				return
249			}
250
251			_, dt, err := p.is.ResolveImageConfig(ctx, ref.String())
252			if err != nil {
253				p.resolveErr = err
254				resolveProgressDone(err)
255				return
256			}
257
258			p.config = dt
259		}
260		resolveProgressDone(nil)
261	})
262	return p.resolveErr
263}
264
265func (p *puller) CacheKey(ctx context.Context, index int) (string, bool, error) {
266	p.resolveLocal()
267
268	if p.desc.Digest != "" && index == 0 {
269		dgst, err := p.mainManifestKey(p.desc.Digest)
270		if err != nil {
271			return "", false, err
272		}
273		return dgst.String(), false, nil
274	}
275
276	if p.config != nil {
277		return cacheKeyFromConfig(p.config).String(), true, nil
278	}
279
280	if err := p.resolve(ctx); err != nil {
281		return "", false, err
282	}
283
284	if p.desc.Digest != "" && index == 0 {
285		dgst, err := p.mainManifestKey(p.desc.Digest)
286		if err != nil {
287			return "", false, err
288		}
289		return dgst.String(), false, nil
290	}
291
292	return cacheKeyFromConfig(p.config).String(), true, nil
293}
294
295func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {
296	p.resolveLocal()
297	if err := p.resolve(ctx); err != nil {
298		return nil, err
299	}
300
301	if p.config != nil {
302		img, err := p.is.ImageStore.Get(image.ID(digest.FromBytes(p.config)))
303		if err == nil {
304			if len(img.RootFS.DiffIDs) == 0 {
305				return nil, nil
306			}
307			ref, err := p.is.CacheAccessor.GetFromSnapshotter(ctx, string(img.RootFS.ChainID()), cache.WithDescription(fmt.Sprintf("from local %s", p.ref)))
308			if err != nil {
309				return nil, err
310			}
311			return ref, nil
312		}
313	}
314
315	ongoing := newJobs(p.ref)
316
317	pctx, stopProgress := context.WithCancel(ctx)
318
319	pw, _, ctx := progress.FromContext(ctx)
320	defer pw.Close()
321
322	progressDone := make(chan struct{})
323	go func() {
324		showProgress(pctx, ongoing, p.is.ContentStore, pw)
325		close(progressDone)
326	}()
327	defer func() {
328		<-progressDone
329	}()
330
331	fetcher, err := p.resolver.Fetcher(ctx, p.ref)
332	if err != nil {
333		stopProgress()
334		return nil, err
335	}
336
337	var (
338		schema1Converter *schema1.Converter
339		handlers         []images.Handler
340	)
341	if p.desc.MediaType == images.MediaTypeDockerSchema1Manifest {
342		schema1Converter = schema1.NewConverter(p.is.ContentStore, fetcher)
343		handlers = append(handlers, schema1Converter)
344
345		// TODO: Optimize to do dispatch and integrate pulling with download manager,
346		// leverage existing blob mapping and layer storage
347	} else {
348
349		// TODO: need a wrapper snapshot interface that combines content
350		// and snapshots as 1) buildkit shouldn't have a dependency on contentstore
351		// or 2) cachemanager should manage the contentstore
352		handlers = append(handlers, images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
353			switch desc.MediaType {
354			case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest,
355				images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex,
356				images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig:
357			default:
358				return nil, images.ErrSkipDesc
359			}
360			ongoing.add(desc)
361			return nil, nil
362		}))
363
364		// Get all the children for a descriptor
365		childrenHandler := images.ChildrenHandler(p.is.ContentStore)
366		// Set any children labels for that content
367		childrenHandler = images.SetChildrenLabels(p.is.ContentStore, childrenHandler)
368		// Filter the childen by the platform
369		childrenHandler = images.FilterPlatforms(childrenHandler, platforms.Default())
370
371		handlers = append(handlers,
372			remotes.FetchHandler(p.is.ContentStore, fetcher),
373			childrenHandler,
374		)
375	}
376
377	if err := images.Dispatch(ctx, images.Handlers(handlers...), p.desc); err != nil {
378		stopProgress()
379		return nil, err
380	}
381	defer stopProgress()
382
383	if schema1Converter != nil {
384		p.desc, err = schema1Converter.Convert(ctx)
385		if err != nil {
386			return nil, err
387		}
388	}
389
390	mfst, err := images.Manifest(ctx, p.is.ContentStore, p.desc, platforms.Default())
391	if err != nil {
392		return nil, err
393	}
394
395	config, err := images.Config(ctx, p.is.ContentStore, p.desc, platforms.Default())
396	if err != nil {
397		return nil, err
398	}
399
400	dt, err := content.ReadBlob(ctx, p.is.ContentStore, config)
401	if err != nil {
402		return nil, err
403	}
404
405	var img ocispec.Image
406	if err := json.Unmarshal(dt, &img); err != nil {
407		return nil, err
408	}
409
410	if len(mfst.Layers) != len(img.RootFS.DiffIDs) {
411		return nil, errors.Errorf("invalid config for manifest")
412	}
413
414	pchan := make(chan pkgprogress.Progress, 10)
415	defer close(pchan)
416
417	go func() {
418		m := map[string]struct {
419			st      time.Time
420			limiter *rate.Limiter
421		}{}
422		for p := range pchan {
423			if p.Action == "Extracting" {
424				st, ok := m[p.ID]
425				if !ok {
426					st.st = time.Now()
427					st.limiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
428					m[p.ID] = st
429				}
430				var end *time.Time
431				if p.LastUpdate || st.limiter.Allow() {
432					if p.LastUpdate {
433						tm := time.Now()
434						end = &tm
435					}
436					pw.Write("extracting "+p.ID, progress.Status{
437						Action:    "extract",
438						Started:   &st.st,
439						Completed: end,
440					})
441				}
442			}
443		}
444	}()
445
446	if len(mfst.Layers) == 0 {
447		return nil, nil
448	}
449
450	layers := make([]xfer.DownloadDescriptor, 0, len(mfst.Layers))
451
452	for i, desc := range mfst.Layers {
453		ongoing.add(desc)
454		layers = append(layers, &layerDescriptor{
455			desc:    desc,
456			diffID:  layer.DiffID(img.RootFS.DiffIDs[i]),
457			fetcher: fetcher,
458			ref:     p.src.Reference,
459			is:      p.is,
460		})
461	}
462
463	defer func() {
464		<-progressDone
465		for _, desc := range mfst.Layers {
466			p.is.ContentStore.Delete(context.TODO(), desc.Digest)
467		}
468	}()
469
470	r := image.NewRootFS()
471	rootFS, release, err := p.is.DownloadManager.Download(ctx, *r, runtime.GOOS, layers, pkgprogress.ChanOutput(pchan))
472	if err != nil {
473		return nil, err
474	}
475	stopProgress()
476
477	ref, err := p.is.CacheAccessor.GetFromSnapshotter(ctx, string(rootFS.ChainID()), cache.WithDescription(fmt.Sprintf("pulled from %s", p.ref)))
478	release()
479	if err != nil {
480		return nil, err
481	}
482
483	return ref, nil
484}
485
486// Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error)
487type layerDescriptor struct {
488	is      *imageSource
489	fetcher remotes.Fetcher
490	desc    ocispec.Descriptor
491	diffID  layer.DiffID
492	ref     ctdreference.Spec
493}
494
495func (ld *layerDescriptor) Key() string {
496	return "v2:" + ld.desc.Digest.String()
497}
498
499func (ld *layerDescriptor) ID() string {
500	return ld.desc.Digest.String()
501}
502
503func (ld *layerDescriptor) DiffID() (layer.DiffID, error) {
504	return ld.diffID, nil
505}
506
507func (ld *layerDescriptor) Download(ctx context.Context, progressOutput pkgprogress.Output) (io.ReadCloser, int64, error) {
508	rc, err := ld.fetcher.Fetch(ctx, ld.desc)
509	if err != nil {
510		return nil, 0, err
511	}
512	defer rc.Close()
513
514	refKey := remotes.MakeRefKey(ctx, ld.desc)
515
516	ld.is.ContentStore.Abort(ctx, refKey)
517
518	if err := content.WriteBlob(ctx, ld.is.ContentStore, refKey, rc, ld.desc); err != nil {
519		ld.is.ContentStore.Abort(ctx, refKey)
520		return nil, 0, err
521	}
522
523	ra, err := ld.is.ContentStore.ReaderAt(ctx, ld.desc)
524	if err != nil {
525		return nil, 0, err
526	}
527
528	return ioutil.NopCloser(content.NewReader(ra)), ld.desc.Size, nil
529}
530
531func (ld *layerDescriptor) Close() {
532	// ld.is.ContentStore.Delete(context.TODO(), ld.desc.Digest))
533}
534
535func (ld *layerDescriptor) Registered(diffID layer.DiffID) {
536	// Cache mapping from this layer's DiffID to the blobsum
537	ld.is.MetadataStore.Add(diffID, metadata.V2Metadata{Digest: ld.desc.Digest, SourceRepository: ld.ref.Locator})
538}
539
540func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, pw progress.Writer) {
541	var (
542		ticker   = time.NewTicker(100 * time.Millisecond)
543		statuses = map[string]statusInfo{}
544		done     bool
545	)
546	defer ticker.Stop()
547
548	for {
549		select {
550		case <-ticker.C:
551		case <-ctx.Done():
552			done = true
553		}
554
555		resolved := "resolved"
556		if !ongoing.isResolved() {
557			resolved = "resolving"
558		}
559		statuses[ongoing.name] = statusInfo{
560			Ref:    ongoing.name,
561			Status: resolved,
562		}
563
564		actives := make(map[string]statusInfo)
565
566		if !done {
567			active, err := cs.ListStatuses(ctx)
568			if err != nil {
569				// log.G(ctx).WithError(err).Error("active check failed")
570				continue
571			}
572			// update status of active entries!
573			for _, active := range active {
574				actives[active.Ref] = statusInfo{
575					Ref:       active.Ref,
576					Status:    "downloading",
577					Offset:    active.Offset,
578					Total:     active.Total,
579					StartedAt: active.StartedAt,
580					UpdatedAt: active.UpdatedAt,
581				}
582			}
583		}
584
585		// now, update the items in jobs that are not in active
586		for _, j := range ongoing.jobs() {
587			refKey := remotes.MakeRefKey(ctx, j.Descriptor)
588			if a, ok := actives[refKey]; ok {
589				started := j.started
590				pw.Write(j.Digest.String(), progress.Status{
591					Action:  a.Status,
592					Total:   int(a.Total),
593					Current: int(a.Offset),
594					Started: &started,
595				})
596				continue
597			}
598
599			if !j.done {
600				info, err := cs.Info(context.TODO(), j.Digest)
601				if err != nil {
602					if errdefs.IsNotFound(err) {
603						// pw.Write(j.Digest.String(), progress.Status{
604						// 	Action: "waiting",
605						// })
606						continue
607					}
608				} else {
609					j.done = true
610				}
611
612				if done || j.done {
613					started := j.started
614					createdAt := info.CreatedAt
615					pw.Write(j.Digest.String(), progress.Status{
616						Action:    "done",
617						Current:   int(info.Size),
618						Total:     int(info.Size),
619						Completed: &createdAt,
620						Started:   &started,
621					})
622				}
623			}
624		}
625		if done {
626			return
627		}
628	}
629}
630
631// jobs provides a way of identifying the download keys for a particular task
632// encountering during the pull walk.
633//
634// This is very minimal and will probably be replaced with something more
635// featured.
636type jobs struct {
637	name     string
638	added    map[digest.Digest]job
639	mu       sync.Mutex
640	resolved bool
641}
642
643type job struct {
644	ocispec.Descriptor
645	done    bool
646	started time.Time
647}
648
649func newJobs(name string) *jobs {
650	return &jobs{
651		name:  name,
652		added: make(map[digest.Digest]job),
653	}
654}
655
656func (j *jobs) add(desc ocispec.Descriptor) {
657	j.mu.Lock()
658	defer j.mu.Unlock()
659
660	if _, ok := j.added[desc.Digest]; ok {
661		return
662	}
663	j.added[desc.Digest] = job{
664		Descriptor: desc,
665		started:    time.Now(),
666	}
667}
668
669func (j *jobs) jobs() []job {
670	j.mu.Lock()
671	defer j.mu.Unlock()
672
673	descs := make([]job, 0, len(j.added))
674	for _, j := range j.added {
675		descs = append(descs, j)
676	}
677	return descs
678}
679
680func (j *jobs) isResolved() bool {
681	j.mu.Lock()
682	defer j.mu.Unlock()
683	return j.resolved
684}
685
// statusInfo is the progress state of one content-store ingest, keyed by its
// ref.
type statusInfo struct {
	// Ref is the ingest reference key.
	Ref string

	// Status is the action name written to the progress writer.
	Status string

	// Offset and Total are the bytes written so far and the expected size.
	Offset int64
	Total  int64

	// StartedAt and UpdatedAt are copied from the ingest status.
	StartedAt time.Time
	UpdatedAt time.Time
}
694
695func oneOffProgress(ctx context.Context, id string) func(err error) error {
696	pw, _, _ := progress.FromContext(ctx)
697	now := time.Now()
698	st := progress.Status{
699		Started: &now,
700	}
701	pw.Write(id, st)
702	return func(err error) error {
703		// TODO: set error on status
704		now := time.Now()
705		st.Completed = &now
706		pw.Write(id, st)
707		pw.Close()
708		return err
709	}
710}
711
712// cacheKeyFromConfig returns a stable digest from image config. If image config
713// is a known oci image we will use chainID of layers.
714func cacheKeyFromConfig(dt []byte) digest.Digest {
715	var img ocispec.Image
716	err := json.Unmarshal(dt, &img)
717	if err != nil {
718		return digest.FromBytes(dt)
719	}
720	if img.RootFS.Type != "layers" {
721		return digest.FromBytes(dt)
722	}
723	return identity.ChainID(img.RootFS.DiffIDs)
724}
725