package rkt

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"math/rand"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	appcschema "github.com/appc/spec/schema"
	"github.com/hashicorp/consul-template/signals"
	hclog "github.com/hashicorp/go-hclog"
	version "github.com/hashicorp/go-version"
	"github.com/hashicorp/nomad/client/config"
	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/drivers/shared/eventer"
	"github.com/hashicorp/nomad/drivers/shared/executor"
	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/helper/pluginutils/hclutils"
	"github.com/hashicorp/nomad/helper/pluginutils/loader"
	"github.com/hashicorp/nomad/plugins/base"
	"github.com/hashicorp/nomad/plugins/drivers"
	"github.com/hashicorp/nomad/plugins/shared/hclspec"
	pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
)

const (
	// pluginName is the name of the plugin
	pluginName = "rkt"

	// fingerprintPeriod is the interval at which the driver will send fingerprint responses
	fingerprintPeriod = 30 * time.Second

	// minRktVersion is the earliest rkt version the fingerprinter accepts;
	// older versions are reported as undetected. Historically the floor came
	// from rkt 0.14.0, which added the CPU and memory isolators needed to keep
	// a uniform interface across all drivers.
	// NOTE(review): the reason the floor is now 1.27.0 specifically is not
	// recorded here — confirm before changing it.
	minRktVersion = "1.27.0"

	// rktCmd is the command rkt is installed as.
	rktCmd = "rkt"

	// networkDeadline is how long to wait for container network
	// information to become available.
	networkDeadline = 1 * time.Minute

	// taskHandleVersion is the version of task handle which this driver sets
	// and understands how to decode driver state
	taskHandleVersion = 1
)

var (
	// PluginID is the rkt plugin metadata registered in the plugin
	// catalog.
	PluginID = loader.PluginID{
		Name:       pluginName,
		PluginType: base.PluginTypeDriver,
	}

	// PluginConfig is the rkt factory function registered in the
	// plugin catalog.
	PluginConfig = &loader.InternalPluginConfig{
		Config:  map[string]interface{}{},
		Factory: func(l hclog.Logger) interface{} { return NewRktDriver(l) },
	}
)

// PluginLoader maps pre-0.9 client driver options to post-0.9 plugin options.
// Only "driver.rkt.volumes.enabled" is translated (to "volumes_enabled"), and
// only when its value parses as a bool; absent or unparsable values are
// silently omitted. The returned error is always nil.
func PluginLoader(opts map[string]string) (map[string]interface{}, error) {
	conf := map[string]interface{}{}
	if v, err := strconv.ParseBool(opts["driver.rkt.volumes.enabled"]); err == nil {
		conf["volumes_enabled"] = v
	}
	return conf, nil
}

var (
	// pluginInfo is the response returned for the PluginInfo RPC
	pluginInfo = &base.PluginInfoResponse{
		Type:              base.PluginTypeDriver,
		PluginApiVersions: []string{drivers.ApiVersion010},
		PluginVersion:     "0.1.0",
		Name:              pluginName,
	}

	// configSpec is the hcl specification returned by the ConfigSchema RPC.
	// volumes_enabled is optional and defaults to true when unset.
	configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
		"volumes_enabled": hclspec.NewDefault(
			hclspec.NewAttr("volumes_enabled", "bool", false),
			hclspec.NewLiteral("true"),
		),
	})

	// taskConfigSpec is the hcl specification for the driver config section of
	// a taskConfig within a job. It is returned in the TaskConfigSchema RPC.
	// Only "image" is required; every other attribute is optional.
	taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
		"image":              hclspec.NewAttr("image", "string", true),
		"command":            hclspec.NewAttr("command", "string", false),
		"args":               hclspec.NewAttr("args", "list(string)", false),
		"trust_prefix":       hclspec.NewAttr("trust_prefix", "string", false),
		"dns_servers":        hclspec.NewAttr("dns_servers", "list(string)", false),
		"dns_search_domains": hclspec.NewAttr("dns_search_domains", "list(string)", false),
		"net":                hclspec.NewAttr("net", "list(string)", false),
		"port_map":           hclspec.NewAttr("port_map", "list(map(string))", false),
		"volumes":            hclspec.NewAttr("volumes", "list(string)", false),
		"insecure_options":   hclspec.NewAttr("insecure_options", "list(string)", false),
		"no_overlay":         hclspec.NewAttr("no_overlay", "bool", false),
		"debug":              hclspec.NewAttr("debug", "bool", false),
		"group":              hclspec.NewAttr("group", "string", false),
	})

	// capabilities is returned by the Capabilities RPC and indicates what
	// optional features this driver supports
	capabilities = &drivers.Capabilities{
		SendSignals: true,
		Exec:        true,
		FSIsolation: drivers.FSIsolationImage,
	}

	// reRktVersion and reAppcVersion extract the rkt and appc version numbers
	// from the output of `rkt version` during fingerprinting. The capture
	// group is permissive: a digit followed by any run of digits and dots.
	reRktVersion  = regexp.MustCompile(`rkt [vV]ersion[:]? (\d[.\d]+)`)
	reAppcVersion = regexp.MustCompile(`appc [vV]ersion[:]? (\d[.\d]+)`)
)

// Config is the client configuration for the driver
type Config struct {
	// VolumesEnabled allows tasks to bind host paths (volumes) inside their
	// container. Binding relative paths is always allowed and will be resolved
	// relative to the allocation's directory.
	VolumesEnabled bool `codec:"volumes_enabled"`
}

// TaskConfig is the driver configuration of a taskConfig within a job
type TaskConfig struct {
	ImageName        string             `codec:"image"`
	Command          string             `codec:"command"`
	Args             []string           `codec:"args"`
	TrustPrefix      string             `codec:"trust_prefix"`
	DNSServers       []string           `codec:"dns_servers"`        // DNS Server for containers
	DNSSearchDomains []string           `codec:"dns_search_domains"` // DNS Search domains for containers
	Net              []string           `codec:"net"`                // Networks for the containers
	PortMap          hclutils.MapStrStr `codec:"port_map"`           // Maps a Nomad port label to the name of a port declared in the image manifest
	Volumes          []string           `codec:"volumes"`            // Host-Volumes to mount in, syntax: /path/to/host/directory:/destination/path/in/container[:readOnly]
	InsecureOptions  []string           `codec:"insecure_options"`   // list of args for --insecure-options

	NoOverlay bool   `codec:"no_overlay"` // disable overlayfs for rkt run
	Debug     bool   `codec:"debug"`      // Enable debug option for rkt command
	Group     string `codec:"group"`      // Group override for the container
}

// TaskState is the state which is encoded in the handle returned in
// StartTask. This information is needed to rebuild the taskConfig state and handler
// during recovery.
type TaskState struct {
	ReattachConfig *pstructs.ReattachConfig
	TaskConfig     *drivers.TaskConfig
	Pid            int
	StartedAt      time.Time
	UUID           string
}

// Driver is a driver for running images via rkt. We attempt to choose sane
// defaults for now, with more configuration planned for the future.
173type Driver struct { 174 // eventer is used to handle multiplexing of TaskEvents calls such that an 175 // event can be broadcast to all callers 176 eventer *eventer.Eventer 177 178 // config is the driver configuration set by the SetConfig RPC 179 config *Config 180 181 // nomadConfig is the client config from nomad 182 nomadConfig *base.ClientDriverConfig 183 184 // tasks is the in memory datastore mapping taskIDs to rktTaskHandles 185 tasks *taskStore 186 187 // ctx is the context for the driver. It is passed to other subsystems to 188 // coordinate shutdown 189 ctx context.Context 190 191 // signalShutdown is called when the driver is shutting down and cancels the 192 // ctx passed to any subsystems 193 signalShutdown context.CancelFunc 194 195 // logger will log to the Nomad agent 196 logger hclog.Logger 197 198 // A tri-state boolean to know if the fingerprinting has happened and 199 // whether it has been successful 200 fingerprintSuccess *bool 201 fingerprintLock sync.Mutex 202} 203 204func NewRktDriver(logger hclog.Logger) drivers.DriverPlugin { 205 ctx, cancel := context.WithCancel(context.Background()) 206 logger = logger.Named(pluginName) 207 return &Driver{ 208 eventer: eventer.NewEventer(ctx, logger), 209 config: &Config{}, 210 tasks: newTaskStore(), 211 ctx: ctx, 212 signalShutdown: cancel, 213 logger: logger, 214 } 215} 216 217func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) { 218 return pluginInfo, nil 219} 220 221func (d *Driver) ConfigSchema() (*hclspec.Spec, error) { 222 return configSpec, nil 223} 224 225func (d *Driver) SetConfig(cfg *base.Config) error { 226 var config Config 227 if len(cfg.PluginConfig) != 0 { 228 if err := base.MsgPackDecode(cfg.PluginConfig, &config); err != nil { 229 return err 230 } 231 } 232 233 d.config = &config 234 if cfg.AgentConfig != nil { 235 d.nomadConfig = cfg.AgentConfig.Driver 236 } 237 return nil 238} 239 240func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) { 241 return 
taskConfigSpec, nil 242} 243 244func (d *Driver) Capabilities() (*drivers.Capabilities, error) { 245 return capabilities, nil 246} 247 248func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) { 249 ch := make(chan *drivers.Fingerprint) 250 go d.handleFingerprint(ctx, ch) 251 return ch, nil 252} 253 254func (d *Driver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerprint) { 255 defer close(ch) 256 ticker := time.NewTimer(0) 257 for { 258 select { 259 case <-ctx.Done(): 260 return 261 case <-d.ctx.Done(): 262 return 263 case <-ticker.C: 264 ticker.Reset(fingerprintPeriod) 265 ch <- d.buildFingerprint() 266 } 267 } 268} 269 270// setFingerprintSuccess marks the driver as having fingerprinted successfully 271func (d *Driver) setFingerprintSuccess() { 272 d.fingerprintLock.Lock() 273 d.fingerprintSuccess = helper.BoolToPtr(true) 274 d.fingerprintLock.Unlock() 275} 276 277// setFingerprintFailure marks the driver as having failed fingerprinting 278func (d *Driver) setFingerprintFailure() { 279 d.fingerprintLock.Lock() 280 d.fingerprintSuccess = helper.BoolToPtr(false) 281 d.fingerprintLock.Unlock() 282} 283 284// fingerprintSuccessful returns true if the driver has 285// never fingerprinted or has successfully fingerprinted 286func (d *Driver) fingerprintSuccessful() bool { 287 d.fingerprintLock.Lock() 288 defer d.fingerprintLock.Unlock() 289 return d.fingerprintSuccess == nil || *d.fingerprintSuccess 290} 291 292func (d *Driver) buildFingerprint() *drivers.Fingerprint { 293 fingerprint := &drivers.Fingerprint{ 294 Attributes: map[string]*pstructs.Attribute{}, 295 Health: drivers.HealthStateHealthy, 296 HealthDescription: drivers.DriverHealthy, 297 } 298 299 // Only enable if we are root 300 if syscall.Geteuid() != 0 { 301 if d.fingerprintSuccessful() { 302 d.logger.Debug("must run as root user, disabling") 303 } 304 d.setFingerprintFailure() 305 fingerprint.Health = drivers.HealthStateUndetected 306 
fingerprint.HealthDescription = drivers.DriverRequiresRootMessage 307 return fingerprint 308 } 309 310 outBytes, err := exec.Command(rktCmd, "version").Output() 311 if err != nil { 312 fingerprint.Health = drivers.HealthStateUndetected 313 fingerprint.HealthDescription = fmt.Sprintf("Failed to execute %s version: %v", rktCmd, err) 314 d.setFingerprintFailure() 315 return fingerprint 316 } 317 out := strings.TrimSpace(string(outBytes)) 318 319 rktMatches := reRktVersion.FindStringSubmatch(out) 320 appcMatches := reAppcVersion.FindStringSubmatch(out) 321 if len(rktMatches) != 2 || len(appcMatches) != 2 { 322 fingerprint.Health = drivers.HealthStateUndetected 323 fingerprint.HealthDescription = "Unable to parse rkt version string" 324 d.setFingerprintFailure() 325 return fingerprint 326 } 327 328 minVersion, _ := version.NewVersion(minRktVersion) 329 currentVersion, _ := version.NewVersion(rktMatches[1]) 330 if currentVersion.LessThan(minVersion) { 331 // Do not allow ancient rkt versions 332 fingerprint.Health = drivers.HealthStateUndetected 333 fingerprint.HealthDescription = fmt.Sprintf("Unsuported rkt version %s", currentVersion) 334 if d.fingerprintSuccessful() { 335 d.logger.Warn("unsupported rkt version please upgrade to >= "+minVersion.String(), 336 "rkt_version", currentVersion) 337 } 338 d.setFingerprintFailure() 339 return fingerprint 340 } 341 342 fingerprint.Attributes["driver.rkt"] = pstructs.NewBoolAttribute(true) 343 fingerprint.Attributes["driver.rkt.version"] = pstructs.NewStringAttribute(rktMatches[1]) 344 fingerprint.Attributes["driver.rkt.appc.version"] = pstructs.NewStringAttribute(appcMatches[1]) 345 if d.config.VolumesEnabled { 346 fingerprint.Attributes["driver.rkt.volumes.enabled"] = pstructs.NewBoolAttribute(true) 347 } 348 d.setFingerprintSuccess() 349 return fingerprint 350 351} 352 353func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { 354 if handle == nil { 355 return fmt.Errorf("error: handle cannot be nil") 356 } 357 358 
// COMPAT(0.10): pre 0.9 upgrade path check 359 if handle.Version == 0 { 360 return d.recoverPre09Task(handle) 361 } 362 363 // If already attached to handle there's nothing to recover. 364 if _, ok := d.tasks.Get(handle.Config.ID); ok { 365 d.logger.Trace("nothing to recover; task already exists", 366 "task_id", handle.Config.ID, 367 "task_name", handle.Config.Name, 368 ) 369 return nil 370 } 371 372 var taskState TaskState 373 if err := handle.GetDriverState(&taskState); err != nil { 374 d.logger.Error("failed to decode taskConfig state from handle", "error", err, "task_id", handle.Config.ID) 375 return fmt.Errorf("failed to decode taskConfig state from handle: %v", err) 376 } 377 378 plugRC, err := pstructs.ReattachConfigToGoPlugin(taskState.ReattachConfig) 379 if err != nil { 380 d.logger.Error("failed to build ReattachConfig from taskConfig state", "error", err, "task_id", handle.Config.ID) 381 return fmt.Errorf("failed to build ReattachConfig from taskConfig state: %v", err) 382 } 383 384 execImpl, pluginClient, err := executor.ReattachToExecutor(plugRC, 385 d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID)) 386 if err != nil { 387 d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID) 388 return fmt.Errorf("failed to reattach to executor: %v", err) 389 } 390 391 // The taskConfig's environment is set via --set-env flags in Start, but the rkt 392 // command itself needs an environment with PATH set to find iptables. 
393 // TODO (preetha) need to figure out how to read env.blacklist 394 eb := taskenv.NewEmptyBuilder() 395 filter := strings.Split(config.DefaultEnvBlacklist, ",") 396 rktEnv := eb.SetHostEnvvars(filter).Build() 397 398 h := &taskHandle{ 399 exec: execImpl, 400 env: rktEnv, 401 pid: taskState.Pid, 402 uuid: taskState.UUID, 403 pluginClient: pluginClient, 404 taskConfig: taskState.TaskConfig, 405 procState: drivers.TaskStateRunning, 406 startedAt: taskState.StartedAt, 407 exitResult: &drivers.ExitResult{}, 408 logger: d.logger, 409 } 410 411 d.tasks.Set(taskState.TaskConfig.ID, h) 412 413 go h.run() 414 return nil 415} 416 417func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) { 418 if _, ok := d.tasks.Get(cfg.ID); ok { 419 return nil, nil, fmt.Errorf("taskConfig with ID '%s' already started", cfg.ID) 420 } 421 422 var driverConfig TaskConfig 423 424 if err := cfg.DecodeDriverConfig(&driverConfig); err != nil { 425 return nil, nil, fmt.Errorf("failed to decode driver config: %v", err) 426 } 427 428 handle := drivers.NewTaskHandle(taskHandleVersion) 429 handle.Config = cfg 430 431 // todo(preetha) - port map in client v1 is a slice of maps that get merged. figure out if the caller will do this 432 //driverConfig.PortMap 433 434 // ACI image 435 img := driverConfig.ImageName 436 437 // Global arguments given to both prepare and run-prepared 438 globalArgs := make([]string, 0, 50) 439 440 // Add debug option to rkt command. 
441 debug := driverConfig.Debug 442 443 // Add the given trust prefix 444 trustPrefix := driverConfig.TrustPrefix 445 insecure := false 446 if trustPrefix != "" { 447 var outBuf, errBuf bytes.Buffer 448 cmd := exec.Command(rktCmd, "trust", "--skip-fingerprint-review=true", fmt.Sprintf("--prefix=%s", trustPrefix), fmt.Sprintf("--debug=%t", debug)) 449 cmd.Stdout = &outBuf 450 cmd.Stderr = &errBuf 451 if err := cmd.Run(); err != nil { 452 return nil, nil, fmt.Errorf("Error running rkt trust: %s\n\nOutput: %s\n\nError: %s", 453 err, outBuf.String(), errBuf.String()) 454 } 455 d.logger.Debug("added trust prefix", "trust_prefix", trustPrefix, "task_name", cfg.Name) 456 } else { 457 // Disable signature verification if the trust command was not run. 458 insecure = true 459 } 460 461 // if we have a selective insecure_options, prefer them 462 // insecure options are rkt's global argument, so we do this before the actual "run" 463 if len(driverConfig.InsecureOptions) > 0 { 464 globalArgs = append(globalArgs, fmt.Sprintf("--insecure-options=%s", strings.Join(driverConfig.InsecureOptions, ","))) 465 } else if insecure { 466 globalArgs = append(globalArgs, "--insecure-options=all") 467 } 468 469 // debug is rkt's global argument, so add it before the actual "run" 470 globalArgs = append(globalArgs, fmt.Sprintf("--debug=%t", debug)) 471 472 prepareArgs := make([]string, 0, 50) 473 runArgs := make([]string, 0, 50) 474 475 prepareArgs = append(prepareArgs, globalArgs...) 476 prepareArgs = append(prepareArgs, "prepare") 477 runArgs = append(runArgs, globalArgs...) 
478 runArgs = append(runArgs, "run-prepared") 479 480 // disable overlayfs 481 if driverConfig.NoOverlay { 482 prepareArgs = append(prepareArgs, "--no-overlay=true") 483 } 484 485 // Convert underscores to dashes in taskConfig names for use in volume names #2358 486 sanitizedName := strings.Replace(cfg.Name, "_", "-", -1) 487 488 // Mount /alloc 489 allocVolName := fmt.Sprintf("%s-%s-alloc", cfg.AllocID, sanitizedName) 490 prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", allocVolName, cfg.TaskDir().SharedAllocDir)) 491 prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", allocVolName, "/alloc")) 492 493 // Mount /local 494 localVolName := fmt.Sprintf("%s-%s-local", cfg.AllocID, sanitizedName) 495 prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", localVolName, cfg.TaskDir().LocalDir)) 496 prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", localVolName, "/local")) 497 498 // Mount /secrets 499 secretsVolName := fmt.Sprintf("%s-%s-secrets", cfg.AllocID, sanitizedName) 500 prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s", secretsVolName, cfg.TaskDir().SecretsDir)) 501 prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", secretsVolName, "/secrets")) 502 503 // Mount arbitrary volumes if enabled 504 if len(driverConfig.Volumes) > 0 { 505 if !d.config.VolumesEnabled { 506 return nil, nil, fmt.Errorf("volumes_enabled is false; cannot use rkt volumes: %+q", driverConfig.Volumes) 507 } 508 for i, rawvol := range driverConfig.Volumes { 509 parts := strings.Split(rawvol, ":") 510 readOnly := "false" 511 // job spec: 512 // volumes = ["/host/path:/container/path[:readOnly]"] 513 // the third parameter is optional, mount is read-write by default 514 if len(parts) == 3 { 515 if parts[2] == "readOnly" { 516 d.logger.Debug("mounting volume as readOnly", "volume", strings.Join(parts[:2], parts[1])) 517 readOnly = 
"true" 518 } else { 519 d.logger.Warn("unknown volume parameter ignored for mount", "parameter", parts[2], "mount", parts[0]) 520 } 521 } else if len(parts) != 2 { 522 return nil, nil, fmt.Errorf("invalid rkt volume: %q", rawvol) 523 } 524 volName := fmt.Sprintf("%s-%s-%d", cfg.AllocID, sanitizedName, i) 525 prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s,readOnly=%s", volName, parts[0], readOnly)) 526 prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", volName, parts[1])) 527 } 528 } 529 530 // Mount task volumes, always do 531 for i, vol := range cfg.Mounts { 532 volName := fmt.Sprintf("%s-%s-taskmounts-%d", cfg.AllocID, sanitizedName, i) 533 prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s,readOnly=%v", volName, vol.HostPath, vol.Readonly)) 534 prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", volName, vol.TaskPath)) 535 } 536 537 // Mount task devices, always do 538 for i, vol := range cfg.Devices { 539 volName := fmt.Sprintf("%s-%s-taskdevices-%d", cfg.AllocID, sanitizedName, i) 540 readOnly := !strings.Contains(vol.Permissions, "w") 541 prepareArgs = append(prepareArgs, fmt.Sprintf("--volume=%s,kind=host,source=%s,readOnly=%v", volName, vol.HostPath, readOnly)) 542 prepareArgs = append(prepareArgs, fmt.Sprintf("--mount=volume=%s,target=%s", volName, vol.TaskPath)) 543 } 544 545 // Inject environment variables 546 for k, v := range cfg.Env { 547 prepareArgs = append(prepareArgs, fmt.Sprintf("--set-env=%s=%s", k, v)) 548 } 549 550 // Image is set here, because the commands that follow apply to it 551 prepareArgs = append(prepareArgs, img) 552 553 // Check if the user has overridden the exec command. 
554 if driverConfig.Command != "" { 555 prepareArgs = append(prepareArgs, fmt.Sprintf("--exec=%v", driverConfig.Command)) 556 } 557 558 // Add memory isolator 559 prepareArgs = append(prepareArgs, fmt.Sprintf("--memory=%v", cfg.Resources.LinuxResources.MemoryLimitBytes)) 560 561 // Add CPU isolator 562 prepareArgs = append(prepareArgs, fmt.Sprintf("--cpu=%v", cfg.Resources.LinuxResources.CPUShares)) 563 564 // Add DNS servers 565 if len(driverConfig.DNSServers) == 1 && (driverConfig.DNSServers[0] == "host" || driverConfig.DNSServers[0] == "none") { 566 // Special case single item lists with the special values "host" or "none" 567 runArgs = append(runArgs, fmt.Sprintf("--dns=%s", driverConfig.DNSServers[0])) 568 } else { 569 for _, ip := range driverConfig.DNSServers { 570 if err := net.ParseIP(ip); err == nil { 571 wrappedErr := fmt.Errorf("invalid ip address for container dns server %q", ip) 572 d.logger.Debug("error parsing DNS server", "error", wrappedErr) 573 return nil, nil, wrappedErr 574 } 575 runArgs = append(runArgs, fmt.Sprintf("--dns=%s", ip)) 576 } 577 } 578 579 // set DNS search domains 580 for _, domain := range driverConfig.DNSSearchDomains { 581 runArgs = append(runArgs, fmt.Sprintf("--dns-search=%s", domain)) 582 } 583 584 // set network 585 network := strings.Join(driverConfig.Net, ",") 586 if network != "" { 587 runArgs = append(runArgs, fmt.Sprintf("--net=%s", network)) 588 } 589 590 // Setup port mapping and exposed ports 591 if len(cfg.Resources.NomadResources.Networks) == 0 { 592 d.logger.Debug("no network interfaces are available") 593 if len(driverConfig.PortMap) > 0 { 594 return nil, nil, fmt.Errorf("Trying to map ports but no network interface is available") 595 } 596 } else if network == "host" { 597 // Port mapping is skipped when host networking is used. 
598 d.logger.Debug("Ignoring port_map when using --net=host", "task_name", cfg.Name) 599 } else { 600 network := cfg.Resources.NomadResources.Networks[0] 601 for _, port := range network.ReservedPorts { 602 var containerPort string 603 604 mapped, ok := driverConfig.PortMap[port.Label] 605 if !ok { 606 // If the user doesn't have a mapped port using port_map, driver stops running container. 607 return nil, nil, fmt.Errorf("port_map is not set. When you defined port in the resources, you need to configure port_map.") 608 } 609 containerPort = mapped 610 611 hostPortStr := strconv.Itoa(port.Value) 612 613 d.logger.Debug("driver.rkt: exposed port", "containerPort", containerPort) 614 // Add port option to rkt run arguments. rkt allows multiple port args 615 prepareArgs = append(prepareArgs, fmt.Sprintf("--port=%s:%s", containerPort, hostPortStr)) 616 } 617 618 for _, port := range network.DynamicPorts { 619 // By default we will map the allocated port 1:1 to the container 620 var containerPort string 621 622 if mapped, ok := driverConfig.PortMap[port.Label]; ok { 623 containerPort = mapped 624 } else { 625 // If the user doesn't have mapped a port using port_map, driver stops running container. 626 return nil, nil, fmt.Errorf("port_map is not set. When you defined port in the resources, you need to configure port_map.") 627 } 628 629 hostPortStr := strconv.Itoa(port.Value) 630 631 d.logger.Debug("exposed port", "containerPort", containerPort, "task_name", cfg.Name) 632 // Add port option to rkt run arguments. 
rkt allows multiple port args 633 prepareArgs = append(prepareArgs, fmt.Sprintf("--port=%s:%s", containerPort, hostPortStr)) 634 } 635 636 } 637 638 // If a user has been specified for the taskConfig, pass it through to the user 639 if cfg.User != "" { 640 prepareArgs = append(prepareArgs, fmt.Sprintf("--user=%s", cfg.User)) 641 } 642 643 // There's no taskConfig-level parameter for groups so check the driver 644 // config for a custom group 645 if driverConfig.Group != "" { 646 prepareArgs = append(prepareArgs, fmt.Sprintf("--group=%s", driverConfig.Group)) 647 } 648 649 // Add user passed arguments. 650 if len(driverConfig.Args) != 0 { 651 652 // Need to start arguments with "--" 653 prepareArgs = append(prepareArgs, "--") 654 655 for _, arg := range driverConfig.Args { 656 prepareArgs = append(prepareArgs, fmt.Sprintf("%v", arg)) 657 } 658 } 659 660 pluginLogFile := filepath.Join(cfg.TaskDir().Dir, fmt.Sprintf("%s-executor.out", cfg.Name)) 661 executorConfig := &executor.ExecutorConfig{ 662 LogFile: pluginLogFile, 663 LogLevel: "debug", 664 } 665 666 execImpl, pluginClient, err := executor.CreateExecutor( 667 d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID), 668 d.nomadConfig, executorConfig) 669 if err != nil { 670 return nil, nil, err 671 } 672 673 absPath, err := GetAbsolutePath(rktCmd) 674 if err != nil { 675 return nil, nil, err 676 } 677 678 var outBuf, errBuf bytes.Buffer 679 cmd := exec.Command(rktCmd, prepareArgs...) 
680 cmd.Stdout = &outBuf 681 cmd.Stderr = &errBuf 682 d.logger.Debug("preparing taskConfig", "pod", img, "task_name", cfg.Name, "args", prepareArgs) 683 if err := cmd.Run(); err != nil { 684 return nil, nil, fmt.Errorf("Error preparing rkt pod: %s\n\nOutput: %s\n\nError: %s", 685 err, outBuf.String(), errBuf.String()) 686 } 687 uuid := strings.TrimSpace(outBuf.String()) 688 d.logger.Debug("taskConfig prepared", "pod", img, "task_name", cfg.Name, "uuid", uuid) 689 runArgs = append(runArgs, uuid) 690 691 // The taskConfig's environment is set via --set-env flags above, but the rkt 692 // command itself needs an environment with PATH set to find iptables. 693 694 // TODO (preetha) need to figure out how to pass env.blacklist from client config 695 eb := taskenv.NewEmptyBuilder() 696 filter := strings.Split(config.DefaultEnvBlacklist, ",") 697 rktEnv := eb.SetHostEnvvars(filter).Build() 698 699 // Enable ResourceLimits to place the executor in a parent cgroup of 700 // the rkt container. This allows stats collection via the executor to 701 // work just like it does for exec. 
702 execCmd := &executor.ExecCommand{ 703 Cmd: absPath, 704 Args: runArgs, 705 ResourceLimits: true, 706 Resources: cfg.Resources, 707 708 // Use rktEnv, the environment needed for running rkt, not the task env 709 Env: rktEnv.List(), 710 711 TaskDir: cfg.TaskDir().Dir, 712 StdoutPath: cfg.StdoutPath, 713 StderrPath: cfg.StderrPath, 714 } 715 ps, err := execImpl.Launch(execCmd) 716 if err != nil { 717 pluginClient.Kill() 718 return nil, nil, err 719 } 720 721 d.logger.Debug("started taskConfig", "aci", img, "uuid", uuid, "task_name", cfg.Name, "args", runArgs) 722 h := &taskHandle{ 723 exec: execImpl, 724 env: rktEnv, 725 pid: ps.Pid, 726 uuid: uuid, 727 pluginClient: pluginClient, 728 taskConfig: cfg, 729 procState: drivers.TaskStateRunning, 730 startedAt: time.Now().Round(time.Millisecond), 731 logger: d.logger, 732 } 733 734 rktDriverState := TaskState{ 735 ReattachConfig: pstructs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()), 736 Pid: ps.Pid, 737 TaskConfig: cfg, 738 StartedAt: h.startedAt, 739 UUID: uuid, 740 } 741 742 if err := handle.SetDriverState(&rktDriverState); err != nil { 743 d.logger.Error("failed to start task, error setting driver state", "error", err, "task_name", cfg.Name) 744 execImpl.Shutdown("", 0) 745 pluginClient.Kill() 746 return nil, nil, fmt.Errorf("failed to set driver state: %v", err) 747 } 748 749 d.tasks.Set(cfg.ID, h) 750 go h.run() 751 752 // Do not attempt to retrieve driver network if one won't exist: 753 // - "host" means the container itself has no networking metadata 754 // - "none" means no network is configured 755 // https://coreos.com/rkt/docs/latest/networking/overview.html#no-loopback-only-networking 756 var driverNetwork *drivers.DriverNetwork 757 if network != "host" && network != "none" { 758 d.logger.Debug("retrieving network information for pod", "pod", img, "UUID", uuid, "task_name", cfg.Name) 759 driverNetwork, err = rktGetDriverNetwork(uuid, driverConfig.PortMap, d.logger) 760 if err != nil && 
!pluginClient.Exited() { 761 d.logger.Warn("network status retrieval for pod failed", "pod", img, "UUID", uuid, "task_name", cfg.Name, "error", err) 762 763 // If a portmap was given, this turns into a fatal error 764 if len(driverConfig.PortMap) != 0 { 765 pluginClient.Kill() 766 return nil, nil, fmt.Errorf("Trying to map ports but driver could not determine network information") 767 } 768 } 769 } 770 771 return handle, driverNetwork, nil 772 773} 774 775func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) { 776 handle, ok := d.tasks.Get(taskID) 777 if !ok { 778 return nil, drivers.ErrTaskNotFound 779 } 780 781 ch := make(chan *drivers.ExitResult) 782 go d.handleWait(ctx, handle, ch) 783 784 return ch, nil 785} 786 787func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error { 788 handle, ok := d.tasks.Get(taskID) 789 if !ok { 790 return drivers.ErrTaskNotFound 791 } 792 793 if err := handle.exec.Shutdown(signal, timeout); err != nil { 794 if handle.pluginClient.Exited() { 795 return nil 796 } 797 return fmt.Errorf("executor Shutdown failed: %v", err) 798 } 799 800 return nil 801} 802 803func (d *Driver) DestroyTask(taskID string, force bool) error { 804 handle, ok := d.tasks.Get(taskID) 805 if !ok { 806 return drivers.ErrTaskNotFound 807 } 808 809 if handle.IsRunning() && !force { 810 return fmt.Errorf("cannot destroy running task") 811 } 812 813 if !handle.pluginClient.Exited() { 814 if err := handle.exec.Shutdown("", 0); err != nil { 815 handle.logger.Error("destroying executor failed", "err", err) 816 } 817 818 handle.pluginClient.Kill() 819 } 820 821 d.tasks.Delete(taskID) 822 return nil 823} 824 825func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) { 826 handle, ok := d.tasks.Get(taskID) 827 if !ok { 828 return nil, drivers.ErrTaskNotFound 829 } 830 831 return handle.TaskStatus(), nil 832} 833 834func (d *Driver) TaskStats(ctx context.Context, taskID string, 
interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) { 835 handle, ok := d.tasks.Get(taskID) 836 if !ok { 837 return nil, drivers.ErrTaskNotFound 838 } 839 840 return handle.exec.Stats(ctx, interval) 841} 842 843func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) { 844 return d.eventer.TaskEvents(ctx) 845} 846 847func (d *Driver) SignalTask(taskID string, signal string) error { 848 handle, ok := d.tasks.Get(taskID) 849 if !ok { 850 return drivers.ErrTaskNotFound 851 } 852 853 sig := os.Interrupt 854 if s, ok := signals.SignalLookup[signal]; ok { 855 sig = s 856 } else { 857 d.logger.Warn("unknown signal to send to task, using SIGINT instead", "signal", signal, "task_id", handle.taskConfig.ID, "task_name", handle.taskConfig.Name) 858 859 } 860 return handle.exec.Signal(sig) 861} 862 863func (d *Driver) ExecTask(taskID string, cmdArgs []string, timeout time.Duration) (*drivers.ExecTaskResult, error) { 864 if len(cmdArgs) == 0 { 865 return nil, fmt.Errorf("error cmd must have at least one value") 866 } 867 handle, ok := d.tasks.Get(taskID) 868 if !ok { 869 return nil, drivers.ErrTaskNotFound 870 } 871 // enter + UUID + cmd + args... 
872 cmd := cmdArgs[0] 873 args := cmdArgs[1:] 874 enterArgs := make([]string, 3+len(args)) 875 enterArgs[0] = "enter" 876 enterArgs[1] = handle.uuid 877 enterArgs[2] = handle.env.ReplaceEnv(cmd) 878 copy(enterArgs[3:], handle.env.ParseAndReplace(args)) 879 out, exitCode, err := handle.exec.Exec(time.Now().Add(timeout), rktCmd, enterArgs) 880 if err != nil { 881 return nil, err 882 } 883 884 return &drivers.ExecTaskResult{ 885 Stdout: out, 886 ExitResult: &drivers.ExitResult{ 887 ExitCode: exitCode, 888 }, 889 }, nil 890 891} 892 893var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil) 894 895func (d *Driver) ExecTaskStreamingRaw(ctx context.Context, 896 taskID string, 897 command []string, 898 tty bool, 899 stream drivers.ExecTaskStream) error { 900 901 if len(command) == 0 { 902 return fmt.Errorf("error cmd must have at least one value") 903 } 904 handle, ok := d.tasks.Get(taskID) 905 if !ok { 906 return drivers.ErrTaskNotFound 907 } 908 909 enterCmd := []string{rktCmd, "enter", handle.uuid, handle.env.ReplaceEnv(command[0])} 910 enterCmd = append(enterCmd, handle.env.ParseAndReplace(command[1:])...) 911 912 return handle.exec.ExecStreaming(ctx, enterCmd, tty, stream) 913} 914 915// GetAbsolutePath returns the absolute path of the passed binary by resolving 916// it in the path and following symlinks. 
917func GetAbsolutePath(bin string) (string, error) { 918 lp, err := exec.LookPath(bin) 919 if err != nil { 920 return "", fmt.Errorf("failed to resolve path to %q executable: %v", bin, err) 921 } 922 923 return filepath.EvalSymlinks(lp) 924} 925 926func rktGetDriverNetwork(uuid string, driverConfigPortMap map[string]string, logger hclog.Logger) (*drivers.DriverNetwork, error) { 927 deadline := time.Now().Add(networkDeadline) 928 var lastErr error 929 try := 0 930 931 for time.Now().Before(deadline) { 932 try++ 933 if status, err := rktGetStatus(uuid, logger); err == nil { 934 for _, net := range status.Networks { 935 if !net.IP.IsGlobalUnicast() { 936 continue 937 } 938 939 // Get the pod manifest so we can figure out which ports are exposed 940 var portmap map[string]int 941 manifest, err := rktGetManifest(uuid) 942 if err == nil { 943 portmap, err = rktManifestMakePortMap(manifest, driverConfigPortMap) 944 if err != nil { 945 lastErr = fmt.Errorf("could not create manifest-based portmap: %v", err) 946 return nil, lastErr 947 } 948 } else { 949 lastErr = fmt.Errorf("could not get pod manifest: %v", err) 950 return nil, lastErr 951 } 952 953 // This is a successful landing; log if its not the first attempt. 
954 if try > 1 { 955 logger.Debug("retrieved network info for pod", "uuid", uuid, "attempt", try) 956 } 957 return &drivers.DriverNetwork{ 958 PortMap: portmap, 959 IP: status.Networks[0].IP.String(), 960 }, nil 961 } 962 963 if len(status.Networks) == 0 { 964 lastErr = fmt.Errorf("no networks found") 965 } else { 966 lastErr = fmt.Errorf("no good driver networks out of %d returned", len(status.Networks)) 967 } 968 } else { 969 lastErr = fmt.Errorf("getting status failed: %v", err) 970 } 971 972 waitTime := getJitteredNetworkRetryTime() 973 logger.Debug("failed getting network info for pod, sleeping", "uuid", uuid, "attempt", try, "err", lastErr, "wait", waitTime) 974 time.Sleep(waitTime) 975 } 976 return nil, fmt.Errorf("timed out, last error: %v", lastErr) 977} 978 979// Given a rkt/appc pod manifest and driver portmap configuration, create 980// a driver portmap. 981func rktManifestMakePortMap(manifest *appcschema.PodManifest, configPortMap map[string]string) (map[string]int, error) { 982 if len(manifest.Apps) == 0 { 983 return nil, fmt.Errorf("manifest has no apps") 984 } 985 if len(manifest.Apps) != 1 { 986 return nil, fmt.Errorf("manifest has multiple apps!") 987 } 988 app := manifest.Apps[0] 989 if app.App == nil { 990 return nil, fmt.Errorf("specified app has no App object") 991 } 992 993 portMap := make(map[string]int) 994 for svc, name := range configPortMap { 995 for _, port := range app.App.Ports { 996 if port.Name.String() == name { 997 portMap[svc] = int(port.Port) 998 } 999 } 1000 } 1001 return portMap, nil 1002} 1003 1004// Retrieve pod status for the pod with the given UUID. 1005func rktGetStatus(uuid string, logger hclog.Logger) (*Pod, error) { 1006 statusArgs := []string{ 1007 "status", 1008 "--format=json", 1009 uuid, 1010 } 1011 var outBuf, errBuf bytes.Buffer 1012 cmd := exec.Command(rktCmd, statusArgs...) 
1013 cmd.Stdout = &outBuf 1014 cmd.Stderr = &errBuf 1015 if err := cmd.Run(); err != nil { 1016 if outBuf.Len() > 0 { 1017 logger.Debug("status output for UUID", "uuid", uuid, "error", elide(outBuf)) 1018 } 1019 if errBuf.Len() == 0 { 1020 return nil, err 1021 } 1022 logger.Debug("status error output", "uuid", uuid, "error", elide(errBuf)) 1023 return nil, fmt.Errorf("%s. stderr: %q", err, elide(errBuf)) 1024 } 1025 var status Pod 1026 if err := json.Unmarshal(outBuf.Bytes(), &status); err != nil { 1027 return nil, err 1028 } 1029 return &status, nil 1030} 1031 1032// Retrieves a pod manifest 1033func rktGetManifest(uuid string) (*appcschema.PodManifest, error) { 1034 statusArgs := []string{ 1035 "cat-manifest", 1036 uuid, 1037 } 1038 var outBuf bytes.Buffer 1039 cmd := exec.Command(rktCmd, statusArgs...) 1040 cmd.Stdout = &outBuf 1041 cmd.Stderr = ioutil.Discard 1042 if err := cmd.Run(); err != nil { 1043 return nil, err 1044 } 1045 var manifest appcschema.PodManifest 1046 if err := json.Unmarshal(outBuf.Bytes(), &manifest); err != nil { 1047 return nil, err 1048 } 1049 return &manifest, nil 1050} 1051 1052// Create a time with a 0 to 100ms jitter for rktGetDriverNetwork retries 1053func getJitteredNetworkRetryTime() time.Duration { 1054 return time.Duration(900+rand.Intn(100)) * time.Millisecond 1055} 1056 1057// Conditionally elide a buffer to an arbitrary length 1058func elideToLen(inBuf bytes.Buffer, length int) bytes.Buffer { 1059 if inBuf.Len() > length { 1060 inBuf.Truncate(length) 1061 inBuf.WriteString("...") 1062 } 1063 return inBuf 1064} 1065 1066// Conditionally elide a buffer to an 80 character string 1067func elide(inBuf bytes.Buffer) string { 1068 tempBuf := elideToLen(inBuf, 80) 1069 return tempBuf.String() 1070} 1071 1072func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) { 1073 defer close(ch) 1074 var result *drivers.ExitResult 1075 ps, err := handle.exec.Wait(ctx) 1076 if err != nil { 1077 result = 
&drivers.ExitResult{ 1078 Err: fmt.Errorf("executor: error waiting on process: %v", err), 1079 } 1080 } else { 1081 result = &drivers.ExitResult{ 1082 ExitCode: ps.ExitCode, 1083 Signal: ps.Signal, 1084 } 1085 } 1086 1087 select { 1088 case <-ctx.Done(): 1089 case <-d.ctx.Done(): 1090 case ch <- result: 1091 } 1092} 1093 1094func (d *Driver) Shutdown() { 1095 d.signalShutdown() 1096} 1097