1package rkt
2
3import (
4	"bytes"
5	"context"
6	"encoding/json"
7	"fmt"
8	"io/ioutil"
9	"math/rand"
10	"net"
11	"os"
12	"os/exec"
13	"path/filepath"
14	"regexp"
15	"strconv"
16	"strings"
17	"sync"
18	"syscall"
19	"time"
20
21	appcschema "github.com/appc/spec/schema"
22	"github.com/hashicorp/consul-template/signals"
23	hclog "github.com/hashicorp/go-hclog"
24	version "github.com/hashicorp/go-version"
25	"github.com/hashicorp/nomad/client/config"
26	"github.com/hashicorp/nomad/client/taskenv"
27	"github.com/hashicorp/nomad/drivers/shared/eventer"
28	"github.com/hashicorp/nomad/drivers/shared/executor"
29	"github.com/hashicorp/nomad/helper"
30	"github.com/hashicorp/nomad/helper/pluginutils/hclutils"
31	"github.com/hashicorp/nomad/helper/pluginutils/loader"
32	"github.com/hashicorp/nomad/plugins/base"
33	"github.com/hashicorp/nomad/plugins/drivers"
34	"github.com/hashicorp/nomad/plugins/shared/hclspec"
35	pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
36)
37
const (
	// pluginName is the name of the plugin
	pluginName = "rkt"

	// fingerprintPeriod is the interval at which the driver will send fingerprint responses
	fingerprintPeriod = 30 * time.Second

	// minRktVersion is the earliest supported version of rkt. rkt added support
	// for CPU and memory isolators in 0.14.0, but the minimum enforced by
	// buildFingerprint is the newer release named here. Earlier versions are
	// rejected to maintain a uniform interface across all drivers.
	minRktVersion = "1.27.0"

	// rktCmd is the command rkt is installed as.
	rktCmd = "rkt"

	// networkDeadline is how long to wait for container network
	// information to become available.
	networkDeadline = 1 * time.Minute

	// taskHandleVersion is the version of task handle which this driver sets
	// and understands how to decode driver state. StartTask stamps new handles
	// with it; RecoverTask treats handles with version 0 as pre-0.9 handles.
	taskHandleVersion = 1
)
61
var (
	// PluginID is the rkt plugin metadata registered in the plugin
	// catalog.
	PluginID = loader.PluginID{
		Name:       pluginName,
		PluginType: base.PluginTypeDriver,
	}

	// PluginConfig is the rkt factory function registered in the
	// plugin catalog. The factory builds a new driver via NewRktDriver and
	// starts with an empty configuration map.
	PluginConfig = &loader.InternalPluginConfig{
		Config:  map[string]interface{}{},
		Factory: func(l hclog.Logger) interface{} { return NewRktDriver(l) },
	}
)
77
// PluginLoader translates pre-0.9 client driver options into the post-0.9
// plugin option map.
//
// Only "driver.rkt.volumes.enabled" is understood: when its value parses as a
// boolean it is emitted as "volumes_enabled". A missing or unparsable value is
// silently skipped. The returned map is never nil and the error is always nil.
func PluginLoader(opts map[string]string) (map[string]interface{}, error) {
	translated := make(map[string]interface{})

	legacy := opts["driver.rkt.volumes.enabled"]
	enabled, parseErr := strconv.ParseBool(legacy)
	if parseErr != nil {
		// Absent or malformed legacy option: leave the plugin default in place.
		return translated, nil
	}

	translated["volumes_enabled"] = enabled
	return translated, nil
}
86
var (
	// pluginInfo is the response returned for the PluginInfo RPC
	pluginInfo = &base.PluginInfoResponse{
		Type:              base.PluginTypeDriver,
		PluginApiVersions: []string{drivers.ApiVersion010},
		PluginVersion:     "0.1.0",
		Name:              pluginName,
	}

	// configSpec is the hcl specification returned by the ConfigSchema RPC.
	// volumes_enabled is an optional bool whose default literal is "true".
	configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
		"volumes_enabled": hclspec.NewDefault(
			hclspec.NewAttr("volumes_enabled", "bool", false),
			hclspec.NewLiteral("true"),
		),
	})

	// taskConfigSpec is the hcl specification for the driver config section of
	// a taskConfig within a job. It is returned in the TaskConfigSchema RPC.
	// Only "image" is required; every other attribute is optional. The
	// attribute names match the codec tags on TaskConfig.
	taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
		"image":              hclspec.NewAttr("image", "string", true),
		"command":            hclspec.NewAttr("command", "string", false),
		"args":               hclspec.NewAttr("args", "list(string)", false),
		"trust_prefix":       hclspec.NewAttr("trust_prefix", "string", false),
		"dns_servers":        hclspec.NewAttr("dns_servers", "list(string)", false),
		"dns_search_domains": hclspec.NewAttr("dns_search_domains", "list(string)", false),
		"net":                hclspec.NewAttr("net", "list(string)", false),
		"port_map":           hclspec.NewAttr("port_map", "list(map(string))", false),
		"volumes":            hclspec.NewAttr("volumes", "list(string)", false),
		"insecure_options":   hclspec.NewAttr("insecure_options", "list(string)", false),
		"no_overlay":         hclspec.NewAttr("no_overlay", "bool", false),
		"debug":              hclspec.NewAttr("debug", "bool", false),
		"group":              hclspec.NewAttr("group", "string", false),
	})

	// capabilities is returned by the Capabilities RPC and indicates what
	// optional features this driver supports
	capabilities = &drivers.Capabilities{
		SendSignals: true,
		Exec:        true,
		FSIsolation: drivers.FSIsolationImage,
	}

	// reRktVersion and reAppcVersion extract the rkt and appc version numbers
	// from the output of `rkt version`. In both, the single capture group
	// holds the version: a digit followed by one or more digits or dots.
	reRktVersion  = regexp.MustCompile(`rkt [vV]ersion[:]? (\d[.\d]+)`)
	reAppcVersion = regexp.MustCompile(`appc [vV]ersion[:]? (\d[.\d]+)`)
)
133
// Config is the client configuration for the driver. It is decoded from the
// plugin configuration in SetConfig.
type Config struct {
	// VolumesEnabled allows tasks to bind host paths (volumes) inside their
	// container. When it is false, StartTask rejects any task whose driver
	// config sets `volumes`.
	// NOTE(review): this comment previously stated that binding relative
	// paths is always allowed and resolved relative to the allocation's
	// directory; StartTask does not special-case relative paths — confirm
	// the intended behavior.
	VolumesEnabled bool `codec:"volumes_enabled"`
}
141
// TaskConfig is the driver configuration of a taskConfig within a job. Its
// codec tags match the attribute names declared in taskConfigSpec.
//
// ImageName is the image handed to `rkt prepare`. Command, when set, overrides
// the image's exec command, and Args are passed after a "--" separator.
// TrustPrefix, when set, is trusted via `rkt trust` before the pod is
// prepared; when it is empty and InsecureOptions is also empty, StartTask
// passes --insecure-options=all.
type TaskConfig struct {
	ImageName        string             `codec:"image"`
	Command          string             `codec:"command"`
	Args             []string           `codec:"args"`
	TrustPrefix      string             `codec:"trust_prefix"`
	DNSServers       []string           `codec:"dns_servers"`        // DNS Server for containers
	DNSSearchDomains []string           `codec:"dns_search_domains"` // DNS Search domains for containers
	Net              []string           `codec:"net"`                // Networks for the containers
	PortMap          hclutils.MapStrStr `codec:"port_map"`           // Maps a port label from the task's network resources to the container port given to rkt's --port flag
	Volumes          []string           `codec:"volumes"`            // Host-Volumes to mount in, syntax: /path/to/host/directory:/destination/path/in/container[:readOnly]
	InsecureOptions  []string           `codec:"insecure_options"`   // list of args for --insecure-options

	NoOverlay bool   `codec:"no_overlay"` // disable overlayfs for rkt run
	Debug     bool   `codec:"debug"`      // Enable debug option for rkt command
	Group     string `codec:"group"`      // Group override for the container
}
159
// TaskState is the state which is encoded in the handle returned in
// StartTask. This information is needed to rebuild the taskConfig state and handler
// during recovery.
//
// ReattachConfig is what RecoverTask uses to reattach to the executor plugin.
// Pid is the pid reported by the executor when the pod was launched, StartedAt
// is the launch time recorded by StartTask, and UUID is the pod UUID printed
// by `rkt prepare`.
type TaskState struct {
	ReattachConfig *pstructs.ReattachConfig
	TaskConfig     *drivers.TaskConfig
	Pid            int
	StartedAt      time.Time
	UUID           string
}
170
// Driver is a driver for running images via rkt. We attempt to choose sane
// defaults for now, with more configuration planned for the future.
type Driver struct {
	// eventer is used to handle multiplexing of TaskEvents calls such that an
	// event can be broadcast to all callers
	eventer *eventer.Eventer

	// config is the driver configuration set by the SetConfig RPC
	config *Config

	// nomadConfig is the client config from nomad
	nomadConfig *base.ClientDriverConfig

	// tasks is the in memory datastore mapping taskIDs to rktTaskHandles
	tasks *taskStore

	// ctx is the context for the driver. It is passed to other subsystems to
	// coordinate shutdown
	ctx context.Context

	// signalShutdown is called when the driver is shutting down and cancels the
	// ctx passed to any subsystems
	signalShutdown context.CancelFunc

	// logger will log to the Nomad agent
	logger hclog.Logger

	// A tri-state boolean to know if the fingerprinting has happened and
	// whether it has been successful: nil means no fingerprint has completed
	// yet, otherwise it holds the outcome of the most recent one.
	// fingerprintLock guards fingerprintSuccess.
	fingerprintSuccess *bool
	fingerprintLock    sync.Mutex
}
203
204func NewRktDriver(logger hclog.Logger) drivers.DriverPlugin {
205	ctx, cancel := context.WithCancel(context.Background())
206	logger = logger.Named(pluginName)
207	return &Driver{
208		eventer:        eventer.NewEventer(ctx, logger),
209		config:         &Config{},
210		tasks:          newTaskStore(),
211		ctx:            ctx,
212		signalShutdown: cancel,
213		logger:         logger,
214	}
215}
216
// PluginInfo returns the static plugin metadata (type, supported API versions,
// plugin version and name) for the rkt driver. The error is always nil.
func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) {
	return pluginInfo, nil
}
220
// ConfigSchema returns the hcl specification of the driver's plugin
// configuration (configSpec). The error is always nil.
func (d *Driver) ConfigSchema() (*hclspec.Spec, error) {
	return configSpec, nil
}
224
225func (d *Driver) SetConfig(cfg *base.Config) error {
226	var config Config
227	if len(cfg.PluginConfig) != 0 {
228		if err := base.MsgPackDecode(cfg.PluginConfig, &config); err != nil {
229			return err
230		}
231	}
232
233	d.config = &config
234	if cfg.AgentConfig != nil {
235		d.nomadConfig = cfg.AgentConfig.Driver
236	}
237	return nil
238}
239
// TaskConfigSchema returns the hcl specification of the per-task driver
// configuration (taskConfigSpec). The error is always nil.
func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
	return taskConfigSpec, nil
}
243
// Capabilities returns the optional features this driver supports, as
// declared in the package-level capabilities value. The error is always nil.
func (d *Driver) Capabilities() (*drivers.Capabilities, error) {
	return capabilities, nil
}
247
248func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
249	ch := make(chan *drivers.Fingerprint)
250	go d.handleFingerprint(ctx, ch)
251	return ch, nil
252}
253
254func (d *Driver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerprint) {
255	defer close(ch)
256	ticker := time.NewTimer(0)
257	for {
258		select {
259		case <-ctx.Done():
260			return
261		case <-d.ctx.Done():
262			return
263		case <-ticker.C:
264			ticker.Reset(fingerprintPeriod)
265			ch <- d.buildFingerprint()
266		}
267	}
268}
269
270// setFingerprintSuccess marks the driver as having fingerprinted successfully
271func (d *Driver) setFingerprintSuccess() {
272	d.fingerprintLock.Lock()
273	d.fingerprintSuccess = helper.BoolToPtr(true)
274	d.fingerprintLock.Unlock()
275}
276
277// setFingerprintFailure marks the driver as having failed fingerprinting
278func (d *Driver) setFingerprintFailure() {
279	d.fingerprintLock.Lock()
280	d.fingerprintSuccess = helper.BoolToPtr(false)
281	d.fingerprintLock.Unlock()
282}
283
284// fingerprintSuccessful returns true if the driver has
285// never fingerprinted or has successfully fingerprinted
286func (d *Driver) fingerprintSuccessful() bool {
287	d.fingerprintLock.Lock()
288	defer d.fingerprintLock.Unlock()
289	return d.fingerprintSuccess == nil || *d.fingerprintSuccess
290}
291
292func (d *Driver) buildFingerprint() *drivers.Fingerprint {
293	fingerprint := &drivers.Fingerprint{
294		Attributes:        map[string]*pstructs.Attribute{},
295		Health:            drivers.HealthStateHealthy,
296		HealthDescription: drivers.DriverHealthy,
297	}
298
299	// Only enable if we are root
300	if syscall.Geteuid() != 0 {
301		if d.fingerprintSuccessful() {
302			d.logger.Debug("must run as root user, disabling")
303		}
304		d.setFingerprintFailure()
305		fingerprint.Health = drivers.HealthStateUndetected
306		fingerprint.HealthDescription = drivers.DriverRequiresRootMessage
307		return fingerprint
308	}
309
310	outBytes, err := exec.Command(rktCmd, "version").Output()
311	if err != nil {
312		fingerprint.Health = drivers.HealthStateUndetected
313		fingerprint.HealthDescription = fmt.Sprintf("Failed to execute %s version: %v", rktCmd, err)
314		d.setFingerprintFailure()
315		return fingerprint
316	}
317	out := strings.TrimSpace(string(outBytes))
318
319	rktMatches := reRktVersion.FindStringSubmatch(out)
320	appcMatches := reAppcVersion.FindStringSubmatch(out)
321	if len(rktMatches) != 2 || len(appcMatches) != 2 {
322		fingerprint.Health = drivers.HealthStateUndetected
323		fingerprint.HealthDescription = "Unable to parse rkt version string"
324		d.setFingerprintFailure()
325		return fingerprint
326	}
327
328	minVersion, _ := version.NewVersion(minRktVersion)
329	currentVersion, _ := version.NewVersion(rktMatches[1])
330	if currentVersion.LessThan(minVersion) {
331		// Do not allow ancient rkt versions
332		fingerprint.Health = drivers.HealthStateUndetected
333		fingerprint.HealthDescription = fmt.Sprintf("Unsuported rkt version %s", currentVersion)
334		if d.fingerprintSuccessful() {
335			d.logger.Warn("unsupported rkt version please upgrade to >= "+minVersion.String(),
336				"rkt_version", currentVersion)
337		}
338		d.setFingerprintFailure()
339		return fingerprint
340	}
341
342	fingerprint.Attributes["driver.rkt"] = pstructs.NewBoolAttribute(true)
343	fingerprint.Attributes["driver.rkt.version"] = pstructs.NewStringAttribute(rktMatches[1])
344	fingerprint.Attributes["driver.rkt.appc.version"] = pstructs.NewStringAttribute(appcMatches[1])
345	if d.config.VolumesEnabled {
346		fingerprint.Attributes["driver.rkt.volumes.enabled"] = pstructs.NewBoolAttribute(true)
347	}
348	d.setFingerprintSuccess()
349	return fingerprint
350
351}
352
353func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
354	if handle == nil {
355		return fmt.Errorf("error: handle cannot be nil")
356	}
357
358	// COMPAT(0.10): pre 0.9 upgrade path check
359	if handle.Version == 0 {
360		return d.recoverPre09Task(handle)
361	}
362
363	// If already attached to handle there's nothing to recover.
364	if _, ok := d.tasks.Get(handle.Config.ID); ok {
365		d.logger.Trace("nothing to recover; task already exists",
366			"task_id", handle.Config.ID,
367			"task_name", handle.Config.Name,
368		)
369		return nil
370	}
371
372	var taskState TaskState
373	if err := handle.GetDriverState(&taskState); err != nil {
374		d.logger.Error("failed to decode taskConfig state from handle", "error", err, "task_id", handle.Config.ID)
375		return fmt.Errorf("failed to decode taskConfig state from handle: %v", err)
376	}
377
378	plugRC, err := pstructs.ReattachConfigToGoPlugin(taskState.ReattachConfig)
379	if err != nil {
380		d.logger.Error("failed to build ReattachConfig from taskConfig state", "error", err, "task_id", handle.Config.ID)
381		return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err)
382	}
383
384	execImpl, pluginClient, err := executor.ReattachToExecutor(plugRC,
385		d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID))
386	if err != nil {
387		d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
388		return fmt.Errorf("failed to reattach to executor: %v", err)
389	}
390
391	// The taskConfig's environment is set via --set-env flags in Start, but the rkt
392	// command itself needs an environment with PATH set to find iptables.
393	// TODO (preetha) need to figure out how to read env.blacklist
394	eb := taskenv.NewEmptyBuilder()
395	filter := strings.Split(config.DefaultEnvBlacklist, ",")
396	rktEnv := eb.SetHostEnvvars(filter).Build()
397
398	h := &taskHandle{
399		exec:         execImpl,
400		env:          rktEnv,
401		pid:          taskState.Pid,
402		uuid:         taskState.UUID,
403		pluginClient: pluginClient,
404		taskConfig:   taskState.TaskConfig,
405		procState:    drivers.TaskStateRunning,
406		startedAt:    taskState.StartedAt,
407		exitResult:   &drivers.ExitResult{},
408		logger:       d.logger,
409	}
410
411	d.tasks.Set(taskState.TaskConfig.ID, h)
412
413	go h.run()
414	return nil
415}
416
417func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
418	if _, ok := d.tasks.Get(cfg.ID); ok {
419		return nil, nil, fmt.Errorf("taskConfig with ID '%s' already started", cfg.ID)
420	}
421
422	var driverConfig TaskConfig
423
424	if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
425		return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
426	}
427
428	handle := drivers.NewTaskHandle(taskHandleVersion)
429	handle.Config = cfg
430
431	// todo(preetha) - port map in client v1 is a slice of maps that get merged. figure out if the caller will do this
432	//driverConfig.PortMap
433
434	// ACI image
435	img := driverConfig.ImageName
436
437	// Global arguments given to both prepare and run-prepared
438	globalArgs := make([]string, 0, 50)
439
440	// Add debug option to rkt command.
441	debug := driverConfig.Debug
442
443	// Add the given trust prefix
444	trustPrefix := driverConfig.TrustPrefix
445	insecure := false
446	if trustPrefix != "" {
447		var outBuf, errBuf bytes.Buffer
448		cmd := exec.Command(rktCmd, "trust", "--skip-fingerprint-review=true", fmt.Sprintf("--prefix=%s", trustPrefix), fmt.Sprintf("--debug=%t", debug))
449		cmd.Stdout = &outBuf
450		cmd.Stderr = &errBuf
451		if err := cmd.Run(); err != nil {
452			return nil, nil, fmt.Errorf("Error running rkt trust: %s\n\nOutput: %s\n\nError: %s",
453				err, outBuf.String(), errBuf.String())
454		}
455		d.logger.Debug("added trust prefix", "trust_prefix", trustPrefix, "task_name", cfg.Name)
456	} else {
457		// Disable signature verification if the trust command was not run.
458		insecure = true
459	}
460
461	// if we have a selective insecure_options, prefer them
462	// insecure options are rkt's global argument, so we do this before the actual "run"
463	if len(driverConfig.InsecureOptions) > 0 {
464		globalArgs = append(globalArgs, fmt.Sprintf("--insecure-options=%s", strings.Join(driverConfig.InsecureOptions, ",")))
465	} else if insecure {
466		globalArgs = append(globalArgs, "--insecure-options=all")
467	}
468
469	// debug is rkt's global argument, so add it before the actual "run"
470	globalArgs = append(globalArgs, fmt.Sprintf("--debug=%t", debug))
471
472	prepareArgs := make([]string, 0, 50)
473	runArgs := make([]string, 0, 50)
474
475	prepareArgs = append(prepareArgs, globalArgs...)
476	prepareArgs = append(prepareArgs, "prepare")
477	runArgs = append(runArgs, globalArgs...)
478	runArgs = append(runArgs, "run-prepared")
479
480	// disable overlayfs
481	if driverConfig.NoOverlay {
482		prepareArgs = append(prepareArgs, "--no-overlay=true")
483	}
484
485	// Convert underscores to dashes in taskConfig names for use in volume names #2358
486	sanitizedName := strings.Replace(cfg.Name, "_", "-", -1)
487
488	// Mount /alloc
489	allocVolName := fmt.Sprintf("%s-%s-alloc", cfg.AllocID, sanitizedName)
490	prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", allocVolName, cfg.TaskDir().SharedAllocDir))
491	prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", allocVolName, "/alloc"))
492
493	// Mount /local
494	localVolName := fmt.Sprintf("%s-%s-local", cfg.AllocID, sanitizedName)
495	prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", localVolName, cfg.TaskDir().LocalDir))
496	prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", localVolName, "/local"))
497
498	// Mount /secrets
499	secretsVolName := fmt.Sprintf("%s-%s-secrets", cfg.AllocID, sanitizedName)
500	prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", secretsVolName, cfg.TaskDir().SecretsDir))
501	prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", secretsVolName, "/secrets"))
502
503	// Mount arbitrary volumes if enabled
504	if len(driverConfig.Volumes) > 0 {
505		if !d.config.VolumesEnabled {
506			return nil, nil, fmt.Errorf("volumes_enabled is false; cannot use rkt volumes: %+q", driverConfig.Volumes)
507		}
508		for i, rawvol := range driverConfig.Volumes {
509			parts := strings.Split(rawvol, ":")
510			readOnly := "false"
511			// job spec:
512			//   volumes = ["/host/path:/container/path[:readOnly]"]
513			// the third parameter is optional, mount is read-write by default
514			if len(parts) == 3 {
515				if parts[2] == "readOnly" {
516					d.logger.Debug("mounting volume as readOnly", "volume", strings.Join(parts[:2], parts[1]))
517					readOnly = "true"
518				} else {
519					d.logger.Warn("unknown volume parameter ignored for mount", "parameter", parts[2], "mount", parts[0])
520				}
521			} else if len(parts) != 2 {
522				return nil, nil, fmt.Errorf("invalid rkt volume: %q", rawvol)
523			}
524			volName := fmt.Sprintf("%s-%s-%d", cfg.AllocID, sanitizedName, i)
525			prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s,readOnly=%s", volName, parts[0], readOnly))
526			prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", volName, parts[1]))
527		}
528	}
529
530	// Mount task volumes, always do
531	for i, vol := range cfg.Mounts {
532		volName := fmt.Sprintf("%s-%s-taskmounts-%d", cfg.AllocID, sanitizedName, i)
533		prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s,readOnly=%v", volName, vol.HostPath, vol.Readonly))
534		prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", volName, vol.TaskPath))
535	}
536
537	// Mount task devices, always do
538	for i, vol := range cfg.Devices {
539		volName := fmt.Sprintf("%s-%s-taskdevices-%d", cfg.AllocID, sanitizedName, i)
540		readOnly := !strings.Contains(vol.Permissions, "w")
541		prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s,readOnly=%v", volName, vol.HostPath, readOnly))
542		prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", volName, vol.TaskPath))
543	}
544
545	// Inject environment variables
546	for k, v := range cfg.Env {
547		prepareArgs = append(prepareArgs, fmt.Sprintf("--set-env=%s=%s", k, v))
548	}
549
550	// Image is set here, because the commands that follow apply to it
551	prepareArgs = append(prepareArgs, img)
552
553	// Check if the user has overridden the exec command.
554	if driverConfig.Command != "" {
555		prepareArgs = append(prepareArgs, fmt.Sprintf("--exec=%v", driverConfig.Command))
556	}
557
558	// Add memory isolator
559	prepareArgs = append(prepareArgs, fmt.Sprintf("--memory=%v", cfg.Resources.LinuxResources.MemoryLimitBytes))
560
561	// Add CPU isolator
562	prepareArgs = append(prepareArgs, fmt.Sprintf("--cpu=%v", cfg.Resources.LinuxResources.CPUShares))
563
564	// Add DNS servers
565	if len(driverConfig.DNSServers) == 1 && (driverConfig.DNSServers[0] == "host" || driverConfig.DNSServers[0] == "none") {
566		// Special case single item lists with the special values "host" or "none"
567		runArgs = append(runArgs, fmt.Sprintf("--dns=%s", driverConfig.DNSServers[0]))
568	} else {
569		for _, ip := range driverConfig.DNSServers {
570			if err := net.ParseIP(ip); err == nil {
571				wrappedErr := fmt.Errorf("invalid ip address for container dns server %q", ip)
572				d.logger.Debug("error parsing DNS server", "error", wrappedErr)
573				return nil, nil, wrappedErr
574			}
575			runArgs = append(runArgs, fmt.Sprintf("--dns=%s", ip))
576		}
577	}
578
579	// set DNS search domains
580	for _, domain := range driverConfig.DNSSearchDomains {
581		runArgs = append(runArgs, fmt.Sprintf("--dns-search=%s", domain))
582	}
583
584	// set network
585	network := strings.Join(driverConfig.Net, ",")
586	if network != "" {
587		runArgs = append(runArgs, fmt.Sprintf("--net=%s", network))
588	}
589
590	// Setup port mapping and exposed ports
591	if len(cfg.Resources.NomadResources.Networks) == 0 {
592		d.logger.Debug("no network interfaces are available")
593		if len(driverConfig.PortMap) > 0 {
594			return nil, nil, fmt.Errorf("Trying to map ports but no network interface is available")
595		}
596	} else if network == "host" {
597		// Port mapping is skipped when host networking is used.
598		d.logger.Debug("Ignoring port_map when using --net=host", "task_name", cfg.Name)
599	} else {
600		network := cfg.Resources.NomadResources.Networks[0]
601		for _, port := range network.ReservedPorts {
602			var containerPort string
603
604			mapped, ok := driverConfig.PortMap[port.Label]
605			if !ok {
606				// If the user doesn't have a mapped port using port_map, driver stops running container.
607				return nil, nil, fmt.Errorf("port_map is not set. When you defined port in the resources, you need to configure port_map.")
608			}
609			containerPort = mapped
610
611			hostPortStr := strconv.Itoa(port.Value)
612
613			d.logger.Debug("driver.rkt: exposed port", "containerPort", containerPort)
614			// Add port option to rkt run arguments. rkt allows multiple port args
615			prepareArgs = append(prepareArgs, fmt.Sprintf("--port=%s:%s", containerPort, hostPortStr))
616		}
617
618		for _, port := range network.DynamicPorts {
619			// By default we will map the allocated port 1:1 to the container
620			var containerPort string
621
622			if mapped, ok := driverConfig.PortMap[port.Label]; ok {
623				containerPort = mapped
624			} else {
625				// If the user doesn't have mapped a port using port_map, driver stops running container.
626				return nil, nil, fmt.Errorf("port_map is not set. When you defined port in the resources, you need to configure port_map.")
627			}
628
629			hostPortStr := strconv.Itoa(port.Value)
630
631			d.logger.Debug("exposed port", "containerPort", containerPort, "task_name", cfg.Name)
632			// Add port option to rkt run arguments. rkt allows multiple port args
633			prepareArgs = append(prepareArgs, fmt.Sprintf("--port=%s:%s", containerPort, hostPortStr))
634		}
635
636	}
637
638	// If a user has been specified for the taskConfig, pass it through to the user
639	if cfg.User != "" {
640		prepareArgs = append(prepareArgs, fmt.Sprintf("--user=%s", cfg.User))
641	}
642
643	// There's no taskConfig-level parameter for groups so check the driver
644	// config for a custom group
645	if driverConfig.Group != "" {
646		prepareArgs = append(prepareArgs, fmt.Sprintf("--group=%s", driverConfig.Group))
647	}
648
649	// Add user passed arguments.
650	if len(driverConfig.Args) != 0 {
651
652		// Need to start arguments with "--"
653		prepareArgs = append(prepareArgs, "--")
654
655		for _, arg := range driverConfig.Args {
656			prepareArgs = append(prepareArgs, fmt.Sprintf("%v", arg))
657		}
658	}
659
660	pluginLogFile := filepath.Join(cfg.TaskDir().Dir, fmt.Sprintf("%s-executor.out", cfg.Name))
661	executorConfig := &executor.ExecutorConfig{
662		LogFile:  pluginLogFile,
663		LogLevel: "debug",
664	}
665
666	execImpl, pluginClient, err := executor.CreateExecutor(
667		d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID),
668		d.nomadConfig, executorConfig)
669	if err != nil {
670		return nil, nil, err
671	}
672
673	absPath, err := GetAbsolutePath(rktCmd)
674	if err != nil {
675		return nil, nil, err
676	}
677
678	var outBuf, errBuf bytes.Buffer
679	cmd := exec.Command(rktCmd, prepareArgs...)
680	cmd.Stdout = &outBuf
681	cmd.Stderr = &errBuf
682	d.logger.Debug("preparing taskConfig", "pod", img, "task_name", cfg.Name, "args", prepareArgs)
683	if err := cmd.Run(); err != nil {
684		return nil, nil, fmt.Errorf("Error preparing rkt pod: %s\n\nOutput: %s\n\nError: %s",
685			err, outBuf.String(), errBuf.String())
686	}
687	uuid := strings.TrimSpace(outBuf.String())
688	d.logger.Debug("taskConfig prepared", "pod", img, "task_name", cfg.Name, "uuid", uuid)
689	runArgs = append(runArgs, uuid)
690
691	// The taskConfig's environment is set via --set-env flags above, but the rkt
692	// command itself needs an environment with PATH set to find iptables.
693
694	// TODO (preetha) need to figure out how to pass env.blacklist from client config
695	eb := taskenv.NewEmptyBuilder()
696	filter := strings.Split(config.DefaultEnvBlacklist, ",")
697	rktEnv := eb.SetHostEnvvars(filter).Build()
698
699	// Enable ResourceLimits to place the executor in a parent cgroup of
700	// the rkt container. This allows stats collection via the executor to
701	// work just like it does for exec.
702	execCmd := &executor.ExecCommand{
703		Cmd:            absPath,
704		Args:           runArgs,
705		ResourceLimits: true,
706		Resources:      cfg.Resources,
707
708		// Use rktEnv, the environment needed for running rkt, not the task env
709		Env: rktEnv.List(),
710
711		TaskDir:    cfg.TaskDir().Dir,
712		StdoutPath: cfg.StdoutPath,
713		StderrPath: cfg.StderrPath,
714	}
715	ps, err := execImpl.Launch(execCmd)
716	if err != nil {
717		pluginClient.Kill()
718		return nil, nil, err
719	}
720
721	d.logger.Debug("started taskConfig", "aci", img, "uuid", uuid, "task_name", cfg.Name, "args", runArgs)
722	h := &taskHandle{
723		exec:         execImpl,
724		env:          rktEnv,
725		pid:          ps.Pid,
726		uuid:         uuid,
727		pluginClient: pluginClient,
728		taskConfig:   cfg,
729		procState:    drivers.TaskStateRunning,
730		startedAt:    time.Now().Round(time.Millisecond),
731		logger:       d.logger,
732	}
733
734	rktDriverState := TaskState{
735		ReattachConfig: pstructs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
736		Pid:            ps.Pid,
737		TaskConfig:     cfg,
738		StartedAt:      h.startedAt,
739		UUID:           uuid,
740	}
741
742	if err := handle.SetDriverState(&rktDriverState); err != nil {
743		d.logger.Error("failed to start task, error setting driver state", "error", err, "task_name", cfg.Name)
744		execImpl.Shutdown("", 0)
745		pluginClient.Kill()
746		return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
747	}
748
749	d.tasks.Set(cfg.ID, h)
750	go h.run()
751
752	// Do not attempt to retrieve driver network if one won't exist:
753	//  - "host" means the container itself has no networking metadata
754	//  - "none" means no network is configured
755	// https://coreos.com/rkt/docs/latest/networking/overview.html#no-loopback-only-networking
756	var driverNetwork *drivers.DriverNetwork
757	if network != "host" && network != "none" {
758		d.logger.Debug("retrieving network information for pod", "pod", img, "UUID", uuid, "task_name", cfg.Name)
759		driverNetwork, err = rktGetDriverNetwork(uuid, driverConfig.PortMap, d.logger)
760		if err != nil && !pluginClient.Exited() {
761			d.logger.Warn("network status retrieval for pod failed", "pod", img, "UUID", uuid, "task_name", cfg.Name, "error", err)
762
763			// If a portmap was given, this turns into a fatal error
764			if len(driverConfig.PortMap) != 0 {
765				pluginClient.Kill()
766				return nil, nil, fmt.Errorf("Trying to map ports but driver could not determine network information")
767			}
768		}
769	}
770
771	return handle, driverNetwork, nil
772
773}
774
775func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
776	handle, ok := d.tasks.Get(taskID)
777	if !ok {
778		return nil, drivers.ErrTaskNotFound
779	}
780
781	ch := make(chan *drivers.ExitResult)
782	go d.handleWait(ctx, handle, ch)
783
784	return ch, nil
785}
786
787func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
788	handle, ok := d.tasks.Get(taskID)
789	if !ok {
790		return drivers.ErrTaskNotFound
791	}
792
793	if err := handle.exec.Shutdown(signal, timeout); err != nil {
794		if handle.pluginClient.Exited() {
795			return nil
796		}
797		return fmt.Errorf("executor Shutdown failed: %v", err)
798	}
799
800	return nil
801}
802
803func (d *Driver) DestroyTask(taskID string, force bool) error {
804	handle, ok := d.tasks.Get(taskID)
805	if !ok {
806		return drivers.ErrTaskNotFound
807	}
808
809	if handle.IsRunning() && !force {
810		return fmt.Errorf("cannot destroy running task")
811	}
812
813	if !handle.pluginClient.Exited() {
814		if err := handle.exec.Shutdown("", 0); err != nil {
815			handle.logger.Error("destroying executor failed", "err", err)
816		}
817
818		handle.pluginClient.Kill()
819	}
820
821	d.tasks.Delete(taskID)
822	return nil
823}
824
825func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
826	handle, ok := d.tasks.Get(taskID)
827	if !ok {
828		return nil, drivers.ErrTaskNotFound
829	}
830
831	return handle.TaskStatus(), nil
832}
833
834func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
835	handle, ok := d.tasks.Get(taskID)
836	if !ok {
837		return nil, drivers.ErrTaskNotFound
838	}
839
840	return handle.exec.Stats(ctx, interval)
841}
842
// TaskEvents returns the task event channel obtained from the driver's
// eventer, passing ctx through to it unchanged.
func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
	return d.eventer.TaskEvents(ctx)
}
846
847func (d *Driver) SignalTask(taskID string, signal string) error {
848	handle, ok := d.tasks.Get(taskID)
849	if !ok {
850		return drivers.ErrTaskNotFound
851	}
852
853	sig := os.Interrupt
854	if s, ok := signals.SignalLookup[signal]; ok {
855		sig = s
856	} else {
857		d.logger.Warn("unknown signal to send to task, using SIGINT instead", "signal", signal, "task_id", handle.taskConfig.ID, "task_name", handle.taskConfig.Name)
858
859	}
860	return handle.exec.Signal(sig)
861}
862
863func (d *Driver) ExecTask(taskID string, cmdArgs []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
864	if len(cmdArgs) == 0 {
865		return nil, fmt.Errorf("error cmd must have at least one value")
866	}
867	handle, ok := d.tasks.Get(taskID)
868	if !ok {
869		return nil, drivers.ErrTaskNotFound
870	}
871	// enter + UUID + cmd + args...
872	cmd := cmdArgs[0]
873	args := cmdArgs[1:]
874	enterArgs := make([]string, 3+len(args))
875	enterArgs[0] = "enter"
876	enterArgs[1] = handle.uuid
877	enterArgs[2] = handle.env.ReplaceEnv(cmd)
878	copy(enterArgs[3:], handle.env.ParseAndReplace(args))
879	out, exitCode, err := handle.exec.Exec(time.Now().Add(timeout), rktCmd, enterArgs)
880	if err != nil {
881		return nil, err
882	}
883
884	return &drivers.ExecTaskResult{
885		Stdout: out,
886		ExitResult: &drivers.ExitResult{
887			ExitCode: exitCode,
888		},
889	}, nil
890
891}
892
// Compile-time assertion that *Driver implements
// drivers.ExecTaskStreamingRawDriver.
var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil)
894
895func (d *Driver) ExecTaskStreamingRaw(ctx context.Context,
896	taskID string,
897	command []string,
898	tty bool,
899	stream drivers.ExecTaskStream) error {
900
901	if len(command) == 0 {
902		return fmt.Errorf("error cmd must have at least one value")
903	}
904	handle, ok := d.tasks.Get(taskID)
905	if !ok {
906		return drivers.ErrTaskNotFound
907	}
908
909	enterCmd := []string{rktCmd, "enter", handle.uuid, handle.env.ReplaceEnv(command[0])}
910	enterCmd = append(enterCmd, handle.env.ParseAndReplace(command[1:])...)
911
912	return handle.exec.ExecStreaming(ctx, enterCmd, tty, stream)
913}
914
// GetAbsolutePath looks bin up via exec.LookPath and returns the located path
// with all symbolic links resolved. A failed lookup is reported as an error
// naming the binary; a failure to resolve symlinks is returned as-is from
// filepath.EvalSymlinks.
func GetAbsolutePath(bin string) (string, error) {
	located, lookErr := exec.LookPath(bin)
	if lookErr != nil {
		return "", fmt.Errorf("failed to resolve path to %q executable: %v", bin, lookErr)
	}

	resolved, evalErr := filepath.EvalSymlinks(located)
	return resolved, evalErr
}
925
926func rktGetDriverNetwork(uuid string, driverConfigPortMap map[string]string, logger hclog.Logger) (*drivers.DriverNetwork, error) {
927	deadline := time.Now().Add(networkDeadline)
928	var lastErr error
929	try := 0
930
931	for time.Now().Before(deadline) {
932		try++
933		if status, err := rktGetStatus(uuid, logger); err == nil {
934			for _, net := range status.Networks {
935				if !net.IP.IsGlobalUnicast() {
936					continue
937				}
938
939				// Get the pod manifest so we can figure out which ports are exposed
940				var portmap map[string]int
941				manifest, err := rktGetManifest(uuid)
942				if err == nil {
943					portmap, err = rktManifestMakePortMap(manifest, driverConfigPortMap)
944					if err != nil {
945						lastErr = fmt.Errorf("could not create manifest-based portmap: %v", err)
946						return nil, lastErr
947					}
948				} else {
949					lastErr = fmt.Errorf("could not get pod manifest: %v", err)
950					return nil, lastErr
951				}
952
953				// This is a successful landing; log if its not the first attempt.
954				if try > 1 {
955					logger.Debug("retrieved network info for pod", "uuid", uuid, "attempt", try)
956				}
957				return &drivers.DriverNetwork{
958					PortMap: portmap,
959					IP:      status.Networks[0].IP.String(),
960				}, nil
961			}
962
963			if len(status.Networks) == 0 {
964				lastErr = fmt.Errorf("no networks found")
965			} else {
966				lastErr = fmt.Errorf("no good driver networks out of %d returned", len(status.Networks))
967			}
968		} else {
969			lastErr = fmt.Errorf("getting status failed: %v", err)
970		}
971
972		waitTime := getJitteredNetworkRetryTime()
973		logger.Debug("failed getting network info for pod, sleeping", "uuid", uuid, "attempt", try, "err", lastErr, "wait", waitTime)
974		time.Sleep(waitTime)
975	}
976	return nil, fmt.Errorf("timed out, last error: %v", lastErr)
977}
978
979// Given a rkt/appc pod manifest and driver portmap configuration, create
980// a driver portmap.
981func rktManifestMakePortMap(manifest *appcschema.PodManifest, configPortMap map[string]string) (map[string]int, error) {
982	if len(manifest.Apps) == 0 {
983		return nil, fmt.Errorf("manifest has no apps")
984	}
985	if len(manifest.Apps) != 1 {
986		return nil, fmt.Errorf("manifest has multiple apps!")
987	}
988	app := manifest.Apps[0]
989	if app.App == nil {
990		return nil, fmt.Errorf("specified app has no App object")
991	}
992
993	portMap := make(map[string]int)
994	for svc, name := range configPortMap {
995		for _, port := range app.App.Ports {
996			if port.Name.String() == name {
997				portMap[svc] = int(port.Port)
998			}
999		}
1000	}
1001	return portMap, nil
1002}
1003
1004// Retrieve pod status for the pod with the given UUID.
1005func rktGetStatus(uuid string, logger hclog.Logger) (*Pod, error) {
1006	statusArgs := []string{
1007		"status",
1008		"--format=json",
1009		uuid,
1010	}
1011	var outBuf, errBuf bytes.Buffer
1012	cmd := exec.Command(rktCmd, statusArgs...)
1013	cmd.Stdout = &outBuf
1014	cmd.Stderr = &errBuf
1015	if err := cmd.Run(); err != nil {
1016		if outBuf.Len() > 0 {
1017			logger.Debug("status output for UUID", "uuid", uuid, "error", elide(outBuf))
1018		}
1019		if errBuf.Len() == 0 {
1020			return nil, err
1021		}
1022		logger.Debug("status error output", "uuid", uuid, "error", elide(errBuf))
1023		return nil, fmt.Errorf("%s. stderr: %q", err, elide(errBuf))
1024	}
1025	var status Pod
1026	if err := json.Unmarshal(outBuf.Bytes(), &status); err != nil {
1027		return nil, err
1028	}
1029	return &status, nil
1030}
1031
1032// Retrieves a pod manifest
1033func rktGetManifest(uuid string) (*appcschema.PodManifest, error) {
1034	statusArgs := []string{
1035		"cat-manifest",
1036		uuid,
1037	}
1038	var outBuf bytes.Buffer
1039	cmd := exec.Command(rktCmd, statusArgs...)
1040	cmd.Stdout = &outBuf
1041	cmd.Stderr = ioutil.Discard
1042	if err := cmd.Run(); err != nil {
1043		return nil, err
1044	}
1045	var manifest appcschema.PodManifest
1046	if err := json.Unmarshal(outBuf.Bytes(), &manifest); err != nil {
1047		return nil, err
1048	}
1049	return &manifest, nil
1050}
1051
// getJitteredNetworkRetryTime returns the pause used between
// rktGetDriverNetwork retries: a 900ms base plus a random jitter of 0 to 99
// whole milliseconds, i.e. a duration in [900ms, 1s).
func getJitteredNetworkRetryTime() time.Duration {
	const baseMillis = 900
	jitterMillis := rand.Intn(100)
	return time.Duration(baseMillis+jitterMillis) * time.Millisecond
}
1056
// elideToLen returns a buffer holding at most length bytes of inBuf's unread
// content, followed by "..." when anything was cut off. If inBuf already fits
// within length it is returned unchanged. A negative length is treated as 0.
//
// inBuf is received by value, but a copied bytes.Buffer still shares its
// backing array with the caller's buffer. The elided result is therefore built
// in a fresh buffer: truncating and appending to the copy in place would
// overwrite the caller's bytes just past the cut with the "..." marker.
func elideToLen(inBuf bytes.Buffer, length int) bytes.Buffer {
	const marker = "..."

	if length < 0 {
		length = 0
	}
	if inBuf.Len() <= length {
		return inBuf
	}

	var elided bytes.Buffer
	elided.Grow(length + len(marker))
	elided.Write(inBuf.Bytes()[:length])
	elided.WriteString(marker)
	return elided
}
1065
1066// Conditionally elide a buffer to an 80 character string
1067func elide(inBuf bytes.Buffer) string {
1068	tempBuf := elideToLen(inBuf, 80)
1069	return tempBuf.String()
1070}
1071
1072func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
1073	defer close(ch)
1074	var result *drivers.ExitResult
1075	ps, err := handle.exec.Wait(ctx)
1076	if err != nil {
1077		result = &drivers.ExitResult{
1078			Err: fmt.Errorf("executor: error waiting on process: %v", err),
1079		}
1080	} else {
1081		result = &drivers.ExitResult{
1082			ExitCode: ps.ExitCode,
1083			Signal:   ps.Signal,
1084		}
1085	}
1086
1087	select {
1088	case <-ctx.Done():
1089	case <-d.ctx.Done():
1090	case ch <- result:
1091	}
1092}
1093
// Shutdown invokes the driver's signalShutdown function.
// NOTE(review): signalShutdown presumably cancels d.ctx (which handleWait
// selects on) — confirm where it is assigned.
func (d *Driver) Shutdown() {
	d.signalShutdown()
}
1097