1package container
2
3import (
4	"encoding/base64"
5	"encoding/json"
6	"fmt"
7	"io"
8	"strings"
9	"syscall"
10	"time"
11
12	"github.com/sirupsen/logrus"
13	"github.com/docker/distribution/digest"
14	"github.com/docker/docker/api/types"
15	"github.com/docker/docker/api/types/backend"
16	containertypes "github.com/docker/docker/api/types/container"
17	"github.com/docker/docker/api/types/events"
18	"github.com/docker/docker/daemon/cluster/convert"
19	executorpkg "github.com/docker/docker/daemon/cluster/executor"
20	"github.com/docker/docker/reference"
21	"github.com/docker/libnetwork"
22	"github.com/docker/swarmkit/agent/exec"
23	"github.com/docker/swarmkit/api"
24	"github.com/docker/swarmkit/log"
25	"github.com/docker/swarmkit/protobuf/ptypes"
26	"golang.org/x/net/context"
27	"golang.org/x/time/rate"
28)
29
30// containerAdapter conducts remote operations for a container. All calls
31// are mostly naked calls to the client API, seeded with information from
32// containerConfig.
33type containerAdapter struct {
34	backend   executorpkg.Backend
35	container *containerConfig
36	secrets   exec.SecretGetter
37}
38
39func newContainerAdapter(b executorpkg.Backend, task *api.Task, secrets exec.SecretGetter) (*containerAdapter, error) {
40	ctnr, err := newContainerConfig(task)
41	if err != nil {
42		return nil, err
43	}
44
45	return &containerAdapter{
46		container: ctnr,
47		backend:   b,
48		secrets:   secrets,
49	}, nil
50}
51
52func (c *containerAdapter) pullImage(ctx context.Context) error {
53	spec := c.container.spec()
54
55	// Skip pulling if the image is referenced by image ID.
56	if _, err := digest.ParseDigest(spec.Image); err == nil {
57		return nil
58	}
59
60	// Skip pulling if the image is referenced by digest and already
61	// exists locally.
62	named, err := reference.ParseNamed(spec.Image)
63	if err == nil {
64		if _, ok := named.(reference.Canonical); ok {
65			_, err := c.backend.LookupImage(spec.Image)
66			if err == nil {
67				return nil
68			}
69		}
70	}
71
72	// if the image needs to be pulled, the auth config will be retrieved and updated
73	var encodedAuthConfig string
74	if spec.PullOptions != nil {
75		encodedAuthConfig = spec.PullOptions.RegistryAuth
76	}
77
78	authConfig := &types.AuthConfig{}
79	if encodedAuthConfig != "" {
80		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
81			logrus.Warnf("invalid authconfig: %v", err)
82		}
83	}
84
85	pr, pw := io.Pipe()
86	metaHeaders := map[string][]string{}
87	go func() {
88		err := c.backend.PullImage(ctx, c.container.image(), "", metaHeaders, authConfig, pw)
89		pw.CloseWithError(err)
90	}()
91
92	dec := json.NewDecoder(pr)
93	dec.UseNumber()
94	m := map[string]interface{}{}
95	spamLimiter := rate.NewLimiter(rate.Every(time.Second), 1)
96
97	lastStatus := ""
98	for {
99		if err := dec.Decode(&m); err != nil {
100			if err == io.EOF {
101				break
102			}
103			return err
104		}
105		l := log.G(ctx)
106		// limit pull progress logs unless the status changes
107		if spamLimiter.Allow() || lastStatus != m["status"] {
108			// if we have progress details, we have everything we need
109			if progress, ok := m["progressDetail"].(map[string]interface{}); ok {
110				// first, log the image and status
111				l = l.WithFields(logrus.Fields{
112					"image":  c.container.image(),
113					"status": m["status"],
114				})
115				// then, if we have progress, log the progress
116				if progress["current"] != nil && progress["total"] != nil {
117					l = l.WithFields(logrus.Fields{
118						"current": progress["current"],
119						"total":   progress["total"],
120					})
121				}
122			}
123			l.Debug("pull in progress")
124		}
125		// sometimes, we get no useful information at all, and add no fields
126		if status, ok := m["status"].(string); ok {
127			lastStatus = status
128		}
129	}
130
131	// if the final stream object contained an error, return it
132	if errMsg, ok := m["error"]; ok {
133		return fmt.Errorf("%v", errMsg)
134	}
135	return nil
136}
137
138func (c *containerAdapter) createNetworks(ctx context.Context) error {
139	for _, network := range c.container.networks() {
140		ncr, err := c.container.networkCreateRequest(network)
141		if err != nil {
142			return err
143		}
144
145		if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing
146			if _, ok := err.(libnetwork.NetworkNameError); ok {
147				continue
148			}
149
150			return err
151		}
152	}
153
154	return nil
155}
156
157func (c *containerAdapter) removeNetworks(ctx context.Context) error {
158	for _, nid := range c.container.networks() {
159		if err := c.backend.DeleteManagedNetwork(nid); err != nil {
160			switch err.(type) {
161			case *libnetwork.ActiveEndpointsError:
162				continue
163			case libnetwork.ErrNoSuchNetwork:
164				continue
165			default:
166				log.G(ctx).Errorf("network %s remove failed: %v", nid, err)
167				return err
168			}
169		}
170	}
171
172	return nil
173}
174
175func (c *containerAdapter) networkAttach(ctx context.Context) error {
176	config := c.container.createNetworkingConfig()
177
178	var (
179		networkName string
180		networkID   string
181	)
182
183	if config != nil {
184		for n, epConfig := range config.EndpointsConfig {
185			networkName = n
186			networkID = epConfig.NetworkID
187			break
188		}
189	}
190
191	return c.backend.UpdateAttachment(networkName, networkID, c.container.id(), config)
192}
193
194func (c *containerAdapter) waitForDetach(ctx context.Context) error {
195	config := c.container.createNetworkingConfig()
196
197	var (
198		networkName string
199		networkID   string
200	)
201
202	if config != nil {
203		for n, epConfig := range config.EndpointsConfig {
204			networkName = n
205			networkID = epConfig.NetworkID
206			break
207		}
208	}
209
210	return c.backend.WaitForDetachment(ctx, networkName, networkID, c.container.taskID(), c.container.id())
211}
212
213func (c *containerAdapter) create(ctx context.Context) error {
214	var cr containertypes.ContainerCreateCreatedBody
215	var err error
216
217	if cr, err = c.backend.CreateManagedContainer(types.ContainerCreateConfig{
218		Name:       c.container.name(),
219		Config:     c.container.config(),
220		HostConfig: c.container.hostConfig(),
221		// Use the first network in container create
222		NetworkingConfig: c.container.createNetworkingConfig(),
223	}); err != nil {
224		return err
225	}
226
227	// Docker daemon currently doesn't support multiple networks in container create
228	// Connect to all other networks
229	nc := c.container.connectNetworkingConfig()
230
231	if nc != nil {
232		for n, ep := range nc.EndpointsConfig {
233			if err := c.backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil {
234				return err
235			}
236		}
237	}
238
239	container := c.container.task.Spec.GetContainer()
240	if container == nil {
241		return fmt.Errorf("unable to get container from task spec")
242	}
243
244	// configure secrets
245	if err := c.backend.SetContainerSecretStore(cr.ID, c.secrets); err != nil {
246		return err
247	}
248
249	refs := convert.SecretReferencesFromGRPC(container.Secrets)
250	if err := c.backend.SetContainerSecretReferences(cr.ID, refs); err != nil {
251		return err
252	}
253
254	if err := c.backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig()); err != nil {
255		return err
256	}
257
258	return nil
259}
260
261func (c *containerAdapter) start(ctx context.Context) error {
262	return c.backend.ContainerStart(c.container.name(), nil, "", "")
263}
264
265func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
266	cs, err := c.backend.ContainerInspectCurrent(c.container.name(), false)
267	if ctx.Err() != nil {
268		return types.ContainerJSON{}, ctx.Err()
269	}
270	if err != nil {
271		return types.ContainerJSON{}, err
272	}
273	return *cs, nil
274}
275
276// events issues a call to the events API and returns a channel with all
277// events. The stream of events can be shutdown by cancelling the context.
278func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
279	log.G(ctx).Debugf("waiting on events")
280	buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
281	eventsq := make(chan events.Message, len(buffer))
282
283	for _, event := range buffer {
284		eventsq <- event
285	}
286
287	go func() {
288		defer c.backend.UnsubscribeFromEvents(l)
289
290		for {
291			select {
292			case ev := <-l:
293				jev, ok := ev.(events.Message)
294				if !ok {
295					log.G(ctx).Warnf("unexpected event message: %q", ev)
296					continue
297				}
298				select {
299				case eventsq <- jev:
300				case <-ctx.Done():
301					return
302				}
303			case <-ctx.Done():
304				return
305			}
306		}
307	}()
308
309	return eventsq
310}
311
312func (c *containerAdapter) wait(ctx context.Context) error {
313	return c.backend.ContainerWaitWithContext(ctx, c.container.nameOrID())
314}
315
316func (c *containerAdapter) shutdown(ctx context.Context) error {
317	// Default stop grace period to nil (daemon will use the stopTimeout of the container)
318	var stopgrace *int
319	spec := c.container.spec()
320	if spec.StopGracePeriod != nil {
321		stopgraceValue := int(spec.StopGracePeriod.Seconds)
322		stopgrace = &stopgraceValue
323	}
324	return c.backend.ContainerStop(c.container.name(), stopgrace)
325}
326
327func (c *containerAdapter) terminate(ctx context.Context) error {
328	return c.backend.ContainerKill(c.container.name(), uint64(syscall.SIGKILL))
329}
330
331func (c *containerAdapter) remove(ctx context.Context) error {
332	return c.backend.ContainerRm(c.container.name(), &types.ContainerRmConfig{
333		RemoveVolume: true,
334		ForceRemove:  true,
335	})
336}
337
338func (c *containerAdapter) createVolumes(ctx context.Context) error {
339	// Create plugin volumes that are embedded inside a Mount
340	for _, mount := range c.container.task.Spec.GetContainer().Mounts {
341		if mount.Type != api.MountTypeVolume {
342			continue
343		}
344
345		if mount.VolumeOptions == nil {
346			continue
347		}
348
349		if mount.VolumeOptions.DriverConfig == nil {
350			continue
351		}
352
353		req := c.container.volumeCreateRequest(&mount)
354
355		// Check if this volume exists on the engine
356		if _, err := c.backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels); err != nil {
357			// TODO(amitshukla): Today, volume create through the engine api does not return an error
358			// when the named volume with the same parameters already exists.
359			// It returns an error if the driver name is different - that is a valid error
360			return err
361		}
362
363	}
364
365	return nil
366}
367
368func (c *containerAdapter) activateServiceBinding() error {
369	return c.backend.ActivateContainerServiceBinding(c.container.name())
370}
371
372func (c *containerAdapter) deactivateServiceBinding() error {
373	return c.backend.DeactivateContainerServiceBinding(c.container.name())
374}
375
376func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (io.ReadCloser, error) {
377	reader, writer := io.Pipe()
378
379	apiOptions := &backend.ContainerLogsConfig{
380		ContainerLogsOptions: types.ContainerLogsOptions{
381			Follow: options.Follow,
382
383			// TODO(stevvooe): Parse timestamp out of message. This
384			// absolutely needs to be done before going to production with
385			// this, at it is completely redundant.
386			Timestamps: true,
387			Details:    false, // no clue what to do with this, let's just deprecate it.
388		},
389		OutStream: writer,
390	}
391
392	if options.Since != nil {
393		since, err := ptypes.Timestamp(options.Since)
394		if err != nil {
395			return nil, err
396		}
397		apiOptions.Since = since.Format(time.RFC3339Nano)
398	}
399
400	if options.Tail < 0 {
401		// See protobuf documentation for details of how this works.
402		apiOptions.Tail = fmt.Sprint(-options.Tail - 1)
403	} else if options.Tail > 0 {
404		return nil, fmt.Errorf("tail relative to start of logs not supported via docker API")
405	}
406
407	if len(options.Streams) == 0 {
408		// empty == all
409		apiOptions.ShowStdout, apiOptions.ShowStderr = true, true
410	} else {
411		for _, stream := range options.Streams {
412			switch stream {
413			case api.LogStreamStdout:
414				apiOptions.ShowStdout = true
415			case api.LogStreamStderr:
416				apiOptions.ShowStderr = true
417			}
418		}
419	}
420
421	chStarted := make(chan struct{})
422	go func() {
423		defer writer.Close()
424		c.backend.ContainerLogs(ctx, c.container.name(), apiOptions, chStarted)
425	}()
426
427	return reader, nil
428}
429
// todo: typed/wrapped errors

// isContainerCreateNameConflict reports whether err's message contains the
// text "Conflict. The name". A nil err never matches.
func isContainerCreateNameConflict(err error) bool {
	return err != nil && strings.Contains(err.Error(), "Conflict. The name")
}
434
// isUnknownContainer reports whether err's message contains the text
// "No such container:". A nil err never matches.
func isUnknownContainer(err error) bool {
	return err != nil && strings.Contains(err.Error(), "No such container:")
}
438
// isStoppedContainer reports whether err's message contains the text
// "is already stopped". A nil err never matches.
func isStoppedContainer(err error) bool {
	return err != nil && strings.Contains(err.Error(), "is already stopped")
}
442