package container // import "github.com/docker/docker/daemon/cluster/executor/container"

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"strings"
	"syscall"
	"time"

	"github.com/docker/distribution/reference"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/backend"
	containertypes "github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/events"
	containerpkg "github.com/docker/docker/container"
	"github.com/docker/docker/daemon"
	"github.com/docker/docker/daemon/cluster/convert"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	volumeopts "github.com/docker/docker/volume/service/opts"
	"github.com/docker/libnetwork"
	"github.com/docker/swarmkit/agent/exec"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	gogotypes "github.com/gogo/protobuf/types"
	digest "github.com/opencontainers/go-digest"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/time/rate"
)

// nodeAttachmentReadyInterval is the interval at which to poll the attachment
// store while waiting for a node's network attachments to become ready.
const nodeAttachmentReadyInterval = 100 * time.Millisecond

// containerAdapter conducts remote operations for a container. Calls are
// mostly naked calls to the client API, seeded with information from
// containerConfig.
type containerAdapter struct {
	backend       executorpkg.Backend
	imageBackend  executorpkg.ImageBackend
	volumeBackend executorpkg.VolumeBackend
	container     *containerConfig
	dependencies  exec.DependencyGetter
}

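// newContainerAdapter builds the containerConfig for the given task and node
// and returns an adapter wired to the provided backends and dependency getter.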
func newContainerAdapter(b executorpkg.Backend, i executorpkg.ImageBackend, v executorpkg.VolumeBackend, task *api.Task, node *api.NodeDescription, dependencies exec.DependencyGetter) (*containerAdapter, error) {
	ctnr, err := newContainerConfig(task, node)
	if err != nil {
		return nil, err
	}

	return &containerAdapter{
		container:     ctnr,
		backend:       b,
		imageBackend:  i,
		volumeBackend: v,
		dependencies:  dependencies,
	}, nil
}

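// pullImage pulls the task's image, skipping the pull when the image is
// referenced by ID, or by digest and already present locally. Pull progress
// is decoded from the JSON stream and logged at debug level, rate-limited to
// avoid flooding the logs.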
func (c *containerAdapter) pullImage(ctx context.Context) error {
	spec := c.container.spec()

	// Skip pulling if the image is referenced by image ID.
	if _, err := digest.Parse(spec.Image); err == nil {
		return nil
	}

	// Skip pulling if the image is referenced by digest and already
	// exists locally.
	named, err := reference.ParseNormalizedNamed(spec.Image)
	if err == nil {
		if _, ok := named.(reference.Canonical); ok {
			_, err := c.imageBackend.LookupImage(spec.Image)
			if err == nil {
				return nil
			}
		}
	}

	// if the image needs to be pulled, the auth config will be retrieved and updated
	var encodedAuthConfig string
	if spec.PullOptions != nil {
		encodedAuthConfig = spec.PullOptions.RegistryAuth
	}

	authConfig := &types.AuthConfig{}
	if encodedAuthConfig != "" {
		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
			logrus.Warnf("invalid authconfig: %v", err)
		}
	}

	pr, pw := io.Pipe()
	metaHeaders := map[string][]string{}
	go func() {
		// TODO LCOW Support: This will need revisiting as
		// the stack is built up to include LCOW support for swarm.
		err := c.imageBackend.PullImage(ctx, c.container.image(), "", nil, metaHeaders, authConfig, pw)
		pw.CloseWithError(err)
	}()

	dec := json.NewDecoder(pr)
	dec.UseNumber()
	m := map[string]interface{}{}
	spamLimiter := rate.NewLimiter(rate.Every(time.Second), 1)

	lastStatus := ""
	for {
		if err := dec.Decode(&m); err != nil {
			if err == io.EOF {
				break
			}
			return err
		}
		l := log.G(ctx)
		// limit pull progress logs unless the status changes
		if spamLimiter.Allow() || lastStatus != m["status"] {
			// if we have progress details, we have everything we need
			if progress, ok := m["progressDetail"].(map[string]interface{}); ok {
				// first, log the image and status
				l = l.WithFields(logrus.Fields{
					"image":  c.container.image(),
					"status": m["status"],
				})
				// then, if we have progress, log the progress
				if progress["current"] != nil && progress["total"] != nil {
					l = l.WithFields(logrus.Fields{
						"current": progress["current"],
						"total":   progress["total"],
					})
				}
			}
			l.Debug("pull in progress")
		}
		// sometimes, we get no useful information at all, and add no fields
		if status, ok := m["status"].(string); ok {
			lastStatus = status
		}
	}

	// if the final stream object contained an error, return it
	if errMsg, ok := m["error"]; ok {
		return fmt.Errorf("%v", errMsg)
	}
	return nil
}

// waitNodeAttachments validates that NetworkAttachments exist on this node
// for every network in use by this task. It blocks until the network
// attachments are ready, or the context times out. If it returns nil, then the
// node's network attachments are all there.
func (c *containerAdapter) waitNodeAttachments(ctx context.Context) error {
	// to do this, we're going to get the attachment store and try getting the
	// IP address for each network. if any network comes back not existing,
	// we'll wait and try again.
	attachmentStore := c.backend.GetAttachmentStore()
	if attachmentStore == nil {
		return fmt.Errorf("error getting attachment store")
	}

	// essentially, we're long-polling here. this is really sub-optimal, but a
	// better solution based off signaling channels would require a more
	// substantial rearchitecture and probably not be worth our time in terms
	// of performance gains.
	poll := time.NewTicker(nodeAttachmentReadyInterval)
	defer poll.Stop()
	for {
		// set a flag ready to true. if we try to get a network IP that doesn't
		// exist yet, we will set this flag to "false"
		ready := true
		for _, attachment := range c.container.networksAttachments {
			// we only need node attachments (IP address) for overlay networks
			// TODO(dperny): unsure if this will work with other network
			// drivers, but i also don't think other network drivers use the
			// node attachment IP address.
			if attachment.Network.DriverState.Name == "overlay" {
				if _, exists := attachmentStore.GetIPForNetwork(attachment.Network.ID); !exists {
					ready = false
				}
			}
		}

		// if everything is ready here, then we can just return no error
		if ready {
			return nil
		}

		// otherwise, try polling again, or wait for context canceled.
		select {
		case <-ctx.Done():
			return fmt.Errorf("node is missing network attachments, ip addresses may be exhausted")
		case <-poll.C:
		}
	}
}

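// createNetworks creates the task's networks on this node, skipping networks
// that already exist or are predefined.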
func (c *containerAdapter) createNetworks(ctx context.Context) error {
	for name := range c.container.networksAttachments {
		ncr, err := c.container.networkCreateRequest(name)
		if err != nil {
			return err
		}

		if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing
			if _, ok := err.(libnetwork.NetworkNameError); ok {
				continue
			}
			// We continue if CreateManagedNetwork returns a PredefinedNetworkError.
			// Other callers can still treat it as an error.
			if _, ok := err.(daemon.PredefinedNetworkError); ok {
				continue
			}
			return err
		}
	}

	return nil
}

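// removeNetworks removes the task's networks from this node. Networks that
// still have active endpoints or that no longer exist are skipped.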
func (c *containerAdapter) removeNetworks(ctx context.Context) error {
	var (
		activeEndpointsError *libnetwork.ActiveEndpointsError
		errNoSuchNetwork     libnetwork.ErrNoSuchNetwork
	)

	for name, v := range c.container.networksAttachments {
		if err := c.backend.DeleteManagedNetwork(v.Network.ID); err != nil {
			switch {
			case errors.As(err, &activeEndpointsError):
				continue
			case errors.As(err, &errNoSuchNetwork):
				continue
			default:
				log.G(ctx).Errorf("network %s remove failed: %v", name, err)
				return err
			}
		}
	}

	return nil
}

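// networkAttach updates this node's attachment for the first network in the
// container's networking config.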
func (c *containerAdapter) networkAttach(ctx context.Context) error {
	config := c.container.createNetworkingConfig(c.backend)

	var (
		networkName string
		networkID   string
	)

	if config != nil {
		for n, epConfig := range config.EndpointsConfig {
			networkName = n
			networkID = epConfig.NetworkID
			break
		}
	}

	return c.backend.UpdateAttachment(networkName, networkID, c.container.networkAttachmentContainerID(), config)
}

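// waitForDetach blocks until the backend reports that the attachment for the
// first configured network has been detached, or the context is canceled.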
func (c *containerAdapter) waitForDetach(ctx context.Context) error {
	config := c.container.createNetworkingConfig(c.backend)

	var (
		networkName string
		networkID   string
	)

	if config != nil {
		for n, epConfig := range config.EndpointsConfig {
			networkName = n
			networkID = epConfig.NetworkID
			break
		}
	}

	return c.backend.WaitForDetachment(ctx, networkName, networkID, c.container.taskID(), c.container.networkAttachmentContainerID())
}

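// create creates the managed container, connects it to any additional
// networks, and wires up its dependency store and its secret, config, and
// service configuration references.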
func (c *containerAdapter) create(ctx context.Context) error {
	var cr containertypes.ContainerCreateCreatedBody
	var err error
	if cr, err = c.backend.CreateManagedContainer(types.ContainerCreateConfig{
		Name:       c.container.name(),
		Config:     c.container.config(),
		HostConfig: c.container.hostConfig(),
		// Use the first network in container create
		NetworkingConfig: c.container.createNetworkingConfig(c.backend),
	}); err != nil {
		return err
	}

	// Docker daemon currently doesn't support multiple networks in container create
	// Connect to all other networks
	nc := c.container.connectNetworkingConfig(c.backend)

	if nc != nil {
		for n, ep := range nc.EndpointsConfig {
			if err := c.backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil {
				return err
			}
		}
	}

	container := c.container.task.Spec.GetContainer()
	if container == nil {
		return errors.New("unable to get container from task spec")
	}

	if err := c.backend.SetContainerDependencyStore(cr.ID, c.dependencies); err != nil {
		return err
	}

	// configure secrets
	secretRefs := convert.SecretReferencesFromGRPC(container.Secrets)
	if err := c.backend.SetContainerSecretReferences(cr.ID, secretRefs); err != nil {
		return err
	}

	configRefs := convert.ConfigReferencesFromGRPC(container.Configs)
	if err := c.backend.SetContainerConfigReferences(cr.ID, configRefs); err != nil {
		return err
	}

	return c.backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig())
}

// checkMounts ensures that the provided mounts won't have any host-specific
// problems at start up. For example, we disallow bind mounts without an
// existing path, which is slightly different from the container API.
func (c *containerAdapter) checkMounts() error {
	spec := c.container.spec()
	for _, mount := range spec.Mounts {
		switch mount.Type {
		case api.MountTypeBind:
			if _, err := os.Stat(mount.Source); os.IsNotExist(err) {
				return fmt.Errorf("invalid bind mount source, source path not found: %s", mount.Source)
			}
		}
	}

	return nil
}

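// start validates the container's mounts and then asks the backend to start
// the container.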
func (c *containerAdapter) start(ctx context.Context) error {
	if err := c.checkMounts(); err != nil {
		return err
	}

	return c.backend.ContainerStart(c.container.name(), nil, "", "")
}

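// inspect returns the current state of the container, preferring a context
// error if the context has already been canceled.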
func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
	cs, err := c.backend.ContainerInspectCurrent(c.container.name(), false)
	if ctx.Err() != nil {
		return types.ContainerJSON{}, ctx.Err()
	}
	if err != nil {
		return types.ContainerJSON{}, err
	}
	return *cs, nil
}

// events issues a call to the events API and returns a channel with all
// events. The stream of events can be shut down by cancelling the context.
func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
	log.G(ctx).Debugf("waiting on events")
	buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
	eventsq := make(chan events.Message, len(buffer))

	for _, event := range buffer {
		eventsq <- event
	}

	go func() {
		defer c.backend.UnsubscribeFromEvents(l)

		for {
			select {
			case ev := <-l:
				jev, ok := ev.(events.Message)
				if !ok {
					log.G(ctx).Warnf("unexpected event message: %q", ev)
					continue
				}
				select {
				case eventsq <- jev:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	return eventsq
}

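// wait returns a channel that resolves once the container is no longer
// running.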
func (c *containerAdapter) wait(ctx context.Context) (<-chan containerpkg.StateStatus, error) {
	return c.backend.ContainerWait(ctx, c.container.nameOrID(), containerpkg.WaitConditionNotRunning)
}

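// shutdown stops the container, using the task's stop grace period when one
// is set.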
func (c *containerAdapter) shutdown(ctx context.Context) error {
	// Default stop grace period to nil (daemon will use the stopTimeout of the container)
	var stopgrace *int
	spec := c.container.spec()
	if spec.StopGracePeriod != nil {
		stopgraceValue := int(spec.StopGracePeriod.Seconds)
		stopgrace = &stopgraceValue
	}
	return c.backend.ContainerStop(c.container.name(), stopgrace)
}

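// terminate kills the container with SIGKILL.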
func (c *containerAdapter) terminate(ctx context.Context) error {
	return c.backend.ContainerKill(c.container.name(), uint64(syscall.SIGKILL))
}

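// remove forcibly removes the container along with its volumes.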
func (c *containerAdapter) remove(ctx context.Context) error {
	return c.backend.ContainerRm(c.container.name(), &types.ContainerRmConfig{
		RemoveVolume: true,
		ForceRemove:  true,
	})
}

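// createVolumes creates any volumes declared in the task's mounts that carry
// a volume driver configuration.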
func (c *containerAdapter) createVolumes(ctx context.Context) error {
	// Create plugin volumes that are embedded inside a Mount
	for _, mount := range c.container.task.Spec.GetContainer().Mounts {
		if mount.Type != api.MountTypeVolume {
			continue
		}

		if mount.VolumeOptions == nil {
			continue
		}

		if mount.VolumeOptions.DriverConfig == nil {
			continue
		}

		req := c.container.volumeCreateRequest(&mount)

		// Check if this volume exists on the engine
		if _, err := c.volumeBackend.Create(ctx, req.Name, req.Driver,
			volumeopts.WithCreateOptions(req.DriverOpts),
			volumeopts.WithCreateLabels(req.Labels),
		); err != nil {
			// TODO(amitshukla): Today, volume create through the engine api does not return an error
			// when the named volume with the same parameters already exists.
			// It returns an error if the driver name is different - that is a valid error
			return err
		}

	}

	return nil
}

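// activateServiceBinding activates the service binding for this container in
// the backend.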
func (c *containerAdapter) activateServiceBinding() error {
	return c.backend.ActivateContainerServiceBinding(c.container.name())
}

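// deactivateServiceBinding deactivates the service binding for this container
// in the backend.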
func (c *containerAdapter) deactivateServiceBinding() error {
	return c.backend.DeactivateContainerServiceBinding(c.container.name())
}

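// logs translates the swarmkit log subscription options into container logs
// options and returns a channel of log messages from the backend.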
func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (<-chan *backend.LogMessage, error) {
	apiOptions := &types.ContainerLogsOptions{
		Follow: options.Follow,

		// Always say yes to Timestamps and Details. We make the decision
		// of whether to return these to the user or not way higher up the
		// stack.
		Timestamps: true,
		Details:    true,
	}

	if options.Since != nil {
		since, err := gogotypes.TimestampFromProto(options.Since)
		if err != nil {
			return nil, err
		}
		// print since as this formatted string because the docker container
		// logs interface expects it like this.
		// see github.com/docker/docker/api/types/time.ParseTimestamps
		apiOptions.Since = fmt.Sprintf("%d.%09d", since.Unix(), int64(since.Nanosecond()))
	}

	if options.Tail < 0 {
		// See protobuf documentation for details of how this works.
		apiOptions.Tail = fmt.Sprint(-options.Tail - 1)
	} else if options.Tail > 0 {
		return nil, errors.New("tail relative to start of logs not supported via docker API")
	}

	if len(options.Streams) == 0 {
		// empty == all
		apiOptions.ShowStdout, apiOptions.ShowStderr = true, true
	} else {
		for _, stream := range options.Streams {
			switch stream {
			case api.LogStreamStdout:
				apiOptions.ShowStdout = true
			case api.LogStreamStderr:
				apiOptions.ShowStderr = true
			}
		}
	}
	msgs, _, err := c.backend.ContainerLogs(ctx, c.container.name(), apiOptions)
	if err != nil {
		return nil, err
	}
	return msgs, nil
}

// todo: typed/wrapped errors
func isContainerCreateNameConflict(err error) bool {
	return strings.Contains(err.Error(), "Conflict. The name")
}

func isUnknownContainer(err error) bool {
	return strings.Contains(err.Error(), "No such container:")
}

func isStoppedContainer(err error) bool {
	return strings.Contains(err.Error(), "is already stopped")
}