package docker

import (
	"context"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"time"

	docker "github.com/fsouza/go-dockerclient"
	"github.com/hashicorp/consul-template/signals"
	hclog "github.com/hashicorp/go-hclog"
	multierror "github.com/hashicorp/go-multierror"
	plugin "github.com/hashicorp/go-plugin"
	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/drivers/docker/docklog"
	"github.com/hashicorp/nomad/drivers/shared/eventer"
	nstructs "github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/plugins/base"
	"github.com/hashicorp/nomad/plugins/drivers"
	"github.com/hashicorp/nomad/plugins/shared/structs"
	pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
)

var (
	// createClientsLock is a lock that protects reading/writing global client
	// variables
	createClientsLock sync.Mutex

	// client is a docker client with a timeout of 5 minutes. This is used for
	// all operations against the docker daemon that are not long running, such
	// as creating and killing containers.
	client *docker.Client

	// waitClient is a docker client with no timeout. This is used for long
	// running operations such as waiting on containers and collecting stats.
	waitClient *docker.Client

	dockerTransientErrs = []string{
		"Client.Timeout exceeded while awaiting headers",
		"EOF",
		"API error (500)",
	}

	// recoverableErrTimeouts returns a recoverable error if the error was due
	// to timeouts
	recoverableErrTimeouts = func(err error) error {
		r := false
		if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") ||
			strings.Contains(err.Error(), "EOF") {
			r = true
		}
		return nstructs.NewRecoverableError(err, r)
	}

	// taskHandleVersion is the version of the task handle that this driver
	// sets, and from which it knows how to decode driver state
	taskHandleVersion = 1

	// Nvidia-container-runtime environment variable names
	nvidiaVisibleDevices = "NVIDIA_VISIBLE_DEVICES"
)

const (
	dockerLabelAllocID = "com.hashicorp.nomad.alloc_id"
)
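
// The helpers above split Docker failures into retryable and fatal classes.
// exampleClassifyDockerError is an illustrative sketch (not used by the
// driver) of how a caller might combine them; the return strings are
// hypothetical.
func exampleClassifyDockerError(err error) string {
	if err == nil {
		return "ok"
	}
	if isDockerTransientError(err) {
		return "transient: retry after nextBackoff"
	}
	if nstructs.IsRecoverable(recoverableErrTimeouts(err)) {
		return "recoverable: the client may restart the task"
	}
	return "fatal: surface to the user"
}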

type Driver struct {
	// eventer is used to handle multiplexing of TaskEvents calls such that an
	// event can be broadcast to all callers
	eventer *eventer.Eventer

	// config contains the runtime configuration for the driver set by the
	// SetConfig RPC
	config *DriverConfig

	// clientConfig contains a driver specific subset of the Nomad client
	// configuration
	clientConfig *base.ClientDriverConfig

	// ctx is the context for the driver. It is passed to other subsystems to
	// coordinate shutdown
	ctx context.Context

	// signalShutdown is called when the driver is shutting down and cancels the
	// ctx passed to any subsystems
	signalShutdown context.CancelFunc

	// tasks is the in memory datastore mapping taskIDs to taskHandles
	tasks *taskStore

	// coordinator tracks multiple image pulls against the same docker image
	coordinator *dockerCoordinator

	// logger will log to the Nomad agent
	logger hclog.Logger

	// gpuRuntime indicates nvidia-docker runtime availability
	gpuRuntime bool

	// A tri-state boolean to know if the fingerprinting has happened and
	// whether it has been successful
	fingerprintSuccess *bool
	fingerprintLock    sync.RWMutex

	// A boolean to know if the docker driver has ever been correctly detected
	// for use during fingerprinting.
	detected     bool
	detectedLock sync.RWMutex

	reconciler *containerReconciler
}

// NewDockerDriver returns a docker implementation of a driver plugin
func NewDockerDriver(logger hclog.Logger) drivers.DriverPlugin {
	ctx, cancel := context.WithCancel(context.Background())
	logger = logger.Named(pluginName)
	return &Driver{
		eventer:        eventer.NewEventer(ctx, logger),
		config:         &DriverConfig{},
		tasks:          newTaskStore(),
		ctx:            ctx,
		signalShutdown: cancel,
		logger:         logger,
	}
}

func (d *Driver) reattachToDockerLogger(reattachConfig *structs.ReattachConfig) (docklog.DockerLogger, *plugin.Client, error) {
	reattach, err := pstructs.ReattachConfigToGoPlugin(reattachConfig)
	if err != nil {
		return nil, nil, err
	}

	dlogger, dloggerPluginClient, err := docklog.ReattachDockerLogger(reattach)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to reattach to docker logger process: %v", err)
	}

	return dlogger, dloggerPluginClient, nil
}

func (d *Driver) setupNewDockerLogger(container *docker.Container, cfg *drivers.TaskConfig, startTime time.Time) (docklog.DockerLogger, *plugin.Client, error) {
	dlogger, pluginClient, err := docklog.LaunchDockerLogger(d.logger)
	if err != nil {
		if pluginClient != nil {
			pluginClient.Kill()
		}
		return nil, nil, fmt.Errorf("failed to launch docker logger plugin: %v", err)
	}

	if err := dlogger.Start(&docklog.StartOpts{
		Endpoint:    d.config.Endpoint,
		ContainerID: container.ID,
		TTY:         container.Config.Tty,
		Stdout:      cfg.StdoutPath,
		Stderr:      cfg.StderrPath,
		TLSCert:     d.config.TLS.Cert,
		TLSKey:      d.config.TLS.Key,
		TLSCA:       d.config.TLS.CA,
		StartTime:   startTime.Unix(),
	}); err != nil {
		pluginClient.Kill()
		return nil, nil, fmt.Errorf("failed to launch docker logger process %s: %v", container.ID, err)
	}

	return dlogger, pluginClient, nil
}

func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
	if _, ok := d.tasks.Get(handle.Config.ID); ok {
		return nil
	}

	// COMPAT(0.10): pre 0.9 upgrade path check
	if handle.Version == 0 {
		return d.recoverPre09Task(handle)
	}

	var handleState taskHandleState
	if err := handle.GetDriverState(&handleState); err != nil {
		return fmt.Errorf("failed to decode driver task state: %v", err)
	}

	client, _, err := d.dockerClients()
	if err != nil {
		return fmt.Errorf("failed to get docker client: %v", err)
	}

	container, err := client.InspectContainer(handleState.ContainerID)
	if err != nil {
		return fmt.Errorf("failed to inspect container for id %q: %v", handleState.ContainerID, err)
	}

	h := &taskHandle{
		client:                client,
		waitClient:            waitClient,
		logger:                d.logger.With("container_id", container.ID),
		task:                  handle.Config,
		containerID:           container.ID,
		containerImage:        container.Image,
		doneCh:                make(chan bool),
		waitCh:                make(chan struct{}),
		removeContainerOnExit: d.config.GC.Container,
		net:                   handleState.DriverNetwork,
	}

	if !d.config.DisableLogCollection {
		h.dlogger, h.dloggerPluginClient, err = d.reattachToDockerLogger(handleState.ReattachConfig)
		if err != nil {
			d.logger.Warn("failed to reattach to docker logger process", "error", err)

			h.dlogger, h.dloggerPluginClient, err = d.setupNewDockerLogger(container, handle.Config, time.Now())
			if err != nil {
				if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
					d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
				}
				return fmt.Errorf("failed to setup replacement docker logger: %v", err)
			}

			if err := handle.SetDriverState(h.buildState()); err != nil {
				if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
					d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
				}
				return fmt.Errorf("failed to store driver state: %v", err)
			}
		}
	}

	d.tasks.Set(handle.Config.ID, h)
	go h.run()

	return nil
}

func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
	if _, ok := d.tasks.Get(cfg.ID); ok {
		return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
	}

	var driverConfig TaskConfig

	if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
		return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
	}

	if driverConfig.Image == "" {
		return nil, nil, fmt.Errorf("image name required for docker driver")
	}

	// Remove any https:// prefix from the image name
	if strings.HasPrefix(driverConfig.Image, "https://") {
		driverConfig.Image = strings.Replace(driverConfig.Image, "https://", "", 1)
	}

	handle := drivers.NewTaskHandle(taskHandleVersion)
	handle.Config = cfg

	// Initialize docker API clients
	client, _, err := d.dockerClients()
	if err != nil {
		return nil, nil, fmt.Errorf("Failed to connect to docker daemon: %s", err)
	}

	id, err := d.createImage(cfg, &driverConfig, client)
	if err != nil {
		return nil, nil, err
	}

	containerCfg, err := d.createContainerConfig(cfg, &driverConfig, driverConfig.Image)
	if err != nil {
		d.logger.Error("failed to create container configuration", "image_name", driverConfig.Image,
			"image_id", id, "error", err)
		return nil, nil, fmt.Errorf("Failed to create container configuration for image %q (%q): %v", driverConfig.Image, id, err)
	}

	startAttempts := 0
CREATE:
	container, err := d.createContainer(client, containerCfg, driverConfig.Image)
	if err != nil {
		d.logger.Error("failed to create container", "error", err)
		client.RemoveContainer(docker.RemoveContainerOptions{
			ID:    containerCfg.Name,
			Force: true,
		})
		return nil, nil, nstructs.WrapRecoverable(fmt.Sprintf("failed to create container: %v", err), err)
	}

	d.logger.Info("created container", "container_id", container.ID)

	// We don't need to start the container if it is already running:
	// createContainer returns an existing container if one with the same name
	// is already present on the host and running
	if !container.State.Running {
		// Start the container
		if err := d.startContainer(container); err != nil {
			d.logger.Error("failed to start container", "container_id", container.ID, "error", err)
			client.RemoveContainer(docker.RemoveContainerOptions{
				ID:    container.ID,
				Force: true,
			})
			// Some sort of docker race bug, recreating the container usually works
			if strings.Contains(err.Error(), "OCI runtime create failed: container with id exists:") && startAttempts < 5 {
				startAttempts++
				d.logger.Debug("reattempting container create/start sequence", "attempt", startAttempts, "container_id", id)
				goto CREATE
			}
			return nil, nil, nstructs.WrapRecoverable(fmt.Sprintf("Failed to start container %s: %s", container.ID, err), err)
		}

		// InspectContainer to get all of the container metadata as
		// much of the metadata (eg networking) isn't populated until
		// the container is started
		runningContainer, err := client.InspectContainer(container.ID)
		if err != nil {
			msg := "failed to inspect started container"
			d.logger.Error(msg, "error", err)
			client.RemoveContainer(docker.RemoveContainerOptions{
				ID:    container.ID,
				Force: true,
			})
			return nil, nil, nstructs.NewRecoverableError(fmt.Errorf("%s %s: %s", msg, container.ID, err), true)
		}
		container = runningContainer
		d.logger.Info("started container", "container_id", container.ID)
	} else {
		d.logger.Debug("re-attaching to container", "container_id",
			container.ID, "container_state", container.State.String())
	}

	collectingLogs := !d.config.DisableLogCollection

	var dlogger docklog.DockerLogger
	var pluginClient *plugin.Client

	if collectingLogs {
		dlogger, pluginClient, err = d.setupNewDockerLogger(container, cfg, time.Unix(0, 0))
		if err != nil {
			d.logger.Error("an error occurred after container startup, terminating container", "container_id", container.ID)
			client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
			return nil, nil, err
		}
	}

	// Detect container address
	ip, autoUse := d.detectIP(container, &driverConfig)

	net := &drivers.DriverNetwork{
		PortMap:       driverConfig.PortMap,
		IP:            ip,
		AutoAdvertise: autoUse,
	}

	// Return a driver handle
	h := &taskHandle{
		client:                client,
		waitClient:            waitClient,
		dlogger:               dlogger,
		dloggerPluginClient:   pluginClient,
		logger:                d.logger.With("container_id", container.ID),
		task:                  cfg,
		containerID:           container.ID,
		containerImage:        container.Image,
		doneCh:                make(chan bool),
		waitCh:                make(chan struct{}),
		removeContainerOnExit: d.config.GC.Container,
		net:                   net,
	}

	if err := handle.SetDriverState(h.buildState()); err != nil {
		d.logger.Error("error encoding driver state after startup, terminating container", "container_id", container.ID, "error", err)
		if collectingLogs {
			dlogger.Stop()
			pluginClient.Kill()
		}
		client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
		return nil, nil, err
	}

	d.tasks.Set(cfg.ID, h)
	go h.run()

	return handle, net, nil
}

// createContainerClient is the subset of Docker Client methods used by the
// createContainer method to ease testing subtle error conditions.
type createContainerClient interface {
	CreateContainer(docker.CreateContainerOptions) (*docker.Container, error)
	InspectContainer(id string) (*docker.Container, error)
	ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error)
	RemoveContainer(opts docker.RemoveContainerOptions) error
}
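
// Because createContainer depends only on the interface above, tests can
// substitute a double for the real *docker.Client. exampleFakeCreateClient is
// an illustrative sketch of such a double (not used by the driver); the type
// and field names are hypothetical.
type exampleFakeCreateClient struct {
	container *docker.Container
	createErr error
}

func (f *exampleFakeCreateClient) CreateContainer(docker.CreateContainerOptions) (*docker.Container, error) {
	// Simulate either a create failure (e.g. a transient 500) or success.
	return f.container, f.createErr
}

func (f *exampleFakeCreateClient) InspectContainer(id string) (*docker.Container, error) {
	return f.container, nil
}

func (f *exampleFakeCreateClient) ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error) {
	return nil, nil
}

func (f *exampleFakeCreateClient) RemoveContainer(opts docker.RemoveContainerOptions) error {
	return nil
}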

// createContainer creates the container given the passed configuration. It
// attempts to handle any transient Docker errors.
func (d *Driver) createContainer(client createContainerClient, config docker.CreateContainerOptions,
	image string) (*docker.Container, error) {
	// Create a container
	attempted := 0
CREATE:
	container, createErr := client.CreateContainer(config)
	if createErr == nil {
		return container, nil
	}

	d.logger.Debug("failed to create container", "container_name",
		config.Name, "image_name", image, "image_id", config.Config.Image,
		"attempt", attempted+1, "error", createErr)

	// Volume management tools like Portworx may not have detached a volume
	// from a previous node before Nomad started a replacement task. Treat
	// these errors as recoverable so we retry.
	if strings.Contains(strings.ToLower(createErr.Error()), "volume is attached on another node") {
		return nil, nstructs.NewRecoverableError(createErr, true)
	}

	// If the container already exists determine whether it's already
	// running or if it's dead and needs to be recreated.
	if strings.Contains(strings.ToLower(createErr.Error()), "container already exists") {

		container, err := d.containerByName(config.Name)
		if err != nil {
			return nil, err
		}

		if container != nil && container.State.Running {
			return container, nil
		}

		// The conflicting container may have been removed between the failed
		// create and the lookup; if it is gone, treat the original error as
		// recoverable so the caller retries.
		if container == nil {
			return nil, nstructs.NewRecoverableError(createErr, true)
		}

		// Delete matching containers
		err = client.RemoveContainer(docker.RemoveContainerOptions{
			ID:    container.ID,
			Force: true,
		})
		if err != nil {
			d.logger.Error("failed to purge container", "container_id", container.ID, "error", err)
			return nil, recoverableErrTimeouts(fmt.Errorf("Failed to purge container %s: %s", container.ID, err))
		}
		d.logger.Info("purged container", "container_id", container.ID)

		if attempted < 5 {
			attempted++
			time.Sleep(nextBackoff(attempted))
			goto CREATE
		}
	} else if strings.Contains(strings.ToLower(createErr.Error()), "no such image") {
		// There is still a very small chance this is possible even with the
		// coordinator so retry.
		return nil, nstructs.NewRecoverableError(createErr, true)
	} else if isDockerTransientError(createErr) && attempted < 5 {
		attempted++
		time.Sleep(nextBackoff(attempted))
		goto CREATE
	}

	return nil, recoverableErrTimeouts(createErr)
}

// startContainer starts the passed container. It attempts to handle any
// transient Docker errors.
func (d *Driver) startContainer(c *docker.Container) error {
	// Start a container
	attempted := 0
START:
	startErr := client.StartContainer(c.ID, c.HostConfig)
	if startErr == nil || strings.Contains(startErr.Error(), "Container already running") {
		return nil
	}

	d.logger.Debug("failed to start container", "container_id", c.ID, "attempt", attempted+1, "error", startErr)

	if isDockerTransientError(startErr) {
		if attempted < 5 {
			attempted++
			time.Sleep(nextBackoff(attempted))
			goto START
		}
		return nstructs.NewRecoverableError(startErr, true)
	}

	return recoverableErrTimeouts(startErr)
}

// nextBackoff returns an appropriate backoff duration for the given number of
// attempts.
func nextBackoff(attempted int) time.Duration {
	// attempts sleep 200ms, 800ms, 3.2s, 12.8s, 51.2s
	// TODO: add randomization factor and extract to a helper
	return 1 << (2 * uint64(attempted)) * 50 * time.Millisecond
}
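
// exampleBackoffSchedule is an illustrative sketch (not used by the driver)
// that prints the retry schedule produced by nextBackoff. Per the formula
// above, attempts 1 through 5 yield 200ms, 800ms, 3.2s, 12.8s, and 51.2s.
func exampleBackoffSchedule() {
	for attempt := 1; attempt <= 5; attempt++ {
		// 1 << (2*attempt) quadruples the base 50ms delay on each attempt.
		fmt.Printf("attempt %d: sleep %v\n", attempt, nextBackoff(attempt))
	}
}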

// createImage creates a docker image either by pulling it from a registry or by
// loading it from the file system
func (d *Driver) createImage(task *drivers.TaskConfig, driverConfig *TaskConfig, client *docker.Client) (string, error) {
	image := driverConfig.Image
	repo, tag := parseDockerImage(image)

	callerID := fmt.Sprintf("%s-%s", task.ID, task.Name)

	// We're going to check whether the image is already downloaded. If the
	// tag is "latest", or ForcePull is set, we have to check for a new
	// version every time so we don't bother to check and cache the id here.
	// We'll download first, then cache.
	if driverConfig.ForcePull {
		d.logger.Debug("force pulling image instead of inspecting local", "image_ref", dockerImageRef(repo, tag))
	} else if tag != "latest" {
		if dockerImage, _ := client.InspectImage(image); dockerImage != nil {
			// Image exists so just increment its reference count
			d.coordinator.IncrementImageReference(dockerImage.ID, image, callerID)
			return dockerImage.ID, nil
		}
	}

	// Load the image if specified
	if driverConfig.LoadImage != "" {
		return d.loadImage(task, driverConfig, client)
	}

	// Download the image
	return d.pullImage(task, driverConfig, client, repo, tag)
}

// pullImage creates an image by pulling it from a docker registry
func (d *Driver) pullImage(task *drivers.TaskConfig, driverConfig *TaskConfig, client *docker.Client, repo, tag string) (id string, err error) {
	authOptions, err := d.resolveRegistryAuthentication(driverConfig, repo)
	if err != nil {
		if driverConfig.AuthSoftFail {
			d.logger.Warn("Failed to find docker repo auth", "repo", repo, "error", err)
		} else {
			return "", fmt.Errorf("Failed to find docker auth for repo %q: %v", repo, err)
		}
	}

	if authIsEmpty(authOptions) {
		d.logger.Debug("did not find docker auth for repo", "repo", repo)
	}

	d.eventer.EmitEvent(&drivers.TaskEvent{
		TaskID:    task.ID,
		AllocID:   task.AllocID,
		TaskName:  task.Name,
		Timestamp: time.Now(),
		Message:   "Downloading image",
		Annotations: map[string]string{
			"image": dockerImageRef(repo, tag),
		},
	})

	return d.coordinator.PullImage(driverConfig.Image, authOptions, task.ID, d.emitEventFunc(task), d.config.pullActivityTimeoutDuration)
}

func (d *Driver) emitEventFunc(task *drivers.TaskConfig) LogEventFn {
	return func(msg string, annotations map[string]string) {
		d.eventer.EmitEvent(&drivers.TaskEvent{
			TaskID:      task.ID,
			AllocID:     task.AllocID,
			TaskName:    task.Name,
			Timestamp:   time.Now(),
			Message:     msg,
			Annotations: annotations,
		})
	}
}

// authBackend encapsulates a function that resolves registry credentials.
type authBackend func(string) (*docker.AuthConfiguration, error)

// resolveRegistryAuthentication attempts to retrieve auth credentials for the
// repo, trying all authentication-backends possible.
func (d *Driver) resolveRegistryAuthentication(driverConfig *TaskConfig, repo string) (*docker.AuthConfiguration, error) {
	return firstValidAuth(repo, []authBackend{
		authFromTaskConfig(driverConfig),
		authFromDockerConfig(d.config.Auth.Config),
		authFromHelper(d.config.Auth.Helper),
	})
}
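
// Each authBackend inspects one credential source and either returns
// credentials, declines with a nil result, or fails with an error;
// firstValidAuth walks the slice in order. exampleStaticAuthBackend is an
// illustrative sketch of that contract (not used by the driver); the
// username/password parameters are hypothetical.
func exampleStaticAuthBackend(username, password string) authBackend {
	return func(repo string) (*docker.AuthConfiguration, error) {
		if username == "" {
			// Decline: let the next backend in the chain try.
			return nil, nil
		}
		return &docker.AuthConfiguration{
			Username: username,
			Password: password,
		}, nil
	}
}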

// loadImage creates an image by loading it from the file system
func (d *Driver) loadImage(task *drivers.TaskConfig, driverConfig *TaskConfig, client *docker.Client) (id string, err error) {

	archive := filepath.Join(task.TaskDir().LocalDir, driverConfig.LoadImage)
	d.logger.Debug("loading image from disk", "archive", archive)

	f, err := os.Open(archive)
	if err != nil {
		return "", fmt.Errorf("unable to open image archive: %v", err)
	}
	defer f.Close()

	if err := client.LoadImage(docker.LoadImageOptions{InputStream: f}); err != nil {
		return "", err
	}

	dockerImage, err := client.InspectImage(driverConfig.Image)
	if err != nil {
		return "", recoverableErrTimeouts(err)
	}

	d.coordinator.IncrementImageReference(dockerImage.ID, driverConfig.Image, task.ID)
	return dockerImage.ID, nil
}

func (d *Driver) containerBinds(task *drivers.TaskConfig, driverConfig *TaskConfig) ([]string, error) {

	allocDirBind := fmt.Sprintf("%s:%s", task.TaskDir().SharedAllocDir, task.Env[taskenv.AllocDir])
	taskLocalBind := fmt.Sprintf("%s:%s", task.TaskDir().LocalDir, task.Env[taskenv.TaskLocalDir])
	secretDirBind := fmt.Sprintf("%s:%s", task.TaskDir().SecretsDir, task.Env[taskenv.SecretsDir])
	binds := []string{allocDirBind, taskLocalBind, secretDirBind}

	taskLocalBindVolume := driverConfig.VolumeDriver == ""

	if !d.config.Volumes.Enabled && !taskLocalBindVolume {
		return nil, fmt.Errorf("volumes are not enabled; cannot use volume driver %q", driverConfig.VolumeDriver)
	}

	for _, userbind := range driverConfig.Volumes {
		// This assumes host OS = docker container OS. Not true when we
		// support Linux containers on Windows.
		src, dst, mode, err := parseVolumeSpec(userbind, runtime.GOOS)
		if err != nil {
			return nil, fmt.Errorf("invalid docker volume %q: %v", userbind, err)
		}

		// Paths inside the task dir are always allowed when using the default
		// driver, and relative paths are always allowed as they mount within
		// the container. When a VolumeDriver is set, we assume we receive a
		// binding in the format volume-name:container-dest. Otherwise, we
		// assume we receive a relative path binding in the format
		// relative/to/task:/also/in/container
		if taskLocalBindVolume {
			src = expandPath(task.TaskDir().Dir, src)
		} else {
			// Resolve dotted path segments
			src = filepath.Clean(src)
		}

		if !d.config.Volumes.Enabled && !isParentPath(task.AllocDir, src) {
			return nil, fmt.Errorf("volumes are not enabled; cannot mount host paths: %+q", userbind)
		}

		bind := src + ":" + dst
		if mode != "" {
			bind += ":" + mode
		}
		binds = append(binds, bind)
	}

	if selinuxLabel := d.config.Volumes.SelinuxLabel; selinuxLabel != "" {
		// Apply SELinux Label to each volume
		for i := range binds {
			binds[i] = fmt.Sprintf("%s:%s", binds[i], selinuxLabel)
		}
	}

	return binds, nil
}
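
// For illustration only: a task rooted at /var/nomad/alloc/<id>/task (path is
// a hypothetical example) with the user volume "relative/data:/shared:ro"
// would produce binds roughly like:
//
//	/var/nomad/alloc/<id>/alloc:/alloc
//	/var/nomad/alloc/<id>/task/local:/local
//	/var/nomad/alloc/<id>/task/secrets:/secrets
//	/var/nomad/alloc/<id>/task/relative/data:/shared:ro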

var userMountToUnixMount = map[string]string{
	// Empty string maps to `rprivate` for backwards compatibility in restored
	// older tasks, where mount propagation will not be present.
	"":                                           "rprivate",
	nstructs.VolumeMountPropagationPrivate:       "rprivate",
	nstructs.VolumeMountPropagationHostToTask:    "rslave",
	nstructs.VolumeMountPropagationBidirectional: "rshared",
}

func (d *Driver) createContainerConfig(task *drivers.TaskConfig, driverConfig *TaskConfig,
	imageID string) (docker.CreateContainerOptions, error) {

	// ensure that PortMap variables are populated early on
	task.Env = taskenv.SetPortMapEnvs(task.Env, driverConfig.PortMap)

	logger := d.logger.With("task_name", task.Name)
	var c docker.CreateContainerOptions
	if task.Resources == nil {
		// Guard against missing resources. We should never have been able to
		// schedule a job without specifying this.
		logger.Error("task.Resources is empty")
		return c, fmt.Errorf("task.Resources is empty")
	}

	binds, err := d.containerBinds(task, driverConfig)
	if err != nil {
		return c, err
	}
	logger.Trace("binding volumes", "volumes", binds)

	// create the config block that will later be consumed by go-dockerclient
	config := &docker.Config{
		Image:      imageID,
		Entrypoint: driverConfig.Entrypoint,
		Hostname:   driverConfig.Hostname,
		User:       task.User,
		Tty:        driverConfig.TTY,
		OpenStdin:  driverConfig.Interactive,
	}

	if driverConfig.WorkDir != "" {
		config.WorkingDir = driverConfig.WorkDir
	}

	hostConfig := &docker.HostConfig{
		Memory:    task.Resources.LinuxResources.MemoryLimitBytes,
		CPUShares: task.Resources.LinuxResources.CPUShares,

		// Binds are used to mount a host volume into the container. We mount a
		// local directory for storage and a shared alloc directory that can be
		// used to share data between different tasks in the same task group.
		Binds: binds,

		StorageOpt:   driverConfig.StorageOpt,
		VolumeDriver: driverConfig.VolumeDriver,

		PidsLimit: driverConfig.PidsLimit,
	}

	if _, ok := task.DeviceEnv[nvidiaVisibleDevices]; ok {
		if !d.gpuRuntime {
			return c, fmt.Errorf("requested docker-runtime %q was not found", d.config.GPURuntimeName)
		}
		hostConfig.Runtime = d.config.GPURuntimeName
	}

	// Calculate CPU Quota
	// cfs_quota_us is the time per core, so we must
	// multiply the time by the number of cores available
	// See https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/resource_management_guide/sec-cpu
	if driverConfig.CPUHardLimit {
		numCores := runtime.NumCPU()
		if driverConfig.CPUCFSPeriod < 0 || driverConfig.CPUCFSPeriod > 1000000 {
			return c, fmt.Errorf("invalid value for cpu_cfs_period")
		}
		if driverConfig.CPUCFSPeriod == 0 {
			driverConfig.CPUCFSPeriod = task.Resources.LinuxResources.CPUPeriod
		}
		hostConfig.CPUPeriod = driverConfig.CPUCFSPeriod
		hostConfig.CPUQuota = int64(task.Resources.LinuxResources.PercentTicks*float64(driverConfig.CPUCFSPeriod)) * int64(numCores)
	}

	// Windows does not support MemorySwap/MemorySwappiness #2193
	if runtime.GOOS == "windows" {
		hostConfig.MemorySwap = 0
		hostConfig.MemorySwappiness = -1
	} else {
		hostConfig.MemorySwap = task.Resources.LinuxResources.MemoryLimitBytes // MemorySwap is memory + swap.
	}

	loggingDriver := driverConfig.Logging.Type
	if loggingDriver == "" {
		loggingDriver = driverConfig.Logging.Driver
	}

	hostConfig.LogConfig = docker.LogConfig{
		Type:   loggingDriver,
		Config: driverConfig.Logging.Config,
	}

	if hostConfig.LogConfig.Type == "" && hostConfig.LogConfig.Config == nil {
		logger.Trace("no docker log driver provided, defaulting to json-file")
		hostConfig.LogConfig.Type = "json-file"
		hostConfig.LogConfig.Config = map[string]string{
			"max-file": "2",
			"max-size": "2m",
		}
	}

	logger.Debug("configured resources", "memory", hostConfig.Memory,
		"cpu_shares", hostConfig.CPUShares, "cpu_quota", hostConfig.CPUQuota,
		"cpu_period", hostConfig.CPUPeriod)
	logger.Debug("binding directories", "binds", hclog.Fmt("%#v", hostConfig.Binds))

	// set privileged mode
	if driverConfig.Privileged && !d.config.AllowPrivileged {
		return c, fmt.Errorf(`Docker privileged mode is disabled on this Nomad agent`)
	}
	hostConfig.Privileged = driverConfig.Privileged

	// set capabilities
	hostCapsWhitelistConfig := d.config.AllowCaps
	hostCapsWhitelist := make(map[string]struct{})
	for _, cap := range hostCapsWhitelistConfig {
		cap = strings.ToLower(strings.TrimSpace(cap))
		hostCapsWhitelist[cap] = struct{}{}
	}

	if _, ok := hostCapsWhitelist["all"]; !ok {
		effectiveCaps, err := tweakCapabilities(
			strings.Split(dockerBasicCaps, ","),
			driverConfig.CapAdd,
			driverConfig.CapDrop,
		)
		if err != nil {
			return c, err
		}
		var missingCaps []string
		for _, cap := range effectiveCaps {
			cap = strings.ToLower(cap)
			if _, ok := hostCapsWhitelist[cap]; !ok {
				missingCaps = append(missingCaps, cap)
			}
		}
		if len(missingCaps) > 0 {
			return c, fmt.Errorf("Docker driver doesn't have the following caps whitelisted on this Nomad agent: %s", missingCaps)
		}
	}

	hostConfig.CapAdd = driverConfig.CapAdd
	hostConfig.CapDrop = driverConfig.CapDrop

	// set SHM size
	if driverConfig.ShmSize != 0 {
		hostConfig.ShmSize = driverConfig.ShmSize
	}

	// set DNS servers
	for _, ip := range driverConfig.DNSServers {
		if net.ParseIP(ip) != nil {
			hostConfig.DNS = append(hostConfig.DNS, ip)
		} else {
			logger.Error("invalid ip address for container dns server", "ip", ip)
		}
	}

	// Setup devices
	for _, device := range driverConfig.Devices {
		dd, err := device.toDockerDevice()
		if err != nil {
			return c, err
		}
		hostConfig.Devices = append(hostConfig.Devices, dd)
	}
	for _, device := range task.Devices {
		hostConfig.Devices = append(hostConfig.Devices, docker.Device{
			PathOnHost:        device.HostPath,
			PathInContainer:   device.TaskPath,
			CgroupPermissions: device.Permissions,
		})
	}

	// Setup mounts
	for _, m := range driverConfig.Mounts {
		hm, err := m.toDockerHostMount()
		if err != nil {
			return c, err
		}

		if hm.Type == "bind" {
			hm.Source = expandPath(task.TaskDir().Dir, hm.Source)

			// paths inside the alloc dir are always allowed as they mount
			// within the container, and are treated as relative to the task dir
			if !d.config.Volumes.Enabled && !isParentPath(task.AllocDir, hm.Source) {
				return c, fmt.Errorf("volumes are not enabled; cannot mount host path: %q %q", hm.Source, task.AllocDir)
			}
		}

		hostConfig.Mounts = append(hostConfig.Mounts, hm)
	}

	for _, m := range task.Mounts {
		hm := docker.HostMount{
			Type:     "bind",
			Target:   m.TaskPath,
			Source:   m.HostPath,
			ReadOnly: m.Readonly,
		}

		// MountPropagation is only supported by Docker on Linux:
		// https://docs.docker.com/storage/bind-mounts/#configure-bind-propagation
		if runtime.GOOS == "linux" {
			hm.BindOptions = &docker.BindOptions{
				Propagation: userMountToUnixMount[m.PropagationMode],
			}
		}

		hostConfig.Mounts = append(hostConfig.Mounts, hm)
	}

	// set DNS search domains and extra hosts
	hostConfig.DNSSearch = driverConfig.DNSSearchDomains
	hostConfig.DNSOptions = driverConfig.DNSOptions
	hostConfig.ExtraHosts = driverConfig.ExtraHosts

	hostConfig.IpcMode = driverConfig.IPCMode
	hostConfig.PidMode = driverConfig.PidMode
	hostConfig.UTSMode = driverConfig.UTSMode
	hostConfig.UsernsMode = driverConfig.UsernsMode
	hostConfig.SecurityOpt = driverConfig.SecurityOpt
	hostConfig.Sysctls = driverConfig.Sysctl

	ulimits, err := sliceMergeUlimit(driverConfig.Ulimit)
	if err != nil {
		return c, fmt.Errorf("failed to parse ulimit configuration: %v", err)
	}
	hostConfig.Ulimits = ulimits

	hostConfig.ReadonlyRootfs = driverConfig.ReadonlyRootfs

	// set the docker network mode
	hostConfig.NetworkMode = driverConfig.NetworkMode

	// if the driver config does not specify a network mode then try to use the
	// shared alloc network
	if hostConfig.NetworkMode == "" {
		if task.NetworkIsolation != nil && task.NetworkIsolation.Path != "" {
			// find the previously created parent container to join networks with
			netMode := fmt.Sprintf("container:%s", task.NetworkIsolation.Labels[dockerNetSpecLabelKey])
			logger.Debug("configuring network mode for task group", "network_mode", netMode)
			hostConfig.NetworkMode = netMode
		} else {
			// docker default
			logger.Debug("networking mode not specified; using default")
			hostConfig.NetworkMode = "default"
		}
	}

	// Setup port mapping and exposed ports
	if len(task.Resources.NomadResources.Networks) == 0 {
		if len(driverConfig.PortMap) > 0 {
			return c, fmt.Errorf("Trying to map ports but no network interface is available")
		}
	} else {
		// TODO add support for more than one network
		network := task.Resources.NomadResources.Networks[0]
		publishedPorts := map[docker.Port][]docker.PortBinding{}
		exposedPorts := map[docker.Port]struct{}{}

		for _, port := range network.ReservedPorts {
			// By default we will map the allocated port 1:1 to the container
			containerPortInt := port.Value

			// If the user has mapped a port using port_map we'll change it here
			if mapped, ok := driverConfig.PortMap[port.Label]; ok {
				containerPortInt = mapped
			}

			hostPortStr := strconv.Itoa(port.Value)
			containerPort := docker.Port(strconv.Itoa(containerPortInt))

			publishedPorts[containerPort+"/tcp"] = getPortBinding(network.IP, hostPortStr)
			publishedPorts[containerPort+"/udp"] = getPortBinding(network.IP, hostPortStr)
			logger.Debug("allocated static port", "ip", network.IP, "port", port.Value)

			exposedPorts[containerPort+"/tcp"] = struct{}{}
			exposedPorts[containerPort+"/udp"] = struct{}{}
			logger.Debug("exposed port", "port", port.Value)
		}

		for _, port := range network.DynamicPorts {
			// By default we will map the allocated port 1:1 to the container
			containerPortInt := port.Value

			// If the user has mapped a port using port_map we'll change it here
			if mapped, ok := driverConfig.PortMap[port.Label]; ok {
				containerPortInt = mapped
			}

			hostPortStr := strconv.Itoa(port.Value)
			containerPort := docker.Port(strconv.Itoa(containerPortInt))

			publishedPorts[containerPort+"/tcp"] = getPortBinding(network.IP, hostPortStr)
			publishedPorts[containerPort+"/udp"] = getPortBinding(network.IP, hostPortStr)
			logger.Debug("allocated mapped port", "ip", network.IP, "port", port.Value)

			exposedPorts[containerPort+"/tcp"] = struct{}{}
			exposedPorts[containerPort+"/udp"] = struct{}{}
			logger.Debug("exposed port", "port", containerPort)
		}

		hostConfig.PortBindings = publishedPorts
		config.ExposedPorts = exposedPorts
	}

	// If the user specified a custom command to run, we'll inject it here.
	if driverConfig.Command != "" {
		// Validate command
		if err := validateCommand(driverConfig.Command, "args"); err != nil {
			return c, err
		}

		cmd := []string{driverConfig.Command}
		if len(driverConfig.Args) != 0 {
			cmd = append(cmd, driverConfig.Args...)
		}
		logger.Debug("setting container startup command", "command", strings.Join(cmd, " "))
		config.Cmd = cmd
	} else if len(driverConfig.Args) != 0 {
		config.Cmd = driverConfig.Args
	}

	// Copy the user labels and add the mandatory Nomad alloc ID label
	labels := make(map[string]string, len(driverConfig.Labels)+1)
	for k, v := range driverConfig.Labels {
		labels[k] = v
	}
	labels[dockerLabelAllocID] = task.AllocID
	config.Labels = labels
	logger.Debug("applied labels on the container", "labels", config.Labels)

	config.Env = task.EnvList()

	containerName := fmt.Sprintf("%s-%s", strings.Replace(task.Name, "/", "_", -1), task.AllocID)
	logger.Debug("setting container name", "container_name", containerName)

	var networkingConfig *docker.NetworkingConfig
	if len(driverConfig.NetworkAliases) > 0 || driverConfig.IPv4Address != "" || driverConfig.IPv6Address != "" {
		networkingConfig = &docker.NetworkingConfig{
			EndpointsConfig: map[string]*docker.EndpointConfig{
				hostConfig.NetworkMode: {},
			},
		}
	}

	if len(driverConfig.NetworkAliases) > 0 {
		networkingConfig.EndpointsConfig[hostConfig.NetworkMode].Aliases = driverConfig.NetworkAliases
		logger.Debug("setting container network aliases", "network_mode", hostConfig.NetworkMode,
			"network_aliases", strings.Join(driverConfig.NetworkAliases, ", "))
	}

	if driverConfig.IPv4Address != "" || driverConfig.IPv6Address != "" {
		networkingConfig.EndpointsConfig[hostConfig.NetworkMode].IPAMConfig = &docker.EndpointIPAMConfig{
			IPv4Address: driverConfig.IPv4Address,
			IPv6Address: driverConfig.IPv6Address,
		}
		logger.Debug("setting container network configuration", "network_mode", hostConfig.NetworkMode,
			"ipv4_address", driverConfig.IPv4Address, "ipv6_address", driverConfig.IPv6Address)
	}

	if driverConfig.MacAddress != "" {
		config.MacAddress = driverConfig.MacAddress
		logger.Debug("setting container mac address", "mac_address", config.MacAddress)
	}

	return docker.CreateContainerOptions{
		Name:             containerName,
		Config:           config,
		HostConfig:       hostConfig,
		NetworkingConfig: networkingConfig,
	}, nil
}

// detectIP of Docker container. Returns the first IP found as well as true if
// the IP should be advertised (bridge network IPs return false). Returns an
// empty string and false if no IP could be found.
func (d *Driver) detectIP(c *docker.Container, driverConfig *TaskConfig) (string, bool) {
	if c.NetworkSettings == nil {
		// This should only happen if there's been a coding error (such
		// as not calling InspectContainer after CreateContainer). Code
		// defensively in case the Docker API changes subtly.
		d.logger.Error("no network settings for container", "container_id", c.ID)
		return "", false
	}

	ip, ipName := "", ""
	auto := false
	for name, net := range c.NetworkSettings.Networks {
		if net.IPAddress == "" {
			// Ignore networks without an IP address
			continue
		}

		ip = net.IPAddress
		if driverConfig.AdvertiseIPv6Addr {
			ip = net.GlobalIPv6Address
			auto = true
		}
		ipName = name

		// Don't auto-advertise IPs for default networks (bridge on
		// Linux, nat on Windows)
		if name != "bridge" && name != "nat" {
			auto = true
		}

		break
	}

	if n := len(c.NetworkSettings.Networks); n > 1 {
		d.logger.Warn("multiple Docker networks for container found but Nomad only supports 1",
			"total_networks", n,
			"container_id", c.ID,
			"container_network", ipName)
	}

	return ip, auto
}

// containerByName finds a container by its Nomad-generated name, returning
// nil (and no error) if no matching container exists.
func (d *Driver) containerByName(name string) (*docker.Container, error) {

	client, _, err := d.dockerClients()
	if err != nil {
		return nil, err
	}
	containers, err := client.ListContainers(docker.ListContainersOptions{
		All: true,
	})
	if err != nil {
		d.logger.Error("failed to query list of containers matching name",
			"container_name", name)
		return nil, recoverableErrTimeouts(
			fmt.Errorf("Failed to query list of containers: %s", err))
	}

	// Docker returns container names with a "/" prepended to the Nomad
	// generated container names
	containerName := "/" + name
	var (
		shimContainer docker.APIContainers
		found         bool
	)
OUTER:
	for _, shimContainer = range containers {
		d.logger.Trace("listed container", "names", hclog.Fmt("%+v", shimContainer.Names))
		for _, name := range shimContainer.Names {
			if name == containerName {
				d.logger.Trace("Found container",
					"container_name", containerName, "container_id", shimContainer.ID)
				found = true
				break OUTER
			}
		}
	}
	if !found {
		return nil, nil
	}

	container, err := client.InspectContainer(shimContainer.ID)
	if err != nil {
		err = fmt.Errorf("Failed to inspect container %s: %s", shimContainer.ID, err)

		// This error is always recoverable as it could
		// be caused by races between listing
		// containers and this container being removed.
		// See #2802
		return nil, nstructs.NewRecoverableError(err, true)
	}
	return container, nil
}

// validateCommand validates that the command is non-empty and contains no
// leading or trailing whitespace, returning a user friendly error message
// otherwise. The argField parameter is currently unused.
func validateCommand(command, argField string) error {
	trimmed := strings.TrimSpace(command)
	if len(trimmed) == 0 {
		return fmt.Errorf("command empty: %q", command)
	}

	if len(trimmed) != len(command) {
		return fmt.Errorf("command contains extra white space: %q", command)
	}

	return nil
}

func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
	h, ok := d.tasks.Get(taskID)
	if !ok {
		return nil, drivers.ErrTaskNotFound
	}
	ch := make(chan *drivers.ExitResult)
	go d.handleWait(ctx, ch, h)
	return ch, nil
}

func (d *Driver) handleWait(ctx context.Context, ch chan *drivers.ExitResult, h *taskHandle) {
	defer close(ch)
	select {
	case <-h.waitCh:
		ch <- h.ExitResult()
	case <-ctx.Done():
		ch <- &drivers.ExitResult{
			Err: ctx.Err(),
		}
	}
}
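
// exampleAwaitTask is an illustrative sketch (not used by the driver) of the
// caller side of WaitTask: block until the task exits or the context is
// cancelled, whichever comes first.
func exampleAwaitTask(ctx context.Context, d *Driver, taskID string) (*drivers.ExitResult, error) {
	ch, err := d.WaitTask(ctx, taskID)
	if err != nil {
		return nil, err
	}
	// handleWait closes the channel after sending exactly one result.
	res := <-ch
	return res, res.Err
}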

func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
	h, ok := d.tasks.Get(taskID)
	if !ok {
		return drivers.ErrTaskNotFound
	}

	if signal == "" {
		signal = "SIGINT"
	}

	// The Windows Docker daemon does not support SIGINT; SIGTERM is the
	// semantic equivalent that allows for graceful shutdown before being
	// followed up by a SIGKILL.
	// Supported signals:
	//   https://github.com/moby/moby/blob/0111ee70874a4947d93f64b672f66a2a35071ee2/pkg/signal/signal_windows.go#L17-L26
	if runtime.GOOS == "windows" && signal == "SIGINT" {
		signal = "SIGTERM"
	}

	sig, err := signals.Parse(signal)
	if err != nil {
		return fmt.Errorf("failed to parse signal: %v", err)
	}

	return h.Kill(timeout, sig)
}

func (d *Driver) DestroyTask(taskID string, force bool) error {
	h, ok := d.tasks.Get(taskID)
	if !ok {
		return drivers.ErrTaskNotFound
	}

	c, err := h.client.InspectContainer(h.containerID)
	if err != nil {
		switch err.(type) {
		case *docker.NoSuchContainer:
			h.logger.Info("container was removed out of band, will proceed with DestroyTask",
				"error", err)
		default:
			return fmt.Errorf("failed to inspect container state: %v", err)
		}
	} else {
		if c.State.Running {
			if !force {
				return fmt.Errorf("must call StopTask for the given task before Destroy or set force to true")
			}
			if err := h.client.StopContainer(h.containerID, 0); err != nil {
				h.logger.Warn("failed to stop container during destroy", "error", err)
			}
		}

		if h.removeContainerOnExit {
			if err := h.client.RemoveContainer(docker.RemoveContainerOptions{ID: h.containerID, RemoveVolumes: true, Force: true}); err != nil {
				h.logger.Error("error removing container", "error", err)
			}
		} else {
			h.logger.Debug("not removing container due to config")
		}
	}

	if err := d.cleanupImage(h); err != nil {
		h.logger.Error("failed to cleanup image after destroying container",
			"error", err)
	}

	d.tasks.Delete(taskID)
	return nil
}

// cleanupImage removes a Docker image. No error is returned if the image
// doesn't exist or is still in use. Requires the global client to already be
// initialized.
func (d *Driver) cleanupImage(handle *taskHandle) error {
	if !d.config.GC.Image {
		return nil
	}

	d.coordinator.RemoveImage(handle.containerImage, handle.task.ID)

	return nil
}

func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
	h, ok := d.tasks.Get(taskID)
	if !ok {
		return nil, drivers.ErrTaskNotFound
	}

	container, err := client.InspectContainer(h.containerID)
	if err != nil {
		return nil, fmt.Errorf("failed to inspect container %q: %v", h.containerID, err)
	}
	status := &drivers.TaskStatus{
		ID:          h.task.ID,
		Name:        h.task.Name,
		StartedAt:   container.State.StartedAt,
		CompletedAt: container.State.FinishedAt,
		DriverAttributes: map[string]string{
			"container_id": container.ID,
		},
		NetworkOverride: h.net,
		ExitResult:      h.ExitResult(),
	}

	status.State = drivers.TaskStateUnknown
	if container.State.Running {
		status.State = drivers.TaskStateRunning
	}
	if container.State.Dead {
		status.State = drivers.TaskStateExited
	}

	return status, nil
}

func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
	h, ok := d.tasks.Get(taskID)
	if !ok {
		return nil, drivers.ErrTaskNotFound
	}

	return h.Stats(ctx, interval)
}

func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
	return d.eventer.TaskEvents(ctx)
}

func (d *Driver) SignalTask(taskID string, signal string) error {
	h, ok := d.tasks.Get(taskID)
	if !ok {
		return drivers.ErrTaskNotFound
	}

	sig, err := signals.Parse(signal)
	if err != nil {
		return fmt.Errorf("failed to parse signal: %v", err)
	}

	return h.Signal(sig)
}

func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
	h, ok := d.tasks.Get(taskID)
	if !ok {
		return nil, drivers.ErrTaskNotFound
	}

	if len(cmd) == 0 {
		return nil, fmt.Errorf("cmd is required, but was empty")
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	return h.Exec(ctx, cmd[0], cmd[1:])
}

var _ drivers.ExecTaskStreamingDriver = (*Driver)(nil)

func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *drivers.ExecOptions) (*drivers.ExitResult, error) {
	defer opts.Stdout.Close()
	defer opts.Stderr.Close()

	done := make(chan interface{})
	defer close(done)

	h, ok := d.tasks.Get(taskID)
	if !ok {
		return nil, drivers.ErrTaskNotFound
	}

	if len(opts.Command) == 0 {
		return nil, fmt.Errorf("command is required but was empty")
	}

	createExecOpts := docker.CreateExecOptions{
		AttachStdin:  true,
		AttachStdout: true,
		AttachStderr: true,
		Tty:          opts.Tty,
		Cmd:          opts.Command,
		Container:    h.containerID,
		Context:      ctx,
	}
	exec, err := h.client.CreateExec(createExecOpts)
	if err != nil {
		return nil, fmt.Errorf("failed to create exec object: %v", err)
	}

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-done:
				return
			case s, ok := <-opts.ResizeCh:
				if !ok {
					return
				}
				client.ResizeExecTTY(exec.ID, s.Height, s.Width)
			}
		}
	}()

	startOpts := docker.StartExecOptions{
		Detach: false,

		// When running in TTY, we must use a raw terminal.
		// If not, we set RawTerminal to false to allow docker client
		// to interpret special stdout/stderr messages
		Tty:         opts.Tty,
		RawTerminal: opts.Tty,

		InputStream:  opts.Stdin,
		OutputStream: opts.Stdout,
		ErrorStream:  opts.Stderr,
		Context:      ctx,
	}
	if err := client.StartExec(exec.ID, startOpts); err != nil {
		return nil, fmt.Errorf("failed to start exec: %v", err)
	}

	// StartExec returns after the process completes, but InspectExec seems to
	// have a delay in reporting the status code, so poll briefly.

	const execTerminatingTimeout = 3 * time.Second
	start := time.Now()
	var res *docker.ExecInspect
	for (res == nil || res.Running) && time.Since(start) <= execTerminatingTimeout {
		res, err = client.InspectExec(exec.ID)
		if err != nil {
			return nil, fmt.Errorf("failed to inspect exec result: %v", err)
		}
		time.Sleep(50 * time.Millisecond)
	}

	if res == nil || res.Running {
		return nil, fmt.Errorf("failed to retrieve exec result")
	}

	return &drivers.ExitResult{
		ExitCode: res.ExitCode,
	}, nil
}

// dockerClients creates two *docker.Client, one for long running operations and
// the other for shorter operations. In test / dev mode we can use ENV vars to
// connect to the docker daemon. In production mode we will read docker.endpoint
// from the config file.
func (d *Driver) dockerClients() (*docker.Client, *docker.Client, error) {
	createClientsLock.Lock()
	defer createClientsLock.Unlock()

	if client != nil && waitClient != nil {
		return client, waitClient, nil
	}

	var err error

	// Only initialize the client if it hasn't yet been done
	if client == nil {
		client, err = d.newDockerClient(dockerTimeout)
		if err != nil {
			return nil, nil, err
		}
	}

	// Only initialize the waitClient if it hasn't yet been done
	if waitClient == nil {
		waitClient, err = d.newDockerClient(0 * time.Minute)
		if err != nil {
			return nil, nil, err
		}
	}

	return client, waitClient, nil
}

// newDockerClient creates a new *docker.Client with a configurable timeout
func (d *Driver) newDockerClient(timeout time.Duration) (*docker.Client, error) {
	var err error
	var merr multierror.Error
	var newClient *docker.Client

	// Default to using whatever is configured in docker.endpoint. If this is
	// not specified we'll fall back on NewClientFromEnv which reads config from
	// the DOCKER_* environment variables DOCKER_HOST, DOCKER_TLS_VERIFY, and
	// DOCKER_CERT_PATH. This allows us to lock down the config in production
	// but also accept the standard ENV configs for dev and test.
	dockerEndpoint := d.config.Endpoint
	if dockerEndpoint != "" {
		cert := d.config.TLS.Cert
		key := d.config.TLS.Key
		ca := d.config.TLS.CA

		if cert+key+ca != "" {
			d.logger.Debug("using TLS client connection", "endpoint", dockerEndpoint)
			newClient, err = docker.NewTLSClient(dockerEndpoint, cert, key, ca)
			if err != nil {
				merr.Errors = append(merr.Errors, err)
			}
		} else {
			d.logger.Debug("using standard client connection", "endpoint", dockerEndpoint)
			newClient, err = docker.NewClient(dockerEndpoint)
			if err != nil {
				merr.Errors = append(merr.Errors, err)
			}
		}
	} else {
		d.logger.Debug("using client connection initialized from environment")
		newClient, err = docker.NewClientFromEnv()
		if err != nil {
			merr.Errors = append(merr.Errors, err)
		}
	}

	if timeout != 0 && newClient != nil {
		newClient.SetTimeout(timeout)
	}
	return newClient, merr.ErrorOrNil()
}
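
// For reference, the three connection modes above resolve roughly as follows
// (the endpoint values are hypothetical examples, not defaults):
//
//	endpoint + TLS material -> docker.NewTLSClient("tcp://10.0.0.5:2376", cert, key, ca)
//	endpoint only           -> docker.NewClient("unix:///var/run/docker.sock")
//	neither                 -> docker.NewClientFromEnv() // DOCKER_HOST, DOCKER_TLS_VERIFY, DOCKER_CERT_PATH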

func sliceMergeUlimit(ulimitsRaw map[string]string) ([]docker.ULimit, error) {
	var ulimits []docker.ULimit

	for name, ulimitRaw := range ulimitsRaw {
		if len(ulimitRaw) == 0 {
			return []docker.ULimit{}, fmt.Errorf("Malformed ulimit specification %v: %q, cannot be empty", name, ulimitRaw)
		}
		// hard limit is optional
		if !strings.Contains(ulimitRaw, ":") {
			ulimitRaw = ulimitRaw + ":" + ulimitRaw
		}

		splitted := strings.SplitN(ulimitRaw, ":", 2)
		if len(splitted) < 2 {
			return []docker.ULimit{}, fmt.Errorf("Malformed ulimit specification %v: %v", name, ulimitRaw)
		}
		soft, err := strconv.Atoi(splitted[0])
		if err != nil {
			return []docker.ULimit{}, fmt.Errorf("Malformed soft ulimit %v: %v", name, ulimitRaw)
		}
		hard, err := strconv.Atoi(splitted[1])
		if err != nil {
			return []docker.ULimit{}, fmt.Errorf("Malformed hard ulimit %v: %v", name, ulimitRaw)
		}

		ulimit := docker.ULimit{
			Name: name,
			Soft: int64(soft),
			Hard: int64(hard),
		}
		ulimits = append(ulimits, ulimit)
	}
	return ulimits, nil
}
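
// exampleUlimits is an illustrative sketch (not used by the driver) of
// sliceMergeUlimit's input format; the values are hypothetical. A bare value
// applies to both limits, while "soft:hard" sets them independently.
func exampleUlimits() ([]docker.ULimit, error) {
	return sliceMergeUlimit(map[string]string{
		"nofile": "2048:4096", // soft 2048, hard 4096
		"nproc":  "4096",      // soft and hard both 4096
	})
}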

func (d *Driver) Shutdown() {
	d.signalShutdown()
}

func isDockerTransientError(err error) bool {
	if err == nil {
		return false
	}

	errMsg := err.Error()
	for _, te := range dockerTransientErrs {
		if strings.Contains(errMsg, te) {
			return true
		}
	}

	return false
}
1566