package container

import (
	"fmt"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/events"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	"github.com/docker/go-connections/nat"
	"github.com/docker/libnetwork"
	"github.com/docker/swarmkit/agent/exec"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	gogotypes "github.com/gogo/protobuf/types"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
	"golang.org/x/time/rate"
)

const defaultGossipConvergeDelay = 2 * time.Second

// controller implements exec.Controller against docker's API.
//
// Most operations against docker's API are done through the container name,
// which is unique to the task.
type controller struct {
	task       *api.Task
	adapter    *containerAdapter
	closed     chan struct{}
	err        error
	pulled     chan struct{} // closed after pull
	cancelPull func()        // cancels pull context if not nil
	pullErr    error         // pull error, only read after pulled closed
}

var _ exec.Controller = &controller{}

// newController returns a docker exec runner for the provided task.
func newController(b executorpkg.Backend, task *api.Task, node *api.NodeDescription, dependencies exec.DependencyGetter) (*controller, error) {
	adapter, err := newContainerAdapter(b, task, node, dependencies)
	if err != nil {
		return nil, err
	}

	return &controller{
		task:    task,
		adapter: adapter,
		closed:  make(chan struct{}),
	}, nil
}

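// Task returns the task backing this controller.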
func (r *controller) Task() (*api.Task, error) {
	return r.task, nil
}

// ContainerStatus returns the container-specific status for the task.
func (r *controller) ContainerStatus(ctx context.Context) (*api.ContainerStatus, error) {
	ctnr, err := r.adapter.inspect(ctx)
	if err != nil {
		if isUnknownContainer(err) {
			return nil, nil
		}
		return nil, err
	}
	return parseContainerStatus(ctnr)
}

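// PortStatus returns the port status of the task's container, derived from
// the container's inspect output.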
func (r *controller) PortStatus(ctx context.Context) (*api.PortStatus, error) {
	ctnr, err := r.adapter.inspect(ctx)
	if err != nil {
		if isUnknownContainer(err) {
			return nil, nil
		}

		return nil, err
	}

	return parsePortStatus(ctnr)
}

// Update takes a recent task update and applies it to the container.
func (r *controller) Update(ctx context.Context, t *api.Task) error {
	// TODO(stevvooe): While assignment of tasks is idempotent, we do allow
	// updates of metadata, such as labelling, as well as any other properties
	// that make sense.
	return nil
}

// Prepare creates a container and ensures the image is pulled.
//
// If the container has already been created, exec.ErrTaskPrepared is returned.
func (r *controller) Prepare(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	// Make sure all the networks that the task needs are created.
	if err := r.adapter.createNetworks(ctx); err != nil {
		return err
	}

	// Make sure all the volumes that the task needs are created.
	if err := r.adapter.createVolumes(ctx); err != nil {
		return err
	}

	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
		if r.pulled == nil {
			// Fork the pull to a different context to allow pull to continue
			// on re-entrant calls to Prepare. This ensures that Prepare can be
			// idempotent and not incur the extra cost of pulling when
			// cancelled on updates.
			var pctx context.Context

			r.pulled = make(chan struct{})
			pctx, r.cancelPull = context.WithCancel(context.Background()) // TODO(stevvooe): Bind a context to the entire controller.

			go func() {
				defer close(r.pulled)
				r.pullErr = r.adapter.pullImage(pctx) // protected by closing r.pulled
			}()
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-r.pulled:
			if r.pullErr != nil {
				// NOTE(stevvooe): We always try to pull the image to make sure we have
				// the most up to date version. This will return an error, but we only
				// log it. If the image truly doesn't exist, the create below will
				// error out.
				//
				// This gives us some nice behavior where we use up to date versions of
				// mutable tags, but will still run if the old image is available but a
				// registry is down.
				//
				// If you don't want this behavior, lock down your image to an
				// immutable tag or digest.
				log.G(ctx).WithError(r.pullErr).Error("pulling image failed")
			}
		}
	}
	if err := r.adapter.create(ctx); err != nil {
		if isContainerCreateNameConflict(err) {
			if _, err := r.adapter.inspect(ctx); err != nil {
				return err
			}

			// container is already created. success!
			return exec.ErrTaskPrepared
		}

		return err
	}

	return nil
}

// Start the container. An error will be returned if the container is already started.
func (r *controller) Start(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	ctnr, err := r.adapter.inspect(ctx)
	if err != nil {
		return err
	}

	// Detect whether the container has *ever* been started. If so, we don't
	// issue the start.
	//
	// TODO(stevvooe): This is very racy. While reading inspect, another could
	// start the process and we could end up starting it twice.
	if ctnr.State.Status != "created" {
		return exec.ErrTaskStarted
	}

	for {
		if err := r.adapter.start(ctx); err != nil {
			if _, ok := errors.Cause(err).(libnetwork.ErrNoSuchNetwork); ok {
				// Retry network creation if we failed because
				// some of the networks were not found.
				if err := r.adapter.createNetworks(ctx); err != nil {
					return err
				}

				continue
			}

			return errors.Wrap(err, "starting container failed")
		}

		break
	}

	// no health check
	if ctnr.Config == nil || ctnr.Config.Healthcheck == nil || len(ctnr.Config.Healthcheck.Test) == 0 || ctnr.Config.Healthcheck.Test[0] == "NONE" {
		if err := r.adapter.activateServiceBinding(); err != nil {
			log.G(ctx).WithError(err).Errorf("failed to activate service binding for container %s which has no healthcheck config", r.adapter.container.name())
			return err
		}
		return nil
	}

	// wait for container to be healthy
	eventq := r.adapter.events(ctx)

	var healthErr error
	for {
		select {
		case event := <-eventq:
			if !r.matchevent(event) {
				continue
			}

			switch event.Action {
			case "die": // exit on terminal events
				ctnr, err := r.adapter.inspect(ctx)
				if err != nil {
					return errors.Wrap(err, "die event received")
				} else if ctnr.State.ExitCode != 0 {
					return &exitError{code: ctnr.State.ExitCode, cause: healthErr}
				}

				return nil
			case "destroy":
				// If we get here, something has gone wrong but we want to exit
				// and report anyway.
				return ErrContainerDestroyed
			case "health_status: unhealthy":
				// in this case, we stop the container and report unhealthy status
				if err := r.Shutdown(ctx); err != nil {
					return errors.Wrap(err, "unhealthy container shutdown failed")
				}
				// set health check error, and wait for container to fully exit ("die" event)
				healthErr = ErrContainerUnhealthy
			case "health_status: healthy":
				if err := r.adapter.activateServiceBinding(); err != nil {
					log.G(ctx).WithError(err).Errorf("failed to activate service binding for container %s after healthy event", r.adapter.container.name())
					return err
				}
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		case <-r.closed:
			return r.err
		}
	}
}

// Wait on the container to exit.
func (r *controller) Wait(pctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(pctx)
	defer cancel()

	healthErr := make(chan error, 1)
	go func() {
		ectx, cancel := context.WithCancel(ctx) // cancel event context on first event
		defer cancel()
		if err := r.checkHealth(ectx); err == ErrContainerUnhealthy {
			healthErr <- ErrContainerUnhealthy
			if err := r.Shutdown(ectx); err != nil {
				log.G(ectx).WithError(err).Debug("shutdown failed on unhealthy")
			}
		}
	}()

	waitC, err := r.adapter.wait(ctx)
	if err != nil {
		return err
	}

	if status := <-waitC; status.ExitCode() != 0 {
		exitErr := &exitError{
			code: status.ExitCode(),
		}

		// Set the cause if it is knowable.
		select {
		case e := <-healthErr:
			exitErr.cause = e
		default:
			if status.Err() != nil {
				exitErr.cause = status.Err()
			}
		}

		return exitErr
	}

	return nil
}

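// hasServiceBinding reports whether the task is attached to a network other
// than the default bridge, and therefore has a service binding that needs to
// be deactivated on shutdown.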
func (r *controller) hasServiceBinding() bool {
	if r.task == nil {
		return false
	}

	// service is attached to a network besides the default bridge
	for _, na := range r.task.Networks {
		if na.Network == nil ||
			na.Network.DriverState == nil ||
			na.Network.DriverState.Name == "bridge" && na.Network.Spec.Annotations.Name == "bridge" {
			continue
		}
		return true
	}

	return false
}

// Shutdown the container cleanly.
func (r *controller) Shutdown(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	if r.cancelPull != nil {
		r.cancelPull()
	}

	if r.hasServiceBinding() {
		// remove container from service binding
		if err := r.adapter.deactivateServiceBinding(); err != nil {
			log.G(ctx).WithError(err).Warningf("failed to deactivate service binding for container %s", r.adapter.container.name())
			// Don't return an error here, because failure to deactivate
			// the service binding is expected if the container was never
			// started.
		}

		// add a delay for gossip converge
		// TODO(dongluochen): this delay should be configurable to fit different cluster size and network delay.
		time.Sleep(defaultGossipConvergeDelay)
	}

	if err := r.adapter.shutdown(ctx); err != nil {
		if isUnknownContainer(err) || isStoppedContainer(err) {
			return nil
		}

		return err
	}

	return nil
}

// Terminate the container, with force.
func (r *controller) Terminate(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	if r.cancelPull != nil {
		r.cancelPull()
	}

	if err := r.adapter.terminate(ctx); err != nil {
		if isUnknownContainer(err) {
			return nil
		}

		return err
	}

	return nil
}

// Remove the container and its resources.
func (r *controller) Remove(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	if r.cancelPull != nil {
		r.cancelPull()
	}

	// It may be necessary to shut down the task before removing it.
	if err := r.Shutdown(ctx); err != nil {
		if isUnknownContainer(err) {
			return nil
		}
		// This may fail if the task was already shut down.
		log.G(ctx).WithError(err).Debug("shutdown failed on removal")
	}

	// Try removing networks referenced in this task in case this
	// task is the last one referencing them.
	if err := r.adapter.removeNetworks(ctx); err != nil {
		if isUnknownContainer(err) {
			return nil
		}
		return err
	}

	if err := r.adapter.remove(ctx); err != nil {
		if isUnknownContainer(err) {
			return nil
		}

		return err
	}
	return nil
}

// waitReady waits for a container to be "ready".
// Ready means it's past the started state.
func (r *controller) waitReady(pctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(pctx)
	defer cancel()

	eventq := r.adapter.events(ctx)

	ctnr, err := r.adapter.inspect(ctx)
	if err != nil {
		if !isUnknownContainer(err) {
			return errors.Wrap(err, "inspect container failed")
		}
	} else {
		switch ctnr.State.Status {
		case "running", "exited", "dead":
			return nil
		}
	}

	for {
		select {
		case event := <-eventq:
			if !r.matchevent(event) {
				continue
			}

			switch event.Action {
			case "start":
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		case <-r.closed:
			return r.err
		}
	}
}

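// Logs streams log messages from the task's container to the provided
// publisher until the context is canceled or the log stream ends.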
func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, options api.LogSubscriptionOptions) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	// if we're following, wait for this container to be ready. there is a
	// problem here: if the container will never be ready (for example, it has
	// been totally deleted) then this will wait forever. however, this doesn't
	// actually cause any UI issues, and shouldn't be a problem. the stuck wait
	// will go away when the follow (context) is canceled.
	if options.Follow {
		if err := r.waitReady(ctx); err != nil {
			return errors.Wrap(err, "container not ready for logs")
		}
	}
	// if we're not following, we're not gonna wait for the container to be
	// ready. just call logs. if the container isn't ready, the call will fail
	// and return an error. no big deal, we don't care, we only want the logs
	// we can get RIGHT NOW with no follow

	logsContext, cancel := context.WithCancel(ctx)
	msgs, err := r.adapter.logs(logsContext, options)
	defer cancel()
	if err != nil {
		return errors.Wrap(err, "failed getting container logs")
	}

	var (
		// use a rate limiter to keep things under control but also provide
		// some ability to coalesce messages.
		limiter = rate.NewLimiter(rate.Every(time.Second), 10<<20) // 10 MB/s
		msgctx  = api.LogContext{
			NodeID:    r.task.NodeID,
			ServiceID: r.task.ServiceID,
			TaskID:    r.task.ID,
		}
	)

	for {
		msg, ok := <-msgs
		if !ok {
			// we're done here, no more messages
			return nil
		}

		if msg.Err != nil {
			// the deferred cancel closes the adapter's log stream
			return msg.Err
		}

		// wait here for the limiter to catch up
		if err := limiter.WaitN(ctx, len(msg.Line)); err != nil {
			return errors.Wrap(err, "failed rate limiter")
		}
		tsp, err := gogotypes.TimestampProto(msg.Timestamp)
		if err != nil {
			return errors.Wrap(err, "failed to convert timestamp")
		}
		var stream api.LogStream
		if msg.Source == "stdout" {
			stream = api.LogStreamStdout
		} else if msg.Source == "stderr" {
			stream = api.LogStreamStderr
		}

		// parse the details out of the Attrs map
		var attrs []api.LogAttr
		if len(msg.Attrs) != 0 {
			attrs = make([]api.LogAttr, 0, len(msg.Attrs))
			for _, attr := range msg.Attrs {
				attrs = append(attrs, api.LogAttr{Key: attr.Key, Value: attr.Value})
			}
		}

		if err := publisher.Publish(ctx, api.LogMessage{
			Context:   msgctx,
			Timestamp: tsp,
			Stream:    stream,
			Attrs:     attrs,
			Data:      msg.Line,
		}); err != nil {
			return errors.Wrap(err, "failed to publish log message")
		}
	}
}

// Close the runner and clean up any ephemeral resources.
func (r *controller) Close() error {
	select {
	case <-r.closed:
		return r.err
	default:
		if r.cancelPull != nil {
			r.cancelPull()
		}

		r.err = exec.ErrControllerClosed
		close(r.closed)
	}
	return nil
}

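// matchevent reports whether the event was emitted by this controller's
// container, matching by container name.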
func (r *controller) matchevent(event events.Message) bool {
	if event.Type != events.ContainerEventType {
		return false
	}
	// we can't filter by container ID, since doing so is very likely to introduce a deadlock. see #33377.
	return event.Actor.Attributes["name"] == r.adapter.container.name()
}

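// checkClosed returns the controller's error if it has already been closed,
// or nil otherwise.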
func (r *controller) checkClosed() error {
	select {
	case <-r.closed:
		return r.err
	default:
		return nil
	}
}

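// parseContainerStatus converts a container inspect result into a
// ContainerStatus for the task.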
func parseContainerStatus(ctnr types.ContainerJSON) (*api.ContainerStatus, error) {
	status := &api.ContainerStatus{
		ContainerID: ctnr.ID,
		PID:         int32(ctnr.State.Pid),
		ExitCode:    int32(ctnr.State.ExitCode),
	}

	return status, nil
}

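// parsePortStatus extracts the published ports from a container inspect
// result.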
func parsePortStatus(ctnr types.ContainerJSON) (*api.PortStatus, error) {
	status := &api.PortStatus{}

	if ctnr.NetworkSettings != nil && len(ctnr.NetworkSettings.Ports) > 0 {
		exposedPorts, err := parsePortMap(ctnr.NetworkSettings.Ports)
		if err != nil {
			return nil, err
		}
		status.Ports = exposedPorts
	}

	return status, nil
}

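// parsePortMap converts a nat.PortMap, keyed by entries such as "8080/tcp",
// into a PortConfig for every host binding.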
func parsePortMap(portMap nat.PortMap) ([]*api.PortConfig, error) {
	exposedPorts := make([]*api.PortConfig, 0, len(portMap))

	for portProtocol, mapping := range portMap {
		parts := strings.SplitN(string(portProtocol), "/", 2)
		if len(parts) != 2 {
			return nil, fmt.Errorf("invalid port mapping: %s", portProtocol)
		}

		port, err := strconv.ParseUint(parts[0], 10, 16)
		if err != nil {
			return nil, err
		}

		protocol := api.ProtocolTCP
		switch strings.ToLower(parts[1]) {
		case "tcp":
			protocol = api.ProtocolTCP
		case "udp":
			protocol = api.ProtocolUDP
		default:
			return nil, fmt.Errorf("invalid protocol: %s", parts[1])
		}

		for _, binding := range mapping {
			hostPort, err := strconv.ParseUint(binding.HostPort, 10, 16)
			if err != nil {
				return nil, err
			}

			// TODO(aluzzardi): We're losing the port `name` here since
			// there's no way to retrieve it back from the Engine.
			exposedPorts = append(exposedPorts, &api.PortConfig{
				PublishMode:   api.PublishModeHost,
				Protocol:      protocol,
				TargetPort:    uint32(port),
				PublishedPort: uint32(hostPort),
			})
		}
	}

	return exposedPorts, nil
}

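// exitError wraps a non-zero container exit code together with its underlying
// cause, if known.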
type exitError struct {
	code  int
	cause error
}

func (e *exitError) Error() string {
	if e.cause != nil {
		return fmt.Sprintf("task: non-zero exit (%v): %v", e.code, e.cause)
	}

	return fmt.Sprintf("task: non-zero exit (%v)", e.code)
}

func (e *exitError) ExitCode() int {
	return e.code
}

func (e *exitError) Cause() error {
	return e.cause
}

// checkHealth blocks until the container is reported unhealthy or ctx exits.
func (r *controller) checkHealth(ctx context.Context) error {
	eventq := r.adapter.events(ctx)

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-r.closed:
			return nil
		case event := <-eventq:
			if !r.matchevent(event) {
				continue
			}

			switch event.Action {
			case "health_status: unhealthy":
				return ErrContainerUnhealthy
			}
		}
	}
}