1/*
2   Copyright The containerd Authors.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17package containerd
18
19import (
20	"bytes"
21	"context"
22	"encoding/json"
23	"fmt"
24	"net/http"
25	"runtime"
26	"strconv"
27	"strings"
28	"sync"
29	"time"
30
31	containersapi "github.com/containerd/containerd/api/services/containers/v1"
32	contentapi "github.com/containerd/containerd/api/services/content/v1"
33	diffapi "github.com/containerd/containerd/api/services/diff/v1"
34	eventsapi "github.com/containerd/containerd/api/services/events/v1"
35	imagesapi "github.com/containerd/containerd/api/services/images/v1"
36	introspectionapi "github.com/containerd/containerd/api/services/introspection/v1"
37	leasesapi "github.com/containerd/containerd/api/services/leases/v1"
38	namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1"
39	snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
40	"github.com/containerd/containerd/api/services/tasks/v1"
41	versionservice "github.com/containerd/containerd/api/services/version/v1"
42	"github.com/containerd/containerd/containers"
43	"github.com/containerd/containerd/content"
44	contentproxy "github.com/containerd/containerd/content/proxy"
45	"github.com/containerd/containerd/defaults"
46	"github.com/containerd/containerd/errdefs"
47	"github.com/containerd/containerd/events"
48	"github.com/containerd/containerd/images"
49	"github.com/containerd/containerd/leases"
50	leasesproxy "github.com/containerd/containerd/leases/proxy"
51	"github.com/containerd/containerd/namespaces"
52	"github.com/containerd/containerd/pkg/dialer"
53	"github.com/containerd/containerd/platforms"
54	"github.com/containerd/containerd/plugin"
55	"github.com/containerd/containerd/remotes"
56	"github.com/containerd/containerd/remotes/docker"
57	"github.com/containerd/containerd/snapshots"
58	snproxy "github.com/containerd/containerd/snapshots/proxy"
59	"github.com/containerd/typeurl"
60	"github.com/gogo/protobuf/types"
61	ptypes "github.com/gogo/protobuf/types"
62	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
63	specs "github.com/opencontainers/runtime-spec/specs-go"
64	"github.com/pkg/errors"
65	"google.golang.org/grpc"
66	"google.golang.org/grpc/health/grpc_health_v1"
67)
68
69func init() {
70	const prefix = "types.containerd.io"
71	// register TypeUrls for commonly marshaled external types
72	major := strconv.Itoa(specs.VersionMajor)
73	typeurl.Register(&specs.Spec{}, prefix, "opencontainers/runtime-spec", major, "Spec")
74	typeurl.Register(&specs.Process{}, prefix, "opencontainers/runtime-spec", major, "Process")
75	typeurl.Register(&specs.LinuxResources{}, prefix, "opencontainers/runtime-spec", major, "LinuxResources")
76	typeurl.Register(&specs.WindowsResources{}, prefix, "opencontainers/runtime-spec", major, "WindowsResources")
77}
78
79// New returns a new containerd client that is connected to the containerd
80// instance provided by address
81func New(address string, opts ...ClientOpt) (*Client, error) {
82	var copts clientOpts
83	for _, o := range opts {
84		if err := o(&copts); err != nil {
85			return nil, err
86		}
87	}
88	if copts.timeout == 0 {
89		copts.timeout = 10 * time.Second
90	}
91
92	c := &Client{
93		defaultns: copts.defaultns,
94	}
95
96	if copts.defaultRuntime != "" {
97		c.runtime = copts.defaultRuntime
98	} else {
99		c.runtime = defaults.DefaultRuntime
100	}
101
102	if copts.defaultPlatform != nil {
103		c.platform = copts.defaultPlatform
104	} else {
105		c.platform = platforms.Default()
106	}
107
108	if copts.services != nil {
109		c.services = *copts.services
110	}
111	if address != "" {
112		gopts := []grpc.DialOption{
113			grpc.WithBlock(),
114			grpc.WithInsecure(),
115			grpc.FailOnNonTempDialError(true),
116			grpc.WithBackoffMaxDelay(3 * time.Second),
117			grpc.WithContextDialer(dialer.ContextDialer),
118
119			// TODO(stevvooe): We may need to allow configuration of this on the client.
120			grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)),
121			grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)),
122		}
123		if len(copts.dialOptions) > 0 {
124			gopts = copts.dialOptions
125		}
126		if copts.defaultns != "" {
127			unary, stream := newNSInterceptors(copts.defaultns)
128			gopts = append(gopts,
129				grpc.WithUnaryInterceptor(unary),
130				grpc.WithStreamInterceptor(stream),
131			)
132		}
133		connector := func() (*grpc.ClientConn, error) {
134			ctx, cancel := context.WithTimeout(context.Background(), copts.timeout)
135			defer cancel()
136			conn, err := grpc.DialContext(ctx, dialer.DialAddress(address), gopts...)
137			if err != nil {
138				return nil, errors.Wrapf(err, "failed to dial %q", address)
139			}
140			return conn, nil
141		}
142		conn, err := connector()
143		if err != nil {
144			return nil, err
145		}
146		c.conn, c.connector = conn, connector
147	}
148	if copts.services == nil && c.conn == nil {
149		return nil, errors.Wrap(errdefs.ErrUnavailable, "no grpc connection or services is available")
150	}
151
152	// check namespace labels for default runtime
153	if copts.defaultRuntime == "" && c.defaultns != "" {
154		if label, err := c.GetLabel(context.Background(), defaults.DefaultRuntimeNSLabel); err != nil {
155			return nil, err
156		} else if label != "" {
157			c.runtime = label
158		}
159	}
160
161	return c, nil
162}
163
164// NewWithConn returns a new containerd client that is connected to the containerd
165// instance provided by the connection
166func NewWithConn(conn *grpc.ClientConn, opts ...ClientOpt) (*Client, error) {
167	var copts clientOpts
168	for _, o := range opts {
169		if err := o(&copts); err != nil {
170			return nil, err
171		}
172	}
173	c := &Client{
174		defaultns: copts.defaultns,
175		conn:      conn,
176		runtime:   fmt.Sprintf("%s.%s", plugin.RuntimePlugin, runtime.GOOS),
177	}
178
179	// check namespace labels for default runtime
180	if copts.defaultRuntime == "" && c.defaultns != "" {
181		if label, err := c.GetLabel(context.Background(), defaults.DefaultRuntimeNSLabel); err != nil {
182			return nil, err
183		} else if label != "" {
184			c.runtime = label
185		}
186	}
187
188	if copts.services != nil {
189		c.services = *copts.services
190	}
191	return c, nil
192}
193
// Client is the client to interact with containerd and its various services
// using a uniform interface
type Client struct {
	// services holds optional service implementations; the accessor methods
	// return these when set instead of building one from conn.
	services
	// connMu guards conn.
	connMu sync.Mutex
	// conn is the GRPC connection to containerd. It may be nil when the
	// client was created without an address and only with services.
	conn *grpc.ClientConn
	// runtime is the runtime name assigned to containers created through
	// NewContainer.
	runtime string
	// defaultns is the namespace used by GetLabel when the context does not
	// carry one.
	defaultns string
	// platform is the platform matcher configured for the client.
	platform platforms.MatchComparer
	// connector dials a fresh connection; it is only set by New when an
	// address was dialed and is used by Reconnect.
	connector func() (*grpc.ClientConn, error)
}
205
206// Reconnect re-establishes the GRPC connection to the containerd daemon
207func (c *Client) Reconnect() error {
208	if c.connector == nil {
209		return errors.Wrap(errdefs.ErrUnavailable, "unable to reconnect to containerd, no connector available")
210	}
211	c.connMu.Lock()
212	defer c.connMu.Unlock()
213	c.conn.Close()
214	conn, err := c.connector()
215	if err != nil {
216		return err
217	}
218	c.conn = conn
219	return nil
220}
221
222// IsServing returns true if the client can successfully connect to the
223// containerd daemon and the healthcheck service returns the SERVING
224// response.
225// This call will block if a transient error is encountered during
226// connection. A timeout can be set in the context to ensure it returns
227// early.
228func (c *Client) IsServing(ctx context.Context) (bool, error) {
229	c.connMu.Lock()
230	if c.conn == nil {
231		c.connMu.Unlock()
232		return false, errors.Wrap(errdefs.ErrUnavailable, "no grpc connection available")
233	}
234	c.connMu.Unlock()
235	r, err := c.HealthService().Check(ctx, &grpc_health_v1.HealthCheckRequest{}, grpc.WaitForReady(true))
236	if err != nil {
237		return false, err
238	}
239	return r.Status == grpc_health_v1.HealthCheckResponse_SERVING, nil
240}
241
242// Containers returns all containers created in containerd
243func (c *Client) Containers(ctx context.Context, filters ...string) ([]Container, error) {
244	r, err := c.ContainerService().List(ctx, filters...)
245	if err != nil {
246		return nil, err
247	}
248	var out []Container
249	for _, container := range r {
250		out = append(out, containerFromRecord(c, container))
251	}
252	return out, nil
253}
254
255// NewContainer will create a new container in container with the provided id
256// the id must be unique within the namespace
257func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
258	ctx, done, err := c.WithLease(ctx)
259	if err != nil {
260		return nil, err
261	}
262	defer done(ctx)
263
264	container := containers.Container{
265		ID: id,
266		Runtime: containers.RuntimeInfo{
267			Name: c.runtime,
268		},
269	}
270	for _, o := range opts {
271		if err := o(ctx, c, &container); err != nil {
272			return nil, err
273		}
274	}
275	r, err := c.ContainerService().Create(ctx, container)
276	if err != nil {
277		return nil, err
278	}
279	return containerFromRecord(c, r), nil
280}
281
282// LoadContainer loads an existing container from metadata
283func (c *Client) LoadContainer(ctx context.Context, id string) (Container, error) {
284	r, err := c.ContainerService().Get(ctx, id)
285	if err != nil {
286		return nil, err
287	}
288	return containerFromRecord(c, r), nil
289}
290
// RemoteContext is used to configure object resolutions and transfers with
// remote content stores and image providers.
type RemoteContext struct {
	// Resolver is used to resolve names to objects, fetchers, and pushers.
	// If no resolver is provided, defaults to Docker registry resolver.
	Resolver remotes.Resolver

	// PlatformMatcher is used to match the platforms for an image
	// operation and define the preference when a single match is required
	// from multiple platforms.
	PlatformMatcher platforms.MatchComparer

	// Unpack is done after an image is pulled to extract into a snapshotter.
	// It is done simultaneously for schema 2 images when they are pulled.
	// If an image is not unpacked on pull, it can be unpacked any time
	// afterwards. Unpacking is required to run an image.
	Unpack bool

	// UnpackOpts handles options to the unpack call.
	UnpackOpts []UnpackOpt

	// Snapshotter used for unpacking
	Snapshotter string

	// Labels to be applied to the created image
	Labels map[string]string

	// BaseHandlers are a set of handlers which are called on dispatch.
	// These handlers always get called before any operation specific
	// handlers.
	BaseHandlers []images.Handler

	// HandlerWrapper wraps the handler which gets sent to dispatch.
	// Unlike BaseHandlers, this can run before and after the built
	// in handlers, allowing operations to run on the descriptor
	// after it has completed transferring.
	HandlerWrapper func(images.Handler) images.Handler

	// ConvertSchema1 is whether to convert Docker registry schema 1
	// manifests. If this option is false then any image which resolves
	// to schema 1 will return an error since schema 1 is not supported.
	ConvertSchema1 bool

	// Platforms defines which platforms to handle when doing the image operation.
	// Platforms is ignored when a PlatformMatcher is set, otherwise the
	// platforms will be used to create a PlatformMatcher with no ordering
	// preference.
	Platforms []string

	// MaxConcurrentDownloads is the max concurrent content downloads for each pull.
	MaxConcurrentDownloads int

	// AllMetadata downloads all manifests and known-configuration files
	AllMetadata bool
}
346
347func defaultRemoteContext() *RemoteContext {
348	return &RemoteContext{
349		Resolver: docker.NewResolver(docker.ResolverOptions{
350			Client: http.DefaultClient,
351		}),
352	}
353}
354
355// Fetch downloads the provided content into containerd's content store
356// and returns a non-platform specific image reference
357func (c *Client) Fetch(ctx context.Context, ref string, opts ...RemoteOpt) (images.Image, error) {
358	fetchCtx := defaultRemoteContext()
359	for _, o := range opts {
360		if err := o(c, fetchCtx); err != nil {
361			return images.Image{}, err
362		}
363	}
364
365	if fetchCtx.Unpack {
366		return images.Image{}, errors.Wrap(errdefs.ErrNotImplemented, "unpack on fetch not supported, try pull")
367	}
368
369	if fetchCtx.PlatformMatcher == nil {
370		if len(fetchCtx.Platforms) == 0 {
371			fetchCtx.PlatformMatcher = platforms.All
372		} else {
373			var ps []ocispec.Platform
374			for _, s := range fetchCtx.Platforms {
375				p, err := platforms.Parse(s)
376				if err != nil {
377					return images.Image{}, errors.Wrapf(err, "invalid platform %s", s)
378				}
379				ps = append(ps, p)
380			}
381
382			fetchCtx.PlatformMatcher = platforms.Any(ps...)
383		}
384	}
385
386	ctx, done, err := c.WithLease(ctx)
387	if err != nil {
388		return images.Image{}, err
389	}
390	defer done(ctx)
391
392	return c.fetch(ctx, fetchCtx, ref, 0)
393}
394
395// Push uploads the provided content to a remote resource
396func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, opts ...RemoteOpt) error {
397	pushCtx := defaultRemoteContext()
398	for _, o := range opts {
399		if err := o(c, pushCtx); err != nil {
400			return err
401		}
402	}
403	if pushCtx.PlatformMatcher == nil {
404		if len(pushCtx.Platforms) > 0 {
405			var ps []ocispec.Platform
406			for _, platform := range pushCtx.Platforms {
407				p, err := platforms.Parse(platform)
408				if err != nil {
409					return errors.Wrapf(err, "invalid platform %s", platform)
410				}
411				ps = append(ps, p)
412			}
413			pushCtx.PlatformMatcher = platforms.Any(ps...)
414		} else {
415			pushCtx.PlatformMatcher = platforms.All
416		}
417	}
418
419	// Annotate ref with digest to push only push tag for single digest
420	if !strings.Contains(ref, "@") {
421		ref = ref + "@" + desc.Digest.String()
422	}
423
424	pusher, err := pushCtx.Resolver.Pusher(ctx, ref)
425	if err != nil {
426		return err
427	}
428
429	var wrapper func(images.Handler) images.Handler
430
431	if len(pushCtx.BaseHandlers) > 0 {
432		wrapper = func(h images.Handler) images.Handler {
433			h = images.Handlers(append(pushCtx.BaseHandlers, h)...)
434			if pushCtx.HandlerWrapper != nil {
435				h = pushCtx.HandlerWrapper(h)
436			}
437			return h
438		}
439	} else if pushCtx.HandlerWrapper != nil {
440		wrapper = pushCtx.HandlerWrapper
441	}
442
443	return remotes.PushContent(ctx, pusher, desc, c.ContentStore(), pushCtx.PlatformMatcher, wrapper)
444}
445
446// GetImage returns an existing image
447func (c *Client) GetImage(ctx context.Context, ref string) (Image, error) {
448	i, err := c.ImageService().Get(ctx, ref)
449	if err != nil {
450		return nil, err
451	}
452	return NewImage(c, i), nil
453}
454
455// ListImages returns all existing images
456func (c *Client) ListImages(ctx context.Context, filters ...string) ([]Image, error) {
457	imgs, err := c.ImageService().List(ctx, filters...)
458	if err != nil {
459		return nil, err
460	}
461	images := make([]Image, len(imgs))
462	for i, img := range imgs {
463		images[i] = NewImage(c, img)
464	}
465	return images, nil
466}
467
468// Restore restores a container from a checkpoint
469func (c *Client) Restore(ctx context.Context, id string, checkpoint Image, opts ...RestoreOpts) (Container, error) {
470	store := c.ContentStore()
471	index, err := decodeIndex(ctx, store, checkpoint.Target())
472	if err != nil {
473		return nil, err
474	}
475
476	ctx, done, err := c.WithLease(ctx)
477	if err != nil {
478		return nil, err
479	}
480	defer done(ctx)
481
482	copts := []NewContainerOpts{}
483	for _, o := range opts {
484		copts = append(copts, o(ctx, id, c, checkpoint, index))
485	}
486
487	ctr, err := c.NewContainer(ctx, id, copts...)
488	if err != nil {
489		return nil, err
490	}
491
492	return ctr, nil
493}
494
495func writeIndex(ctx context.Context, index *ocispec.Index, client *Client, ref string) (d ocispec.Descriptor, err error) {
496	labels := map[string]string{}
497	for i, m := range index.Manifests {
498		labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = m.Digest.String()
499	}
500	data, err := json.Marshal(index)
501	if err != nil {
502		return ocispec.Descriptor{}, err
503	}
504	return writeContent(ctx, client.ContentStore(), ocispec.MediaTypeImageIndex, ref, bytes.NewReader(data), content.WithLabels(labels))
505}
506
507// GetLabel gets a label value from namespace store
508// If there is no default label, an empty string returned with nil error
509func (c *Client) GetLabel(ctx context.Context, label string) (string, error) {
510	ns, err := namespaces.NamespaceRequired(ctx)
511	if err != nil {
512		if c.defaultns == "" {
513			return "", err
514		}
515		ns = c.defaultns
516	}
517
518	srv := c.NamespaceService()
519	labels, err := srv.Labels(ctx, ns)
520	if err != nil {
521		return "", err
522	}
523
524	value := labels[label]
525	return value, nil
526}
527
528// Subscribe to events that match one or more of the provided filters.
529//
530// Callers should listen on both the envelope and errs channels. If the errs
531// channel returns nil or an error, the subscriber should terminate.
532//
533// The subscriber can stop receiving events by canceling the provided context.
534// The errs channel will be closed and return a nil error.
535func (c *Client) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) {
536	return c.EventService().Subscribe(ctx, filters...)
537}
538
539// Close closes the clients connection to containerd
540func (c *Client) Close() error {
541	c.connMu.Lock()
542	defer c.connMu.Unlock()
543	if c.conn != nil {
544		return c.conn.Close()
545	}
546	return nil
547}
548
549// NamespaceService returns the underlying Namespaces Store
550func (c *Client) NamespaceService() namespaces.Store {
551	if c.namespaceStore != nil {
552		return c.namespaceStore
553	}
554	c.connMu.Lock()
555	defer c.connMu.Unlock()
556	return NewNamespaceStoreFromClient(namespacesapi.NewNamespacesClient(c.conn))
557}
558
559// ContainerService returns the underlying container Store
560func (c *Client) ContainerService() containers.Store {
561	if c.containerStore != nil {
562		return c.containerStore
563	}
564	c.connMu.Lock()
565	defer c.connMu.Unlock()
566	return NewRemoteContainerStore(containersapi.NewContainersClient(c.conn))
567}
568
569// ContentStore returns the underlying content Store
570func (c *Client) ContentStore() content.Store {
571	if c.contentStore != nil {
572		return c.contentStore
573	}
574	c.connMu.Lock()
575	defer c.connMu.Unlock()
576	return contentproxy.NewContentStore(contentapi.NewContentClient(c.conn))
577}
578
579// SnapshotService returns the underlying snapshotter for the provided snapshotter name
580func (c *Client) SnapshotService(snapshotterName string) snapshots.Snapshotter {
581	snapshotterName, err := c.resolveSnapshotterName(context.Background(), snapshotterName)
582	if err != nil {
583		snapshotterName = DefaultSnapshotter
584	}
585	if c.snapshotters != nil {
586		return c.snapshotters[snapshotterName]
587	}
588	c.connMu.Lock()
589	defer c.connMu.Unlock()
590	return snproxy.NewSnapshotter(snapshotsapi.NewSnapshotsClient(c.conn), snapshotterName)
591}
592
593// TaskService returns the underlying TasksClient
594func (c *Client) TaskService() tasks.TasksClient {
595	if c.taskService != nil {
596		return c.taskService
597	}
598	c.connMu.Lock()
599	defer c.connMu.Unlock()
600	return tasks.NewTasksClient(c.conn)
601}
602
603// ImageService returns the underlying image Store
604func (c *Client) ImageService() images.Store {
605	if c.imageStore != nil {
606		return c.imageStore
607	}
608	c.connMu.Lock()
609	defer c.connMu.Unlock()
610	return NewImageStoreFromClient(imagesapi.NewImagesClient(c.conn))
611}
612
613// DiffService returns the underlying Differ
614func (c *Client) DiffService() DiffService {
615	if c.diffService != nil {
616		return c.diffService
617	}
618	c.connMu.Lock()
619	defer c.connMu.Unlock()
620	return NewDiffServiceFromClient(diffapi.NewDiffClient(c.conn))
621}
622
623// IntrospectionService returns the underlying Introspection Client
624func (c *Client) IntrospectionService() introspectionapi.IntrospectionClient {
625	c.connMu.Lock()
626	defer c.connMu.Unlock()
627	return introspectionapi.NewIntrospectionClient(c.conn)
628}
629
630// LeasesService returns the underlying Leases Client
631func (c *Client) LeasesService() leases.Manager {
632	if c.leasesService != nil {
633		return c.leasesService
634	}
635	c.connMu.Lock()
636	defer c.connMu.Unlock()
637	return leasesproxy.NewLeaseManager(leasesapi.NewLeasesClient(c.conn))
638}
639
640// HealthService returns the underlying GRPC HealthClient
641func (c *Client) HealthService() grpc_health_v1.HealthClient {
642	c.connMu.Lock()
643	defer c.connMu.Unlock()
644	return grpc_health_v1.NewHealthClient(c.conn)
645}
646
647// EventService returns the underlying event service
648func (c *Client) EventService() EventService {
649	if c.eventService != nil {
650		return c.eventService
651	}
652	c.connMu.Lock()
653	defer c.connMu.Unlock()
654	return NewEventServiceFromClient(eventsapi.NewEventsClient(c.conn))
655}
656
657// VersionService returns the underlying VersionClient
658func (c *Client) VersionService() versionservice.VersionClient {
659	c.connMu.Lock()
660	defer c.connMu.Unlock()
661	return versionservice.NewVersionClient(c.conn)
662}
663
664// Conn returns the underlying GRPC connection object
665func (c *Client) Conn() *grpc.ClientConn {
666	c.connMu.Lock()
667	defer c.connMu.Unlock()
668	return c.conn
669}
670
// Version of containerd, as returned by Client.Version.
type Version struct {
	// Version number
	Version string
	// Revision from git that was built
	Revision string
}
678
679// Version returns the version of containerd that the client is connected to
680func (c *Client) Version(ctx context.Context) (Version, error) {
681	c.connMu.Lock()
682	if c.conn == nil {
683		c.connMu.Unlock()
684		return Version{}, errors.Wrap(errdefs.ErrUnavailable, "no grpc connection available")
685	}
686	c.connMu.Unlock()
687	response, err := c.VersionService().Version(ctx, &ptypes.Empty{})
688	if err != nil {
689		return Version{}, err
690	}
691	return Version{
692		Version:  response.Version,
693		Revision: response.Revision,
694	}, nil
695}
696
// ServerInfo describes the containerd server, as returned by Client.Server.
type ServerInfo struct {
	// UUID is the UUID reported by the server's introspection service.
	UUID string
}
700
701func (c *Client) Server(ctx context.Context) (ServerInfo, error) {
702	c.connMu.Lock()
703	if c.conn == nil {
704		c.connMu.Unlock()
705		return ServerInfo{}, errors.Wrap(errdefs.ErrUnavailable, "no grpc connection available")
706	}
707	c.connMu.Unlock()
708
709	response, err := c.IntrospectionService().Server(ctx, &types.Empty{})
710	if err != nil {
711		return ServerInfo{}, err
712	}
713	return ServerInfo{
714		UUID: response.UUID,
715	}, nil
716}
717
718func (c *Client) resolveSnapshotterName(ctx context.Context, name string) (string, error) {
719	if name == "" {
720		label, err := c.GetLabel(ctx, defaults.DefaultSnapshotterNSLabel)
721		if err != nil {
722			return "", err
723		}
724
725		if label != "" {
726			name = label
727		} else {
728			name = DefaultSnapshotter
729		}
730	}
731
732	return name, nil
733}
734
735func (c *Client) getSnapshotter(ctx context.Context, name string) (snapshots.Snapshotter, error) {
736	name, err := c.resolveSnapshotterName(ctx, name)
737	if err != nil {
738		return nil, err
739	}
740
741	s := c.SnapshotService(name)
742	if s == nil {
743		return nil, errors.Wrapf(errdefs.ErrNotFound, "snapshotter %s was not found", name)
744	}
745
746	return s, nil
747}
748
// CheckRuntime returns true if the current runtime matches the expected
// runtime. Providing various parts of the runtime schema will match those
// parts of the expected runtime.
//
// Both names are split on "." and compared part by part: expected matches
// when every one of its parts equals the part at the same position in
// current. If expected has more parts than current, it cannot match and
// false is returned.
func CheckRuntime(current, expected string) bool {
	cp := strings.Split(current, ".")
	l := len(cp)
	for i, p := range strings.Split(expected, ".") {
		// i == l is already past the end of cp, so the comparison must be
		// >=; with > the index expression below would panic.
		if i >= l {
			return false
		}
		if p != cp[i] {
			return false
		}
	}
	return true
}
765