package plugin // import "github.com/docker/docker/plugin"

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"context"
	"encoding/json"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"path"
	"path/filepath"
	"strings"
	"time"

	"github.com/containerd/containerd/content"
	"github.com/containerd/containerd/images"
	"github.com/containerd/containerd/platforms"
	"github.com/containerd/containerd/remotes"
	"github.com/containerd/containerd/remotes/docker"
	"github.com/docker/distribution/manifest/schema2"
	"github.com/docker/distribution/reference"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/dockerversion"
	"github.com/docker/docker/errdefs"
	"github.com/docker/docker/pkg/authorization"
	"github.com/docker/docker/pkg/chrootarchive"
	"github.com/docker/docker/pkg/pools"
	"github.com/docker/docker/pkg/progress"
	"github.com/docker/docker/pkg/stringid"
	"github.com/docker/docker/pkg/system"
	v2 "github.com/docker/docker/plugin/v2"
	"github.com/moby/sys/mount"
	digest "github.com/opencontainers/go-digest"
	specs "github.com/opencontainers/image-spec/specs-go/v1"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

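// acceptedPluginFilterTags lists the filter keys accepted by List
// ("enabled" and "capability"); filters.Args.Validate rejects any other key.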
var acceptedPluginFilterTags = map[string]bool{
	"enabled":    true,
	"capability": true,
}

// Disable deactivates a plugin. This means resources (volumes, networks) can no longer use it.
func (pm *Manager) Disable(refOrID string, config *types.PluginDisableConfig) error {
	p, err := pm.config.Store.GetV2Plugin(refOrID)
	if err != nil {
		return err
	}
	pm.mu.RLock()
	c := pm.cMap[p]
	pm.mu.RUnlock()

	if !config.ForceDisable && p.GetRefCount() > 0 {
		return errors.WithStack(inUseError(p.Name()))
	}

	for _, typ := range p.GetTypes() {
		if typ.Capability == authorization.AuthZApiImplements {
			pm.config.AuthzMiddleware.RemovePlugin(p.Name())
		}
	}

	if err := pm.disable(p, c); err != nil {
		return err
	}
	pm.publisher.Publish(EventDisable{Plugin: p.PluginObj})
	pm.config.LogPluginEvent(p.GetID(), refOrID, "disable")
	return nil
}

// Enable activates a plugin, which means it is ready to be used by containers.
func (pm *Manager) Enable(refOrID string, config *types.PluginEnableConfig) error {
	p, err := pm.config.Store.GetV2Plugin(refOrID)
	if err != nil {
		return err
	}

	c := &controller{timeoutInSecs: config.Timeout}
	if err := pm.enable(p, c, false); err != nil {
		return err
	}
	pm.publisher.Publish(EventEnable{Plugin: p.PluginObj})
	pm.config.LogPluginEvent(p.GetID(), refOrID, "enable")
	return nil
}

// Inspect examines a plugin config
func (pm *Manager) Inspect(refOrID string) (tp *types.Plugin, err error) {
	p, err := pm.config.Store.GetV2Plugin(refOrID)
	if err != nil {
		return nil, err
	}

	return &p.PluginObj, nil
}

func computePrivileges(c types.PluginConfig) types.PluginPrivileges {
	var privileges types.PluginPrivileges
	if c.Network.Type != "null" && c.Network.Type != "bridge" && c.Network.Type != "" {
		privileges = append(privileges, types.PluginPrivilege{
			Name:        "network",
			Description: "permissions to access a network",
			Value:       []string{c.Network.Type},
		})
	}
	if c.IpcHost {
		privileges = append(privileges, types.PluginPrivilege{
			Name:        "host ipc namespace",
			Description: "allow access to host ipc namespace",
			Value:       []string{"true"},
		})
	}
	if c.PidHost {
		privileges = append(privileges, types.PluginPrivilege{
			Name:        "host pid namespace",
			Description: "allow access to host pid namespace",
			Value:       []string{"true"},
		})
	}
	for _, mount := range c.Mounts {
		if mount.Source != nil {
			privileges = append(privileges, types.PluginPrivilege{
				Name:        "mount",
				Description: "host path to mount",
				Value:       []string{*mount.Source},
			})
		}
	}
	for _, device := range c.Linux.Devices {
		if device.Path != nil {
			privileges = append(privileges, types.PluginPrivilege{
				Name:        "device",
				Description: "host device to access",
				Value:       []string{*device.Path},
			})
		}
	}
	if c.Linux.AllowAllDevices {
		privileges = append(privileges, types.PluginPrivilege{
			Name:        "allow-all-devices",
			Description: "allow 'rwm' access to all devices",
			Value:       []string{"true"},
		})
	}
	if len(c.Linux.Capabilities) > 0 {
		privileges = append(privileges, types.PluginPrivilege{
			Name:        "capabilities",
			Description: "list of additional capabilities required",
			Value:       c.Linux.Capabilities,
		})
	}

	return privileges
}
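
// Illustrative only (not part of the original source): for a hypothetical
// plugin config that requests host networking, a bind mount of /data, and
// the CAP_SYS_ADMIN capability, computePrivileges would return roughly:
//
//	types.PluginPrivileges{
//		{Name: "network", Description: "permissions to access a network", Value: []string{"host"}},
//		{Name: "mount", Description: "host path to mount", Value: []string{"/data"}},
//		{Name: "capabilities", Description: "list of additional capabilities required", Value: []string{"CAP_SYS_ADMIN"}},
//	}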

// Privileges pulls a plugin config and computes the privileges required to install it.
func (pm *Manager) Privileges(ctx context.Context, ref reference.Named, metaHeader http.Header, authConfig *types.AuthConfig) (types.PluginPrivileges, error) {
	var (
		config     types.PluginConfig
		configSeen bool
	)

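	// The handler below drives the fetch: when it sees an image manifest it
	// returns the config descriptor so that blob is fetched next, and when it
	// sees the plugin config blob it unmarshals it into config so the
	// privileges can be computed from it. Other media types are ignored.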
	h := func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
		switch desc.MediaType {
		case schema2.MediaTypeManifest, specs.MediaTypeImageManifest:
			data, err := content.ReadBlob(ctx, pm.blobStore, desc)
			if err != nil {
				return nil, errors.Wrapf(err, "error reading image manifest from blob store for %s", ref)
			}

			var m specs.Manifest
			if err := json.Unmarshal(data, &m); err != nil {
				return nil, errors.Wrapf(err, "error unmarshaling image manifest for %s", ref)
			}
			return []specs.Descriptor{m.Config}, nil
		case schema2.MediaTypePluginConfig:
			configSeen = true
			data, err := content.ReadBlob(ctx, pm.blobStore, desc)
			if err != nil {
				return nil, errors.Wrapf(err, "error reading plugin config from blob store for %s", ref)
			}

			if err := json.Unmarshal(data, &config); err != nil {
				return nil, errors.Wrapf(err, "error unmarshaling plugin config for %s", ref)
			}
		}

		return nil, nil
	}

	if err := pm.fetch(ctx, ref, authConfig, progress.DiscardOutput(), metaHeader, images.HandlerFunc(h)); err != nil {
		return types.PluginPrivileges{}, err
	}

	if !configSeen {
		return types.PluginPrivileges{}, errors.Errorf("did not find plugin config for specified reference %s", ref)
	}

	return computePrivileges(config), nil
}

// Upgrade upgrades a plugin
//
// TODO: replace reference package usage with simpler url.Parse semantics
func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer) (err error) {
	p, err := pm.config.Store.GetV2Plugin(name)
	if err != nil {
		return err
	}

	if p.IsEnabled() {
		return errors.Wrap(enabledError(p.Name()), "plugin must be disabled before upgrading")
	}

	// revalidate because Pull is public
	if _, err := reference.ParseNormalizedNamed(name); err != nil {
		return errors.Wrapf(errdefs.InvalidParameter(err), "failed to parse %q", name)
	}

	pm.muGC.RLock()
	defer pm.muGC.RUnlock()

	tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs")
	if err != nil {
		return errors.Wrap(err, "error creating tmp dir for plugin rootfs")
	}

	var md fetchMeta

	ctx, cancel := context.WithCancel(ctx)
	out, waitProgress := setupProgressOutput(outStream, cancel)
	defer waitProgress()

	if err := pm.fetch(ctx, ref, authConfig, out, metaHeader, storeFetchMetadata(&md), childrenHandler(pm.blobStore), applyLayer(pm.blobStore, tmpRootFSDir, out)); err != nil {
		return err
	}
	pm.config.LogPluginEvent(reference.FamiliarString(ref), name, "pull")

	if err := validateFetchedMetadata(md); err != nil {
		return err
	}

	if err := pm.upgradePlugin(p, md.config, md.manifest, md.blobs, tmpRootFSDir, &privileges); err != nil {
		return err
	}
	p.PluginObj.PluginReference = ref.String()
	return nil
}

// Pull pulls a plugin, checks that the correct privileges are provided, and installs it.
//
// TODO: replace reference package usage with simpler url.Parse semantics
func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer, opts ...CreateOpt) (err error) {
	pm.muGC.RLock()
	defer pm.muGC.RUnlock()

	// revalidate because Pull is public
	nameref, err := reference.ParseNormalizedNamed(name)
	if err != nil {
		return errors.Wrapf(errdefs.InvalidParameter(err), "failed to parse %q", name)
	}
	name = reference.FamiliarString(reference.TagNameOnly(nameref))

	if err := pm.config.Store.validateName(name); err != nil {
		return errdefs.InvalidParameter(err)
	}

	tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs")
	if err != nil {
		return errors.Wrap(errdefs.System(err), "error preparing pull")
	}
	defer os.RemoveAll(tmpRootFSDir)

	var md fetchMeta

	ctx, cancel := context.WithCancel(ctx)
	out, waitProgress := setupProgressOutput(outStream, cancel)
	defer waitProgress()

	if err := pm.fetch(ctx, ref, authConfig, out, metaHeader, storeFetchMetadata(&md), childrenHandler(pm.blobStore), applyLayer(pm.blobStore, tmpRootFSDir, out)); err != nil {
		return err
	}
	pm.config.LogPluginEvent(reference.FamiliarString(ref), name, "pull")

	if err := validateFetchedMetadata(md); err != nil {
		return err
	}

	refOpt := func(p *v2.Plugin) {
		p.PluginObj.PluginReference = ref.String()
	}
	optsList := make([]CreateOpt, 0, len(opts)+1)
	optsList = append(optsList, opts...)
	optsList = append(optsList, refOpt)

	// TODO: tmpRootFSDir is empty but should have layers in it
	p, err := pm.createPlugin(name, md.config, md.manifest, md.blobs, tmpRootFSDir, &privileges, optsList...)
	if err != nil {
		return err
	}

	pm.publisher.Publish(EventCreate{Plugin: p.PluginObj})

	return nil
}

// List displays the list of plugins and associated metadata.
func (pm *Manager) List(pluginFilters filters.Args) ([]types.Plugin, error) {
	if err := pluginFilters.Validate(acceptedPluginFilterTags); err != nil {
		return nil, err
	}

	enabledOnly := false
	disabledOnly := false
	if pluginFilters.Contains("enabled") {
		if pluginFilters.ExactMatch("enabled", "true") {
			enabledOnly = true
		} else if pluginFilters.ExactMatch("enabled", "false") {
			disabledOnly = true
		} else {
			return nil, invalidFilter{"enabled", pluginFilters.Get("enabled")}
		}
	}

	plugins := pm.config.Store.GetAll()
	out := make([]types.Plugin, 0, len(plugins))

next:
	for _, p := range plugins {
		if enabledOnly && !p.PluginObj.Enabled {
			continue
		}
		if disabledOnly && p.PluginObj.Enabled {
			continue
		}
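		// Note: with a capability filter, the plugin is skipped as soon as any
		// one of its declared capabilities fails to match, i.e. every
		// capability the plugin implements must satisfy the filter.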
		if pluginFilters.Contains("capability") {
			for _, f := range p.GetTypes() {
				if !pluginFilters.Match("capability", f.Capability) {
					continue next
				}
			}
		}
		out = append(out, p.PluginObj)
	}
	return out, nil
}

// Push pushes a plugin to the registry.
func (pm *Manager) Push(ctx context.Context, name string, metaHeader http.Header, authConfig *types.AuthConfig, outStream io.Writer) error {
	p, err := pm.config.Store.GetV2Plugin(name)
	if err != nil {
		return err
	}

	ref, err := reference.ParseNormalizedNamed(p.Name())
	if err != nil {
		return errors.Wrapf(err, "plugin has invalid name %v for push", p.Name())
	}

	statusTracker := docker.NewInMemoryTracker()

	resolver, err := pm.newResolver(ctx, statusTracker, authConfig, metaHeader, false)
	if err != nil {
		return err
	}

	pusher, err := resolver.Pusher(ctx, ref.String())
	if err != nil {
		return errors.Wrap(err, "error creating plugin pusher")
	}

	pj := newPushJobs(statusTracker)

	ctx, cancel := context.WithCancel(ctx)
	out, waitProgress := setupProgressOutput(outStream, cancel)
	defer waitProgress()

	progressHandler := images.HandlerFunc(func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) {
		logrus.WithField("mediaType", desc.MediaType).WithField("digest", desc.Digest.String()).Debug("Preparing to push plugin layer")
		id := stringid.TruncateID(desc.Digest.String())
		pj.add(remotes.MakeRefKey(ctx, desc), id)
		progress.Update(out, id, "Preparing")
		return nil, nil
	})

	desc, err := pm.getManifestDescriptor(ctx, p)
	if err != nil {
		return errors.Wrap(err, "error reading plugin manifest")
	}

	progress.Messagef(out, "", "The push refers to repository [%s]", reference.FamiliarName(ref))

	// TODO: If a layer already exists on the registry, the progress output just says "Preparing"
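	// Poll the push status tracker every 100ms and translate the content
	// upload offsets into progress updates for the client. The timer is
	// created and then immediately stopped (draining it if it already fired)
	// so that it starts out disarmed and the Reset at the top of the loop
	// arms it cleanly on each iteration.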
	go func() {
		timer := time.NewTimer(100 * time.Millisecond)
		defer timer.Stop()
		if !timer.Stop() {
			<-timer.C
		}
		var statuses []contentStatus
		for {
			timer.Reset(100 * time.Millisecond)
			select {
			case <-ctx.Done():
				return
			case <-timer.C:
				statuses = pj.status()
			}

			for _, s := range statuses {
				out.WriteProgress(progress.Progress{ID: s.Ref, Current: s.Offset, Total: s.Total, Action: s.Status, LastUpdate: s.Offset == s.Total})
			}
		}
	}()

	// Make sure we can authenticate the request since the auth scope for plugin repos is different than a normal repo.
	ctx = docker.WithScope(ctx, scope(ref, true))
	if err := remotes.PushContent(ctx, pusher, desc, pm.blobStore, nil, func(h images.Handler) images.Handler {
		return images.Handlers(progressHandler, h)
	}); err != nil {
		// Try fallback to http.
		// This is needed because the containerd pusher will only attempt the first registry config we pass, which would
		// typically be https.
		// If there are no http-only host configs found we'll error out anyway.
		resolver, _ := pm.newResolver(ctx, statusTracker, authConfig, metaHeader, true)
		if resolver != nil {
			pusher, _ := resolver.Pusher(ctx, ref.String())
			if pusher != nil {
				logrus.WithField("ref", ref).Debug("Re-attempting push with http-fallback")
				err2 := remotes.PushContent(ctx, pusher, desc, pm.blobStore, nil, func(h images.Handler) images.Handler {
					return images.Handlers(progressHandler, h)
				})
				if err2 == nil {
					err = nil
				} else {
					logrus.WithError(err2).WithField("ref", ref).Debug("Error while attempting push with http-fallback")
				}
			}
		}
		if err != nil {
			return errors.Wrap(err, "error pushing plugin")
		}
	}

	// For blobs that already exist in the registry we need to make sure to update the progress otherwise it will just say "pending"
	// TODO: How to check if the layer already exists? Is it worth it?
	for _, j := range pj.jobs {
		progress.Update(out, pj.names[j], "Upload complete")
	}

	// Signal the client for content trust verification
	progress.Aux(out, types.PushResult{Tag: ref.(reference.Tagged).Tag(), Digest: desc.Digest.String(), Size: int(desc.Size)})

	return nil
}

// manifest wraps an OCI manifest, because...
// Historically the registry has not supported plugins unless the media type on the manifest is
// specifically schema2.MediaTypeManifest, so the OCI manifest media type cannot be used here.
// Additionally, docker schema2 manifest validation requires a mediaType to be set on the manifest
// itself, even though it is also set on the descriptor; the OCI manifest type does not have this
// field.
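//
// Illustratively (not from the original source), the wrapper therefore serializes to a
// schema2-style body along the lines of:
//
//	{
//	  "schemaVersion": 2,
//	  "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
//	  "config": { "mediaType": "...", "digest": "sha256:...", "size": 123 },
//	  "layers": [ ... ]
//	}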
type manifest struct {
	specs.Manifest
	MediaType string `json:"mediaType,omitempty"`
}

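// buildManifest assembles a docker schema2-style manifest for a plugin from a config digest and
// layer digests that are already present in the content manager s.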
func buildManifest(ctx context.Context, s content.Manager, config digest.Digest, layers []digest.Digest) (manifest, error) {
	var m manifest
	m.MediaType = images.MediaTypeDockerSchema2Manifest
	m.SchemaVersion = 2

	configInfo, err := s.Info(ctx, config)
	if err != nil {
		return m, errors.Wrapf(err, "error reading plugin config content for digest %s", config)
	}
	m.Config = specs.Descriptor{
		MediaType: mediaTypePluginConfig,
		Size:      configInfo.Size,
		Digest:    configInfo.Digest,
	}

	for _, l := range layers {
		info, err := s.Info(ctx, l)
		if err != nil {
			return m, errors.Wrapf(err, "error fetching info for content digest %s", l)
		}
		m.Layers = append(m.Layers, specs.Descriptor{
			MediaType: images.MediaTypeDockerSchema2LayerGzip, // TODO: This is assuming everything is a gzip compressed layer, but that may not be true.
			Digest:    l,
			Size:      info.Size,
		})
	}
	return m, nil
}

// getManifestDescriptor gets the OCI descriptor for a manifest
// It will generate a manifest if one does not exist
func (pm *Manager) getManifestDescriptor(ctx context.Context, p *v2.Plugin) (specs.Descriptor, error) {
	logger := logrus.WithField("plugin", p.Name()).WithField("digest", p.Manifest)
	if p.Manifest != "" {
		info, err := pm.blobStore.Info(ctx, p.Manifest)
		if err == nil {
			desc := specs.Descriptor{
				Size:      info.Size,
				Digest:    info.Digest,
				MediaType: images.MediaTypeDockerSchema2Manifest,
			}
			return desc, nil
		}
		logger.WithError(err).Debug("Could not find plugin manifest in content store")
	} else {
		logger.Info("Plugin does not have manifest digest")
	}
	logger.Info("Building a new plugin manifest")

	manifest, err := buildManifest(ctx, pm.blobStore, p.Config, p.Blobsums)
	if err != nil {
		return specs.Descriptor{}, err
	}

	desc, err := writeManifest(ctx, pm.blobStore, &manifest)
	if err != nil {
		return desc, err
	}

	if err := pm.save(p); err != nil {
		logger.WithError(err).Error("Could not save plugin with manifest digest")
	}
	return desc, nil
}

func writeManifest(ctx context.Context, cs content.Store, m *manifest) (specs.Descriptor, error) {
	platform := platforms.DefaultSpec()
	desc := specs.Descriptor{
		MediaType: images.MediaTypeDockerSchema2Manifest,
		Platform:  &platform,
	}
	data, err := json.Marshal(m)
	if err != nil {
		return desc, errors.Wrap(err, "error encoding manifest")
	}
	desc.Digest = digest.FromBytes(data)
	desc.Size = int64(len(data))

	if err := content.WriteBlob(ctx, cs, remotes.MakeRefKey(ctx, desc), bytes.NewReader(data), desc); err != nil {
		return desc, errors.Wrap(err, "error writing plugin manifest")
	}
	return desc, nil
}

// Remove deletes the plugin's root directory and removes the plugin from the store.
func (pm *Manager) Remove(name string, config *types.PluginRmConfig) error {
	p, err := pm.config.Store.GetV2Plugin(name)
	pm.mu.RLock()
	c := pm.cMap[p]
	pm.mu.RUnlock()

	if err != nil {
		return err
	}

	if !config.ForceRemove {
		if p.GetRefCount() > 0 {
			return inUseError(p.Name())
		}
		if p.IsEnabled() {
			return enabledError(p.Name())
		}
	}

	if p.IsEnabled() {
		if err := pm.disable(p, c); err != nil {
			logrus.Errorf("failed to disable plugin '%s': %s", p.Name(), err)
		}
	}

	defer func() {
		go pm.GC()
	}()

	id := p.GetID()
	pluginDir := filepath.Join(pm.config.Root, id)

	if err := mount.RecursiveUnmount(pluginDir); err != nil {
		return errors.Wrap(err, "error unmounting plugin data")
	}

	if err := atomicRemoveAll(pluginDir); err != nil {
		return err
	}

	pm.config.Store.Remove(p)
	pm.config.LogPluginEvent(id, name, "remove")
	pm.publisher.Publish(EventRemove{Plugin: p.PluginObj})
	return nil
}

// Set sets plugin args
func (pm *Manager) Set(name string, args []string) error {
	p, err := pm.config.Store.GetV2Plugin(name)
	if err != nil {
		return err
	}
	if err := p.Set(args); err != nil {
		return err
	}
	return pm.save(p)
}

// CreateFromContext creates a plugin from the given tar context, which must contain
// both the rootfs and the config.json, and tags it with the given repo name (with an optional tag).
func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser, options *types.PluginCreateOptions) (err error) {
	pm.muGC.RLock()
	defer pm.muGC.RUnlock()

	ref, err := reference.ParseNormalizedNamed(options.RepoName)
	if err != nil {
		return errors.Wrapf(err, "failed to parse reference %v", options.RepoName)
	}
	if _, ok := ref.(reference.Canonical); ok {
		return errors.Errorf("canonical references are not permitted")
	}
	name := reference.FamiliarString(reference.TagNameOnly(ref))

	if err := pm.config.Store.validateName(name); err != nil { // fast check, real check is in createPlugin()
		return err
	}

	tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs")
	if err != nil {
		return errors.Wrap(err, "failed to create temp directory")
	}
	defer os.RemoveAll(tmpRootFSDir)

	var configJSON []byte
	rootFS := splitConfigRootFSFromTar(tarCtx, &configJSON)

	rootFSBlob, err := pm.blobStore.Writer(ctx, content.WithRef(name))
	if err != nil {
		return err
	}
	defer rootFSBlob.Close()

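	// Everything read from the rootfs tar while extracting it into tmpRootFSDir
	// below is also tee'd through the gzip writer, so the content store ends up
	// with a gzip-compressed copy of the exact same layer.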
	gzw := gzip.NewWriter(rootFSBlob)
	rootFSReader := io.TeeReader(rootFS, gzw)

	if err := chrootarchive.Untar(rootFSReader, tmpRootFSDir, nil); err != nil {
		return err
	}
	if err := rootFS.Close(); err != nil {
		return err
	}

	if configJSON == nil {
		return errors.New("config not found")
	}

	if err := gzw.Close(); err != nil {
		return errors.Wrap(err, "error closing gzip writer")
	}

	var config types.PluginConfig
	if err := json.Unmarshal(configJSON, &config); err != nil {
		return errors.Wrap(err, "failed to parse config")
	}

	if err := pm.validateConfig(config); err != nil {
		return err
	}

	pm.mu.Lock()
	defer pm.mu.Unlock()

	if err := rootFSBlob.Commit(ctx, 0, ""); err != nil {
		return err
	}
	defer func() {
		if err != nil {
			go pm.GC()
		}
	}()

	config.Rootfs = &types.PluginConfigRootfs{
		Type:    "layers",
		DiffIds: []string{rootFSBlob.Digest().String()},
	}

	config.DockerVersion = dockerversion.Version

	configBlob, err := pm.blobStore.Writer(ctx, content.WithRef(name+"-config.json"))
	if err != nil {
		return err
	}
	defer configBlob.Close()
	if err := json.NewEncoder(configBlob).Encode(config); err != nil {
		return errors.Wrap(err, "error encoding json config")
	}
	if err := configBlob.Commit(ctx, 0, ""); err != nil {
		return err
	}

	configDigest := configBlob.Digest()
	layers := []digest.Digest{rootFSBlob.Digest()}

	manifest, err := buildManifest(ctx, pm.blobStore, configDigest, layers)
	if err != nil {
		return err
	}
	desc, err := writeManifest(ctx, pm.blobStore, &manifest)
	if err != nil {
		return err
	}

	p, err := pm.createPlugin(name, configDigest, desc.Digest, layers, tmpRootFSDir, nil)
	if err != nil {
		return err
	}
	p.PluginObj.PluginReference = name

	pm.publisher.Publish(EventCreate{Plugin: p.PluginObj})
	pm.config.LogPluginEvent(p.PluginObj.ID, name, "create")

	return nil
}

func (pm *Manager) validateConfig(config types.PluginConfig) error {
	return nil // TODO:
}

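// splitConfigRootFSFromTar streams the plugin build context from in: the
// configFileName entry is read into *config, and the returned reader yields a
// new tar stream containing only the contents of the rootfs directory with
// that prefix stripped. The returned reader fails if no rootfs entries are
// found in the context.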
func splitConfigRootFSFromTar(in io.ReadCloser, config *[]byte) io.ReadCloser {
	pr, pw := io.Pipe()
	go func() {
		tarReader := tar.NewReader(in)
		tarWriter := tar.NewWriter(pw)
		defer in.Close()

		hasRootFS := false

		for {
			hdr, err := tarReader.Next()
			if err == io.EOF {
				if !hasRootFS {
					pw.CloseWithError(errors.Wrap(err, "no rootfs found"))
					return
				}
				// Signals end of archive.
				tarWriter.Close()
				pw.Close()
				return
			}
			if err != nil {
				pw.CloseWithError(errors.Wrap(err, "failed to read from tar"))
				return
			}

			content := io.Reader(tarReader)
			name := path.Clean(hdr.Name)
			if path.IsAbs(name) {
				name = name[1:]
			}
			if name == configFileName {
				dt, err := ioutil.ReadAll(content)
				if err != nil {
					pw.CloseWithError(errors.Wrapf(err, "failed to read %s", configFileName))
					return
				}
				*config = dt
			}
			if parts := strings.Split(name, "/"); len(parts) != 0 && parts[0] == rootFSFileName {
				hdr.Name = path.Clean(path.Join(parts[1:]...))
				if hdr.Typeflag == tar.TypeLink && strings.HasPrefix(strings.ToLower(hdr.Linkname), rootFSFileName+"/") {
					hdr.Linkname = hdr.Linkname[len(rootFSFileName)+1:]
				}
				if err := tarWriter.WriteHeader(hdr); err != nil {
					pw.CloseWithError(errors.Wrap(err, "error writing tar header"))
					return
				}
				if _, err := pools.Copy(tarWriter, content); err != nil {
					pw.CloseWithError(errors.Wrap(err, "error copying tar data"))
					return
				}
				hasRootFS = true
			} else {
				io.Copy(ioutil.Discard, content)
			}
		}
	}()
	return pr
}

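// atomicRemoveAll removes dir by first renaming it to a "-removing" suffixed
// path and then recursively deleting the renamed directory. The rename makes
// the removal appear atomic to other observers of dir: the original path is
// either fully present or gone, even if the (potentially slow) recursive
// delete is interrupted; a leftover "-removing" directory from a previous
// failed attempt is cleaned up on the next call.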
func atomicRemoveAll(dir string) error {
	renamed := dir + "-removing"

	err := os.Rename(dir, renamed)
	switch {
	case os.IsNotExist(err), err == nil:
		// even if `dir` doesn't exist, we can still try and remove `renamed`
	case os.IsExist(err):
		// Some previous remove failed, check if the origin dir exists
		if e := system.EnsureRemoveAll(renamed); e != nil {
			return errors.Wrap(e, "rename target already exists and could not be removed")
		}
		if _, err := os.Stat(dir); os.IsNotExist(err) {
			// origin doesn't exist, nothing left to do
			return nil
		}

		// attempt to rename again
		if err := os.Rename(dir, renamed); err != nil {
			return errors.Wrap(err, "failed to rename dir for atomic removal")
		}
	default:
		return errors.Wrap(err, "failed to rename dir for atomic removal")
	}

	if err := system.EnsureRemoveAll(renamed); err != nil {
		os.Rename(renamed, dir)
		return err
	}
	return nil
}
