package docker

import (
    "context"
    "fmt"
    "net"
    "os"
    "path/filepath"
    "runtime"
    "strconv"
    "strings"
    "sync"
    "time"

    docker "github.com/fsouza/go-dockerclient"
    "github.com/hashicorp/consul-template/signals"
    hclog "github.com/hashicorp/go-hclog"
    multierror "github.com/hashicorp/go-multierror"
    plugin "github.com/hashicorp/go-plugin"
    "github.com/hashicorp/nomad/client/taskenv"
    "github.com/hashicorp/nomad/drivers/docker/docklog"
    "github.com/hashicorp/nomad/drivers/shared/eventer"
    nstructs "github.com/hashicorp/nomad/nomad/structs"
    "github.com/hashicorp/nomad/plugins/base"
    "github.com/hashicorp/nomad/plugins/drivers"
    "github.com/hashicorp/nomad/plugins/shared/structs"
    pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
)

var (
    // createClientsLock is a lock that protects reading/writing global client
    // variables
    createClientsLock sync.Mutex

    // client is a docker client with a timeout of 5 minutes. It is used for
    // all docker daemon operations that are not long-running, such as
    // creating and killing containers.
    client *docker.Client

    // waitClient is a docker client with no timeouts. It is used for
    // long-running operations such as waiting on containers and collecting
    // stats.
    waitClient *docker.Client

    dockerTransientErrs = []string{
        "Client.Timeout exceeded while awaiting headers",
        "EOF",
        "API error (500)",
    }

    // recoverableErrTimeouts returns a recoverable error if the error was due
    // to timeouts
    recoverableErrTimeouts = func(err error) error {
        r := false
        if strings.Contains(err.Error(), "Client.Timeout exceeded while awaiting headers") ||
            strings.Contains(err.Error(), "EOF") {
            r = true
        }
        return nstructs.NewRecoverableError(err, r)
    }

    // taskHandleVersion is the version of task handle which this driver sets
    // and understands how to decode driver state
    taskHandleVersion = 1

    // nvidiaVisibleDevices is the nvidia-container-runtime environment
    // variable controlling GPU visibility.
    nvidiaVisibleDevices = "NVIDIA_VISIBLE_DEVICES"
)

const (
    dockerLabelAllocID = "com.hashicorp.nomad.alloc_id"
)
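// For illustration only (not called anywhere): wrapping a client timeout via
// recoverableErrTimeouts marks the error as recoverable so callers retry
// rather than failing the task permanently. A sketch; the error text below is
// an assumed example:
//
//	err := fmt.Errorf("net/http: request canceled (Client.Timeout exceeded while awaiting headers)")
//	rerr := recoverableErrTimeouts(err) // nstructs.IsRecoverable(rerr) == true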
type Driver struct {
    // eventer is used to handle multiplexing of TaskEvents calls such that an
    // event can be broadcast to all callers
    eventer *eventer.Eventer

    // config contains the runtime configuration for the driver set by the
    // SetConfig RPC
    config *DriverConfig

    // clientConfig contains a driver specific subset of the Nomad client
    // configuration
    clientConfig *base.ClientDriverConfig

    // ctx is the context for the driver. It is passed to other subsystems to
    // coordinate shutdown
    ctx context.Context

    // signalShutdown is called when the driver is shutting down and cancels
    // the ctx passed to any subsystems
    signalShutdown context.CancelFunc

    // tasks is the in-memory datastore mapping taskIDs to taskHandles
    tasks *taskStore

    // coordinator tracks multiple image pulls against the same docker image
    coordinator *dockerCoordinator

    // logger will log to the Nomad agent
    logger hclog.Logger

    // gpuRuntime indicates nvidia-docker runtime availability
    gpuRuntime bool

    // A tri-state boolean to know if the fingerprinting has happened and
    // whether it has been successful
    fingerprintSuccess *bool
    fingerprintLock    sync.RWMutex

    // A boolean to know if the docker driver has ever been correctly detected
    // for use during fingerprinting.
    detected     bool
    detectedLock sync.RWMutex

    reconciler *containerReconciler
}

// NewDockerDriver returns a docker implementation of a driver plugin
func NewDockerDriver(logger hclog.Logger) drivers.DriverPlugin {
    ctx, cancel := context.WithCancel(context.Background())
    logger = logger.Named(pluginName)
    return &Driver{
        eventer:        eventer.NewEventer(ctx, logger),
        config:         &DriverConfig{},
        tasks:          newTaskStore(),
        ctx:            ctx,
        signalShutdown: cancel,
        logger:         logger,
    }
}

func (d *Driver) reattachToDockerLogger(reattachConfig *structs.ReattachConfig) (docklog.DockerLogger, *plugin.Client, error) {
    reattach, err := pstructs.ReattachConfigToGoPlugin(reattachConfig)
    if err != nil {
        return nil, nil, err
    }

    dlogger, dloggerPluginClient, err := docklog.ReattachDockerLogger(reattach)
    if err != nil {
        return nil, nil, fmt.Errorf("failed to reattach to docker logger process: %v", err)
    }

    return dlogger, dloggerPluginClient, nil
}

func (d *Driver) setupNewDockerLogger(container *docker.Container, cfg *drivers.TaskConfig, startTime time.Time) (docklog.DockerLogger, *plugin.Client, error) {
    dlogger, pluginClient, err := docklog.LaunchDockerLogger(d.logger)
    if err != nil {
        if pluginClient != nil {
            pluginClient.Kill()
        }
        return nil, nil, fmt.Errorf("failed to launch docker logger plugin: %v", err)
    }

    if err := dlogger.Start(&docklog.StartOpts{
        Endpoint:    d.config.Endpoint,
        ContainerID: container.ID,
        TTY:         container.Config.Tty,
        Stdout:      cfg.StdoutPath,
        Stderr:      cfg.StderrPath,
        TLSCert:     d.config.TLS.Cert,
        TLSKey:      d.config.TLS.Key,
        TLSCA:       d.config.TLS.CA,
        StartTime:   startTime.Unix(),
    }); err != nil {
        pluginClient.Kill()
        return nil, nil, fmt.Errorf("failed to launch docker logger process %s: %v", container.ID, err)
    }

    return dlogger, pluginClient, nil
}
func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
    if _, ok := d.tasks.Get(handle.Config.ID); ok {
        return nil
    }

    // COMPAT(0.10): pre 0.9 upgrade path check
    if handle.Version == 0 {
        return d.recoverPre09Task(handle)
    }

    var handleState taskHandleState
    if err := handle.GetDriverState(&handleState); err != nil {
        return fmt.Errorf("failed to decode driver task state: %v", err)
    }

    client, _, err := d.dockerClients()
    if err != nil {
        return fmt.Errorf("failed to get docker client: %v", err)
    }

    container, err := client.InspectContainer(handleState.ContainerID)
    if err != nil {
        return fmt.Errorf("failed to inspect container for id %q: %v", handleState.ContainerID, err)
    }

    h := &taskHandle{
        client:                client,
        waitClient:            waitClient,
        logger:                d.logger.With("container_id", container.ID),
        task:                  handle.Config,
        containerID:           container.ID,
        containerImage:        container.Image,
        doneCh:                make(chan bool),
        waitCh:                make(chan struct{}),
        removeContainerOnExit: d.config.GC.Container,
        net:                   handleState.DriverNetwork,
    }

    if !d.config.DisableLogCollection {
        h.dlogger, h.dloggerPluginClient, err = d.reattachToDockerLogger(handleState.ReattachConfig)
        if err != nil {
            d.logger.Warn("failed to reattach to docker logger process", "error", err)

            h.dlogger, h.dloggerPluginClient, err = d.setupNewDockerLogger(container, handle.Config, time.Now())
            if err != nil {
                if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
                    d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
                }
                return fmt.Errorf("failed to setup replacement docker logger: %v", err)
            }

            if err := handle.SetDriverState(h.buildState()); err != nil {
                if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
                    d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
                }
                return fmt.Errorf("failed to store driver state: %v", err)
            }
        }
    }

    d.tasks.Set(handle.Config.ID, h)
    go h.run()

    return nil
}
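// Recovery note: the taskHandleState decoded above is the same state written
// via h.buildState during StartTask, so after a client restart the driver can
// resume a task from just the container ID, the docker logger reattach
// config, and the driver network.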
func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
    if _, ok := d.tasks.Get(cfg.ID); ok {
        return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
    }

    var driverConfig TaskConfig

    if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
        return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
    }

    if driverConfig.Image == "" {
        return nil, nil, fmt.Errorf("image name required for docker driver")
    }

    // Remove any https:// prefix from the image reference
    if strings.HasPrefix(driverConfig.Image, "https://") {
        driverConfig.Image = strings.Replace(driverConfig.Image, "https://", "", 1)
    }

    handle := drivers.NewTaskHandle(taskHandleVersion)
    handle.Config = cfg

    // Initialize docker API clients
    client, _, err := d.dockerClients()
    if err != nil {
        return nil, nil, fmt.Errorf("Failed to connect to docker daemon: %s", err)
    }

    id, err := d.createImage(cfg, &driverConfig, client)
    if err != nil {
        return nil, nil, err
    }

    containerCfg, err := d.createContainerConfig(cfg, &driverConfig, driverConfig.Image)
    if err != nil {
        d.logger.Error("failed to create container configuration", "image_name", driverConfig.Image,
            "image_id", id, "error", err)
        return nil, nil, fmt.Errorf("Failed to create container configuration for image %q (%q): %v", driverConfig.Image, id, err)
    }

    startAttempts := 0
CREATE:
    container, err := d.createContainer(client, containerCfg, driverConfig.Image)
    if err != nil {
        d.logger.Error("failed to create container", "error", err)
        client.RemoveContainer(docker.RemoveContainerOptions{
            ID:    containerCfg.Name,
            Force: true,
        })
        return nil, nil, nstructs.WrapRecoverable(fmt.Sprintf("failed to create container: %v", err), err)
    }

    d.logger.Info("created container", "container_id", container.ID)

    // If the container is already running we don't need to start it:
    // createContainer returns existing containers that are already present on
    // the host and running.
    if !container.State.Running {
        // Start the container
        if err := d.startContainer(container); err != nil {
            d.logger.Error("failed to start container", "container_id", container.ID, "error", err)
            client.RemoveContainer(docker.RemoveContainerOptions{
                ID:    container.ID,
                Force: true,
            })
            // Some sort of docker race bug; recreating the container usually works
            if strings.Contains(err.Error(), "OCI runtime create failed: container with id exists:") && startAttempts < 5 {
                startAttempts++
                d.logger.Debug("reattempting container create/start sequence", "attempt", startAttempts, "image_id", id)
                goto CREATE
            }
            return nil, nil, nstructs.WrapRecoverable(fmt.Sprintf("Failed to start container %s: %s", container.ID, err), err)
        }

        // InspectContainer to get all of the container metadata as much of
        // the metadata (eg networking) isn't populated until the container is
        // started
        runningContainer, err := client.InspectContainer(container.ID)
        if err != nil {
            msg := "failed to inspect started container"
            d.logger.Error(msg, "error", err)
            client.RemoveContainer(docker.RemoveContainerOptions{
                ID:    container.ID,
                Force: true,
            })
            return nil, nil, nstructs.NewRecoverableError(fmt.Errorf("%s %s: %s", msg, container.ID, err), true)
        }
        container = runningContainer
        d.logger.Info("started container", "container_id", container.ID)
    } else {
        d.logger.Debug("re-attaching to container", "container_id",
            container.ID, "container_state", container.State.String())
    }

    collectingLogs := !d.config.DisableLogCollection

    var dlogger docklog.DockerLogger
    var pluginClient *plugin.Client

    if collectingLogs {
        dlogger, pluginClient, err = d.setupNewDockerLogger(container, cfg, time.Unix(0, 0))
        if err != nil {
            d.logger.Error("an error occurred after container startup, terminating container", "container_id", container.ID)
            client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
            return nil, nil, err
        }
    }

    // Detect container address
    ip, autoUse := d.detectIP(container, &driverConfig)

    net := &drivers.DriverNetwork{
        PortMap:       driverConfig.PortMap,
        IP:            ip,
        AutoAdvertise: autoUse,
    }

    // Return a driver handle
    h := &taskHandle{
        client:                client,
        waitClient:            waitClient,
        dlogger:               dlogger,
        dloggerPluginClient:   pluginClient,
        logger:                d.logger.With("container_id", container.ID),
        task:                  cfg,
        containerID:           container.ID,
        containerImage:        container.Image,
        doneCh:                make(chan bool),
        waitCh:                make(chan struct{}),
        removeContainerOnExit: d.config.GC.Container,
        net:                   net,
    }

    if err := handle.SetDriverState(h.buildState()); err != nil {
        d.logger.Error("error encoding container occurred after startup, terminating container", "container_id", container.ID, "error", err)
        if collectingLogs {
            dlogger.Stop()
            pluginClient.Kill()
        }
        client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
        return nil, nil, err
    }

    d.tasks.Set(cfg.ID, h)
    go h.run()

    return handle, net, nil
}
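// For illustration, the DriverNetwork returned by StartTask looks roughly
// like this (the values are assumed examples):
//
//	&drivers.DriverNetwork{
//		PortMap:       map[string]int{"http": 8080},
//		IP:            "172.17.0.2",
//		AutoAdvertise: false, // bridge IPs are not auto-advertised
//	}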
// createContainerClient is the subset of Docker Client methods used by the
// createContainer method to ease testing subtle error conditions.
type createContainerClient interface {
    CreateContainer(docker.CreateContainerOptions) (*docker.Container, error)
    InspectContainer(id string) (*docker.Container, error)
    ListContainers(docker.ListContainersOptions) ([]docker.APIContainers, error)
    RemoveContainer(opts docker.RemoveContainerOptions) error
}

// createContainer creates the container given the passed configuration. It
// attempts to handle any transient Docker errors.
func (d *Driver) createContainer(client createContainerClient, config docker.CreateContainerOptions,
    image string) (*docker.Container, error) {
    // Create a container
    attempted := 0
CREATE:
    container, createErr := client.CreateContainer(config)
    if createErr == nil {
        return container, nil
    }

    d.logger.Debug("failed to create container", "container_name",
        config.Name, "image_name", image, "image_id", config.Config.Image,
        "attempt", attempted+1, "error", createErr)

    // Volume management tools like Portworx may not have detached a volume
    // from a previous node before Nomad started a replacement task. Treat
    // these errors as recoverable so we retry.
    if strings.Contains(strings.ToLower(createErr.Error()), "volume is attached on another node") {
        return nil, nstructs.NewRecoverableError(createErr, true)
    }

    // If the container already exists determine whether it's already
    // running or if it's dead and needs to be recreated.
    if strings.Contains(strings.ToLower(createErr.Error()), "container already exists") {

        container, err := d.containerByName(config.Name)
        if err != nil {
            return nil, err
        }

        if container != nil && container.State.Running {
            return container, nil
        }

        // Purge the dead matching container, if one was found. If container
        // is nil here, the conflicting container was removed between the
        // failed create and our lookup, so simply retry.
        if container != nil {
            err = client.RemoveContainer(docker.RemoveContainerOptions{
                ID:    container.ID,
                Force: true,
            })
            if err != nil {
                d.logger.Error("failed to purge container", "container_id", container.ID)
                return nil, recoverableErrTimeouts(fmt.Errorf("Failed to purge container %s: %s", container.ID, err))
            }
            d.logger.Info("purged container", "container_id", container.ID)
        }

        if attempted < 5 {
            attempted++
            time.Sleep(nextBackoff(attempted))
            goto CREATE
        }
    } else if strings.Contains(strings.ToLower(createErr.Error()), "no such image") {
        // There is still a very small chance this is possible even with the
        // coordinator so retry.
        return nil, nstructs.NewRecoverableError(createErr, true)
    } else if isDockerTransientError(createErr) && attempted < 5 {
        attempted++
        time.Sleep(nextBackoff(attempted))
        goto CREATE
    }

    return nil, recoverableErrTimeouts(createErr)
}
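// Illustration of the conflict path above: if CreateContainer fails with
// "container already exists" for the configured name, a running container
// with that name is adopted as-is, while a dead one is force-removed and the
// create is retried with nextBackoff delays, up to 5 attempts.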
// startContainer starts the passed container. It attempts to handle any
// transient Docker errors.
func (d *Driver) startContainer(c *docker.Container) error {
    // Start a container
    attempted := 0
START:
    startErr := client.StartContainer(c.ID, c.HostConfig)
    if startErr == nil || strings.Contains(startErr.Error(), "Container already running") {
        return nil
    }

    d.logger.Debug("failed to start container", "container_id", c.ID, "attempt", attempted+1, "error", startErr)

    if isDockerTransientError(startErr) {
        if attempted < 5 {
            attempted++
            time.Sleep(nextBackoff(attempted))
            goto START
        }
        return nstructs.NewRecoverableError(startErr, true)
    }

    return recoverableErrTimeouts(startErr)
}

// nextBackoff returns an appropriate docker backoff duration after attempted
// attempts.
func nextBackoff(attempted int) time.Duration {
    // attempts 1-5 sleep 200ms, 800ms, 3.2s, 12.8s, 51.2s respectively
    // TODO: add randomization factor and extract to a helper
    return 1 << (2 * uint64(attempted)) * 50 * time.Millisecond
}
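// Worked example of the schedule above: nextBackoff(1) through nextBackoff(5)
// return 200ms, 800ms, 3.2s, 12.8s and 51.2s, since 1<<(2*n) * 50ms
// quadruples with each attempt.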
// createImage creates a docker image either by pulling it from a registry or
// by loading it from the file system
func (d *Driver) createImage(task *drivers.TaskConfig, driverConfig *TaskConfig, client *docker.Client) (string, error) {
    image := driverConfig.Image
    repo, tag := parseDockerImage(image)

    callerID := fmt.Sprintf("%s-%s", task.ID, task.Name)

    // We're going to check whether the image is already downloaded. If the
    // tag is "latest", or ForcePull is set, we have to check for a new
    // version every time so we don't bother to check and cache the id here.
    // We'll download first, then cache.
    if driverConfig.ForcePull {
        d.logger.Debug("force pulling image instead of inspecting local", "image_ref", dockerImageRef(repo, tag))
    } else if tag != "latest" {
        if dockerImage, _ := client.InspectImage(image); dockerImage != nil {
            // Image exists so just increment its reference count
            d.coordinator.IncrementImageReference(dockerImage.ID, image, callerID)
            return dockerImage.ID, nil
        }
    }

    // Load the image if specified
    if driverConfig.LoadImage != "" {
        return d.loadImage(task, driverConfig, client)
    }

    // Download the image
    return d.pullImage(task, driverConfig, client, repo, tag)
}

// pullImage creates an image by pulling it from a docker registry
func (d *Driver) pullImage(task *drivers.TaskConfig, driverConfig *TaskConfig, client *docker.Client, repo, tag string) (id string, err error) {
    authOptions, err := d.resolveRegistryAuthentication(driverConfig, repo)
    if err != nil {
        if driverConfig.AuthSoftFail {
            d.logger.Warn("Failed to find docker repo auth", "repo", repo, "error", err)
        } else {
            return "", fmt.Errorf("Failed to find docker auth for repo %q: %v", repo, err)
        }
    }

    if authIsEmpty(authOptions) {
        d.logger.Debug("did not find docker auth for repo", "repo", repo)
    }

    d.eventer.EmitEvent(&drivers.TaskEvent{
        TaskID:    task.ID,
        AllocID:   task.AllocID,
        TaskName:  task.Name,
        Timestamp: time.Now(),
        Message:   "Downloading image",
        Annotations: map[string]string{
            "image": dockerImageRef(repo, tag),
        },
    })

    return d.coordinator.PullImage(driverConfig.Image, authOptions, task.ID, d.emitEventFunc(task), d.config.pullActivityTimeoutDuration)
}

func (d *Driver) emitEventFunc(task *drivers.TaskConfig) LogEventFn {
    return func(msg string, annotations map[string]string) {
        d.eventer.EmitEvent(&drivers.TaskEvent{
            TaskID:      task.ID,
            AllocID:     task.AllocID,
            TaskName:    task.Name,
            Timestamp:   time.Now(),
            Message:     msg,
            Annotations: annotations,
        })
    }
}

// authBackend encapsulates a function that resolves registry credentials.
type authBackend func(string) (*docker.AuthConfiguration, error)

// resolveRegistryAuthentication attempts to retrieve auth credentials for the
// repo, trying all available authentication backends in order.
func (d *Driver) resolveRegistryAuthentication(driverConfig *TaskConfig, repo string) (*docker.AuthConfiguration, error) {
    return firstValidAuth(repo, []authBackend{
        authFromTaskConfig(driverConfig),
        authFromDockerConfig(d.config.Auth.Config),
        authFromHelper(d.config.Auth.Helper),
    })
}

// loadImage creates an image by loading it from the file system
func (d *Driver) loadImage(task *drivers.TaskConfig, driverConfig *TaskConfig, client *docker.Client) (id string, err error) {
    archive := filepath.Join(task.TaskDir().LocalDir, driverConfig.LoadImage)
    d.logger.Debug("loading image from disk", "archive", archive)

    f, err := os.Open(archive)
    if err != nil {
        return "", fmt.Errorf("unable to open image archive: %v", err)
    }
    defer f.Close()

    if err := client.LoadImage(docker.LoadImageOptions{InputStream: f}); err != nil {
        return "", err
    }

    dockerImage, err := client.InspectImage(driverConfig.Image)
    if err != nil {
        return "", recoverableErrTimeouts(err)
    }

    d.coordinator.IncrementImageReference(dockerImage.ID, driverConfig.Image, task.ID)
    return dockerImage.ID, nil
}

func (d *Driver) containerBinds(task *drivers.TaskConfig, driverConfig *TaskConfig) ([]string, error) {
    allocDirBind := fmt.Sprintf("%s:%s", task.TaskDir().SharedAllocDir, task.Env[taskenv.AllocDir])
    taskLocalBind := fmt.Sprintf("%s:%s", task.TaskDir().LocalDir, task.Env[taskenv.TaskLocalDir])
    secretDirBind := fmt.Sprintf("%s:%s", task.TaskDir().SecretsDir, task.Env[taskenv.SecretsDir])
    binds := []string{allocDirBind, taskLocalBind, secretDirBind}

    taskLocalBindVolume := driverConfig.VolumeDriver == ""

    if !d.config.Volumes.Enabled && !taskLocalBindVolume {
        return nil, fmt.Errorf("volumes are not enabled; cannot use volume driver %q", driverConfig.VolumeDriver)
    }
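    // For illustration, with typical Nomad data-dir paths the three implicit
    // binds above look like the following (IDs abbreviated; the container-side
    // paths come from the task environment):
    //
    //	/var/nomad/alloc/<alloc-id>/alloc:/alloc
    //	/var/nomad/alloc/<alloc-id>/<task>/local:/local
    //	/var/nomad/alloc/<alloc-id>/<task>/secrets:/secrets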
    for _, userbind := range driverConfig.Volumes {
        // This assumes the host OS and the container OS are the same. That
        // will not hold once we support Linux containers on Windows.
        src, dst, mode, err := parseVolumeSpec(userbind, runtime.GOOS)
        if err != nil {
            return nil, fmt.Errorf("invalid docker volume %q: %v", userbind, err)
        }

        // Paths inside the task dir are always allowed when using the default
        // driver, and relative paths are always allowed as they mount within
        // the container. When a VolumeDriver is set, we assume we receive a
        // binding in the format volume-name:container-dest; otherwise, we
        // assume a relative path binding in the format
        // relative/to/task:/also/in/container.
        if taskLocalBindVolume {
            src = expandPath(task.TaskDir().Dir, src)
        } else {
            // Resolve dotted path segments
            src = filepath.Clean(src)
        }

        if !d.config.Volumes.Enabled && !isParentPath(task.AllocDir, src) {
            return nil, fmt.Errorf("volumes are not enabled; cannot mount host paths: %+q", userbind)
        }

        bind := src + ":" + dst
        if mode != "" {
            bind += ":" + mode
        }
        binds = append(binds, bind)
    }

    if selinuxLabel := d.config.Volumes.SelinuxLabel; selinuxLabel != "" {
        // Apply the SELinux label to each volume
        for i := range binds {
            binds[i] = fmt.Sprintf("%s:%s", binds[i], selinuxLabel)
        }
    }

    return binds, nil
}

var userMountToUnixMount = map[string]string{
    // Empty string maps to `rprivate` for backwards compatibility in restored
    // older tasks, where mount propagation will not be present.
    "": "rprivate",
    nstructs.VolumeMountPropagationPrivate:       "rprivate",
    nstructs.VolumeMountPropagationHostToTask:    "rslave",
    nstructs.VolumeMountPropagationBidirectional: "rshared",
}
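// Example: a jobspec volume_mount with propagation_mode = "host-to-task"
// maps to Docker's "rslave" bind propagation, so mounts that appear on the
// host under the source path propagate into the container but never back
// out.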
func (d *Driver) createContainerConfig(task *drivers.TaskConfig, driverConfig *TaskConfig,
    imageID string) (docker.CreateContainerOptions, error) {

    // ensure that PortMap variables are populated early on
    task.Env = taskenv.SetPortMapEnvs(task.Env, driverConfig.PortMap)

    logger := d.logger.With("task_name", task.Name)
    var c docker.CreateContainerOptions
    if task.Resources == nil {
        // Guard against missing resources. We should never have been able to
        // schedule a job without specifying this.
        logger.Error("task.Resources is empty")
        return c, fmt.Errorf("task.Resources is empty")
    }

    binds, err := d.containerBinds(task, driverConfig)
    if err != nil {
        return c, err
    }
    logger.Trace("binding volumes", "volumes", binds)

    // create the config block that will later be consumed by go-dockerclient
    config := &docker.Config{
        Image:      imageID,
        Entrypoint: driverConfig.Entrypoint,
        Hostname:   driverConfig.Hostname,
        User:       task.User,
        Tty:        driverConfig.TTY,
        OpenStdin:  driverConfig.Interactive,
    }

    if driverConfig.WorkDir != "" {
        config.WorkingDir = driverConfig.WorkDir
    }

    hostConfig := &docker.HostConfig{
        Memory:    task.Resources.LinuxResources.MemoryLimitBytes,
        CPUShares: task.Resources.LinuxResources.CPUShares,

        // Binds are used to mount a host volume into the container. We mount
        // a local directory for storage and a shared alloc directory that can
        // be used to share data between different tasks in the same task
        // group.
        Binds: binds,

        StorageOpt:   driverConfig.StorageOpt,
        VolumeDriver: driverConfig.VolumeDriver,

        PidsLimit: driverConfig.PidsLimit,
    }

    if _, ok := task.DeviceEnv[nvidiaVisibleDevices]; ok {
        if !d.gpuRuntime {
            return c, fmt.Errorf("requested docker-runtime %q was not found", d.config.GPURuntimeName)
        }
        hostConfig.Runtime = d.config.GPURuntimeName
    }

    // Calculate CPU Quota
    // cfs_quota_us is the time per core, so we must
    // multiply the time by the number of cores available
    // See https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/resource_management_guide/sec-cpu
    if driverConfig.CPUHardLimit {
        numCores := runtime.NumCPU()
        if driverConfig.CPUCFSPeriod < 0 || driverConfig.CPUCFSPeriod > 1000000 {
            return c, fmt.Errorf("invalid value for cpu_cfs_period")
        }
        if driverConfig.CPUCFSPeriod == 0 {
            driverConfig.CPUCFSPeriod = task.Resources.LinuxResources.CPUPeriod
        }
        hostConfig.CPUPeriod = driverConfig.CPUCFSPeriod
        hostConfig.CPUQuota = int64(task.Resources.LinuxResources.PercentTicks*float64(driverConfig.CPUCFSPeriod)) * int64(numCores)
    }
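    // Worked example for the quota above (assumed numbers): with
    // cpu_hard_limit enabled, CPUCFSPeriod = 100000 (100ms),
    // PercentTicks = 0.25 and 8 host cores,
    // CPUQuota = int64(0.25*100000) * 8 = 200000, i.e. two full cores of CPU
    // time per period.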
    // Windows does not support MemorySwap/MemorySwappiness #2193
    if runtime.GOOS == "windows" {
        hostConfig.MemorySwap = 0
        hostConfig.MemorySwappiness = -1
    } else {
        hostConfig.MemorySwap = task.Resources.LinuxResources.MemoryLimitBytes // MemorySwap is memory + swap.
    }

    loggingDriver := driverConfig.Logging.Type
    if loggingDriver == "" {
        loggingDriver = driverConfig.Logging.Driver
    }

    hostConfig.LogConfig = docker.LogConfig{
        Type:   loggingDriver,
        Config: driverConfig.Logging.Config,
    }

    if hostConfig.LogConfig.Type == "" && hostConfig.LogConfig.Config == nil {
        logger.Trace("no docker log driver provided, defaulting to json-file")
        hostConfig.LogConfig.Type = "json-file"
        hostConfig.LogConfig.Config = map[string]string{
            "max-file": "2",
            "max-size": "2m",
        }
    }

    logger.Debug("configured resources", "memory", hostConfig.Memory,
        "cpu_shares", hostConfig.CPUShares, "cpu_quota", hostConfig.CPUQuota,
        "cpu_period", hostConfig.CPUPeriod)
    logger.Debug("binding directories", "binds", hclog.Fmt("%#v", hostConfig.Binds))

    // set privileged mode
    if driverConfig.Privileged && !d.config.AllowPrivileged {
        return c, fmt.Errorf(`Docker privileged mode is disabled on this Nomad agent`)
    }
    hostConfig.Privileged = driverConfig.Privileged

    // set capabilities
    hostCapsWhitelistConfig := d.config.AllowCaps
    hostCapsWhitelist := make(map[string]struct{})
    for _, cap := range hostCapsWhitelistConfig {
        cap = strings.ToLower(strings.TrimSpace(cap))
        hostCapsWhitelist[cap] = struct{}{}
    }

    if _, ok := hostCapsWhitelist["all"]; !ok {
        effectiveCaps, err := tweakCapabilities(
            strings.Split(dockerBasicCaps, ","),
            driverConfig.CapAdd,
            driverConfig.CapDrop,
        )
        if err != nil {
            return c, err
        }
        var missingCaps []string
        for _, cap := range effectiveCaps {
            cap = strings.ToLower(cap)
            if _, ok := hostCapsWhitelist[cap]; !ok {
                missingCaps = append(missingCaps, cap)
            }
        }
        if len(missingCaps) > 0 {
            return c, fmt.Errorf("Docker driver doesn't have the following caps whitelisted on this Nomad agent: %s", missingCaps)
        }
    }

    hostConfig.CapAdd = driverConfig.CapAdd
    hostConfig.CapDrop = driverConfig.CapDrop

    // set SHM size
    if driverConfig.ShmSize != 0 {
        hostConfig.ShmSize = driverConfig.ShmSize
    }

    // set DNS servers
    for _, ip := range driverConfig.DNSServers {
        if net.ParseIP(ip) != nil {
            hostConfig.DNS = append(hostConfig.DNS, ip)
        } else {
            logger.Error("invalid ip address for container dns server", "ip", ip)
        }
    }

    // Setup devices
    for _, device := range driverConfig.Devices {
        dd, err := device.toDockerDevice()
        if err != nil {
            return c, err
        }
        hostConfig.Devices = append(hostConfig.Devices, dd)
    }
    for _, device := range task.Devices {
        hostConfig.Devices = append(hostConfig.Devices, docker.Device{
            PathOnHost:        device.HostPath,
            PathInContainer:   device.TaskPath,
            CgroupPermissions: device.Permissions,
        })
    }

    // Setup mounts
    for _, m := range driverConfig.Mounts {
        hm, err := m.toDockerHostMount()
        if err != nil {
            return c, err
        }

        if hm.Type == "bind" {
            hm.Source = expandPath(task.TaskDir().Dir, hm.Source)

            // paths inside the alloc dir are always allowed as they mount
            // within the container and are treated as relative to the task dir
            if !d.config.Volumes.Enabled && !isParentPath(task.AllocDir, hm.Source) {
                return c, fmt.Errorf("volumes are not enabled; cannot mount host path: %q %q", hm.Source, task.AllocDir)
            }
        }

        hostConfig.Mounts = append(hostConfig.Mounts, hm)
    }

    for _, m := range task.Mounts {
        hm := docker.HostMount{
            Type:     "bind",
            Target:   m.TaskPath,
            Source:   m.HostPath,
            ReadOnly: m.Readonly,
        }

        // MountPropagation is only supported by Docker on Linux:
        // https://docs.docker.com/storage/bind-mounts/#configure-bind-propagation
        if runtime.GOOS == "linux" {
            hm.BindOptions = &docker.BindOptions{
                Propagation: userMountToUnixMount[m.PropagationMode],
            }
        }

        hostConfig.Mounts = append(hostConfig.Mounts, hm)
    }

    // set DNS search domains and extra hosts
    hostConfig.DNSSearch = driverConfig.DNSSearchDomains
    hostConfig.DNSOptions = driverConfig.DNSOptions
    hostConfig.ExtraHosts = driverConfig.ExtraHosts

    hostConfig.IpcMode = driverConfig.IPCMode
    hostConfig.PidMode = driverConfig.PidMode
    hostConfig.UTSMode = driverConfig.UTSMode
    hostConfig.UsernsMode = driverConfig.UsernsMode
    hostConfig.SecurityOpt = driverConfig.SecurityOpt
    hostConfig.Sysctls = driverConfig.Sysctl

    ulimits, err := sliceMergeUlimit(driverConfig.Ulimit)
    if err != nil {
        return c, fmt.Errorf("failed to parse ulimit configuration: %v", err)
    }
    hostConfig.Ulimits = ulimits

    hostConfig.ReadonlyRootfs = driverConfig.ReadonlyRootfs

    // set the docker network mode
    hostConfig.NetworkMode = driverConfig.NetworkMode

    // if the driver config does not specify a network mode then try to use
    // the shared alloc network
    if hostConfig.NetworkMode == "" {
        if task.NetworkIsolation != nil && task.NetworkIsolation.Path != "" {
            // find the previously created parent container to join networks with
            netMode := fmt.Sprintf("container:%s", task.NetworkIsolation.Labels[dockerNetSpecLabelKey])
            logger.Debug("configuring network mode for task group", "network_mode", netMode)
            hostConfig.NetworkMode = netMode
        } else {
            // docker default
            logger.Debug("networking mode not specified; using default")
            hostConfig.NetworkMode = "default"
        }
    }
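    // Example: for a task group sharing a network namespace, the
    // dockerNetSpecLabelKey label holds the ID of the previously created
    // parent (pause) container, so NetworkMode becomes e.g. "container:f1b3...",
    // joining this container to that namespace.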
    // Setup port mapping and exposed ports
    if len(task.Resources.NomadResources.Networks) == 0 {
        if len(driverConfig.PortMap) > 0 {
            return c, fmt.Errorf("Trying to map ports but no network interface is available")
        }
    } else {
        // TODO add support for more than one network
        network := task.Resources.NomadResources.Networks[0]
        publishedPorts := map[docker.Port][]docker.PortBinding{}
        exposedPorts := map[docker.Port]struct{}{}

        for _, port := range network.ReservedPorts {
            // By default we will map the allocated port 1:1 to the container
            containerPortInt := port.Value

            // If the user has mapped a port using port_map we'll change it here
            if mapped, ok := driverConfig.PortMap[port.Label]; ok {
                containerPortInt = mapped
            }

            hostPortStr := strconv.Itoa(port.Value)
            containerPort := docker.Port(strconv.Itoa(containerPortInt))

            publishedPorts[containerPort+"/tcp"] = getPortBinding(network.IP, hostPortStr)
            publishedPorts[containerPort+"/udp"] = getPortBinding(network.IP, hostPortStr)
            logger.Debug("allocated static port", "ip", network.IP, "port", port.Value)

            exposedPorts[containerPort+"/tcp"] = struct{}{}
            exposedPorts[containerPort+"/udp"] = struct{}{}
            logger.Debug("exposed port", "port", port.Value)
        }

        for _, port := range network.DynamicPorts {
            // By default we will map the allocated port 1:1 to the container
            containerPortInt := port.Value

            // If the user has mapped a port using port_map we'll change it here
            if mapped, ok := driverConfig.PortMap[port.Label]; ok {
                containerPortInt = mapped
            }

            hostPortStr := strconv.Itoa(port.Value)
            containerPort := docker.Port(strconv.Itoa(containerPortInt))

            publishedPorts[containerPort+"/tcp"] = getPortBinding(network.IP, hostPortStr)
            publishedPorts[containerPort+"/udp"] = getPortBinding(network.IP, hostPortStr)
            logger.Debug("allocated mapped port", "ip", network.IP, "port", port.Value)

            exposedPorts[containerPort+"/tcp"] = struct{}{}
            exposedPorts[containerPort+"/udp"] = struct{}{}
            logger.Debug("exposed port", "port", containerPort)
        }

        hostConfig.PortBindings = publishedPorts
        config.ExposedPorts = exposedPorts
    }
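    // Worked example of the mapping above (assumed values): a dynamic port
    // labeled "http" that was allocated host port 23456, combined with
    // port_map { http = 8080 }, publishes 8080/tcp and 8080/udp inside the
    // container, both bound to <network.IP>:23456, and exposes those same
    // container ports.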
    // If the user specified a custom command to run, we'll inject it here.
    if driverConfig.Command != "" {
        // Validate command
        if err := validateCommand(driverConfig.Command, "args"); err != nil {
            return c, err
        }

        cmd := []string{driverConfig.Command}
        if len(driverConfig.Args) != 0 {
            cmd = append(cmd, driverConfig.Args...)
        }
        logger.Debug("setting container startup command", "command", strings.Join(cmd, " "))
        config.Cmd = cmd
    } else if len(driverConfig.Args) != 0 {
        config.Cmd = driverConfig.Args
    }

    // Copy the user labels and add the alloc ID label; copying avoids
    // mutating the task's own label map.
    labels := make(map[string]string, len(driverConfig.Labels)+1)
    for k, v := range driverConfig.Labels {
        labels[k] = v
    }
    labels[dockerLabelAllocID] = task.AllocID
    config.Labels = labels
    logger.Debug("applied labels on the container", "labels", config.Labels)

    config.Env = task.EnvList()

    containerName := fmt.Sprintf("%s-%s", strings.Replace(task.Name, "/", "_", -1), task.AllocID)
    logger.Debug("setting container name", "container_name", containerName)

    var networkingConfig *docker.NetworkingConfig
    if len(driverConfig.NetworkAliases) > 0 || driverConfig.IPv4Address != "" || driverConfig.IPv6Address != "" {
        networkingConfig = &docker.NetworkingConfig{
            EndpointsConfig: map[string]*docker.EndpointConfig{
                hostConfig.NetworkMode: {},
            },
        }
    }

    if len(driverConfig.NetworkAliases) > 0 {
        networkingConfig.EndpointsConfig[hostConfig.NetworkMode].Aliases = driverConfig.NetworkAliases
        logger.Debug("setting container network aliases", "network_mode", hostConfig.NetworkMode,
            "network_aliases", strings.Join(driverConfig.NetworkAliases, ", "))
    }

    if driverConfig.IPv4Address != "" || driverConfig.IPv6Address != "" {
        networkingConfig.EndpointsConfig[hostConfig.NetworkMode].IPAMConfig = &docker.EndpointIPAMConfig{
            IPv4Address: driverConfig.IPv4Address,
            IPv6Address: driverConfig.IPv6Address,
        }
        logger.Debug("setting container network configuration", "network_mode", hostConfig.NetworkMode,
            "ipv4_address", driverConfig.IPv4Address, "ipv6_address", driverConfig.IPv6Address)
    }

    if driverConfig.MacAddress != "" {
        config.MacAddress = driverConfig.MacAddress
        logger.Debug("setting container mac address", "mac_address", config.MacAddress)
    }

    return docker.CreateContainerOptions{
        Name:             containerName,
        Config:           config,
        HostConfig:       hostConfig,
        NetworkingConfig: networkingConfig,
    }, nil
}
// detectIP of Docker container. Returns the first IP found as well as true
// if the IP should be advertised (bridge network IPs return false). Returns
// an empty string and false if no IP could be found.
func (d *Driver) detectIP(c *docker.Container, driverConfig *TaskConfig) (string, bool) {
    if c.NetworkSettings == nil {
        // This should only happen if there's been a coding error (such
        // as not calling InspectContainer after CreateContainer). Code
        // defensively in case the Docker API changes subtly.
        d.logger.Error("no network settings for container", "container_id", c.ID)
        return "", false
    }

    ip, ipName := "", ""
    auto := false
    for name, net := range c.NetworkSettings.Networks {
        if net.IPAddress == "" {
            // Ignore networks without an IP address
            continue
        }

        ip = net.IPAddress
        if driverConfig.AdvertiseIPv6Addr {
            ip = net.GlobalIPv6Address
            auto = true
        }
        ipName = name

        // Don't auto-advertise IPs for default networks (bridge on
        // Linux, nat on Windows)
        if name != "bridge" && name != "nat" {
            auto = true
        }

        break
    }

    if n := len(c.NetworkSettings.Networks); n > 1 {
        d.logger.Warn("multiple Docker networks for container found but Nomad only supports 1",
            "total_networks", n,
            "container_id", c.ID,
            "container_network", ipName)
    }

    return ip, auto
}
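// Example: a container attached only to the default "bridge" network with
// IPAddress 172.17.0.2 yields ("172.17.0.2", false) from detectIP -- the
// address is returned but not auto-advertised. A user-defined network name
// yields true, as does advertise_ipv6_address.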
// containerByName finds a running container by name, and returns an error
// if the container is dead or can't be found.
func (d *Driver) containerByName(name string) (*docker.Container, error) {
    client, _, err := d.dockerClients()
    if err != nil {
        return nil, err
    }
    containers, err := client.ListContainers(docker.ListContainersOptions{
        All: true,
    })
    if err != nil {
        d.logger.Error("failed to query list of containers matching name",
            "container_name", name)
        return nil, recoverableErrTimeouts(
            fmt.Errorf("Failed to query list of containers: %s", err))
    }

    // Docker prepends a "/" to the container names it returns, so match
    // against that form of the Nomad-generated name.
    containerName := "/" + name
    var (
        shimContainer docker.APIContainers
        found         bool
    )
OUTER:
    for _, shimContainer = range containers {
        d.logger.Trace("listed container", "names", hclog.Fmt("%+v", shimContainer.Names))
        for _, name := range shimContainer.Names {
            if name == containerName {
                d.logger.Trace("Found container",
                    "container_name", containerName, "container_id", shimContainer.ID)
                found = true
                break OUTER
            }
        }
    }
    if !found {
        return nil, nil
    }

    container, err := client.InspectContainer(shimContainer.ID)
    if err != nil {
        err = fmt.Errorf("Failed to inspect container %s: %s", shimContainer.ID, err)

        // This error is always recoverable as it could
        // be caused by races between listing
        // containers and this container being removed.
        // See #2802
        return nil, nstructs.NewRecoverableError(err, true)
    }
    return container, nil
}

// validateCommand validates that the command is non-empty and carries no
// surrounding whitespace, returning a user-friendly error that points the
// user at the passed argField for additional arguments.
func validateCommand(command, argField string) error {
    trimmed := strings.TrimSpace(command)
    if len(trimmed) == 0 {
        return fmt.Errorf("command empty: %q", command)
    }

    if len(trimmed) != len(command) {
        return fmt.Errorf("command contains extra white space: %q", command)
    }

    return nil
}

func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
    h, ok := d.tasks.Get(taskID)
    if !ok {
        return nil, drivers.ErrTaskNotFound
    }
    ch := make(chan *drivers.ExitResult)
    go d.handleWait(ctx, ch, h)
    return ch, nil
}

func (d *Driver) handleWait(ctx context.Context, ch chan *drivers.ExitResult, h *taskHandle) {
    defer close(ch)
    select {
    case <-h.waitCh:
        ch <- h.ExitResult()
    case <-ctx.Done():
        ch <- &drivers.ExitResult{
            Err: ctx.Err(),
        }
    }
}

func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
    h, ok := d.tasks.Get(taskID)
    if !ok {
        return drivers.ErrTaskNotFound
    }

    if signal == "" {
        signal = "SIGINT"
    }

    // The Windows Docker daemon does not support SIGINT; SIGTERM is the
    // semantic equivalent that allows a graceful shutdown before being
    // followed up by a SIGKILL. Supported signals:
    // https://github.com/moby/moby/blob/0111ee70874a4947d93f64b672f66a2a35071ee2/pkg/signal/signal_windows.go#L17-L26
    if runtime.GOOS == "windows" && signal == "SIGINT" {
        signal = "SIGTERM"
    }

    sig, err := signals.Parse(signal)
    if err != nil {
        return fmt.Errorf("failed to parse signal: %v", err)
    }

    return h.Kill(timeout, sig)
}

func (d *Driver) DestroyTask(taskID string, force bool) error {
    h, ok := d.tasks.Get(taskID)
    if !ok {
        return drivers.ErrTaskNotFound
    }

    c, err := h.client.InspectContainer(h.containerID)
    if err != nil {
        switch err.(type) {
        case *docker.NoSuchContainer:
            h.logger.Info("container was removed out of band, will proceed with DestroyTask",
                "error", err)
        default:
            return fmt.Errorf("failed to inspect container state: %v", err)
        }
    } else {
        if c.State.Running {
            if !force {
                return fmt.Errorf("must call StopTask for the given task before Destroy or set force to true")
            }
            if err := h.client.StopContainer(h.containerID, 0); err != nil {
                h.logger.Warn("failed to stop container during destroy", "error", err)
            }
        }

        if h.removeContainerOnExit {
            if err := h.client.RemoveContainer(docker.RemoveContainerOptions{ID: h.containerID, RemoveVolumes: true, Force: true}); err != nil {
                h.logger.Error("error removing container", "error", err)
            }
        } else {
            h.logger.Debug("not removing container due to config")
        }
    }

    if err := d.cleanupImage(h); err != nil {
        h.logger.Error("failed to cleanup image after destroying container",
            "error", err)
    }

    d.tasks.Delete(taskID)
    return nil
}
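// Example of the force semantics above: DestroyTask(id, false) fails with
// "must call StopTask..." while the container is still running, whereas
// DestroyTask(id, true) stops the container with a 0s grace period and, when
// container GC is enabled in the plugin config, removes it along with its
// volumes.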
// cleanupImage removes a Docker image. No error is returned if the image
// doesn't exist or is still in use. Requires the global client to already be
// initialized.
func (d *Driver) cleanupImage(handle *taskHandle) error {
    if !d.config.GC.Image {
        return nil
    }

    d.coordinator.RemoveImage(handle.containerImage, handle.task.ID)

    return nil
}

func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
    h, ok := d.tasks.Get(taskID)
    if !ok {
        return nil, drivers.ErrTaskNotFound
    }

    container, err := client.InspectContainer(h.containerID)
    if err != nil {
        return nil, fmt.Errorf("failed to inspect container %q: %v", h.containerID, err)
    }
    status := &drivers.TaskStatus{
        ID:          h.task.ID,
        Name:        h.task.Name,
        StartedAt:   container.State.StartedAt,
        CompletedAt: container.State.FinishedAt,
        DriverAttributes: map[string]string{
            "container_id": container.ID,
        },
        NetworkOverride: h.net,
        ExitResult:      h.ExitResult(),
    }

    status.State = drivers.TaskStateUnknown
    if container.State.Running {
        status.State = drivers.TaskStateRunning
    }
    if container.State.Dead {
        status.State = drivers.TaskStateExited
    }

    return status, nil
}

func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
    h, ok := d.tasks.Get(taskID)
    if !ok {
        return nil, drivers.ErrTaskNotFound
    }

    return h.Stats(ctx, interval)
}

func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
    return d.eventer.TaskEvents(ctx)
}

func (d *Driver) SignalTask(taskID string, signal string) error {
    h, ok := d.tasks.Get(taskID)
    if !ok {
        return drivers.ErrTaskNotFound
    }

    sig, err := signals.Parse(signal)
    if err != nil {
        return fmt.Errorf("failed to parse signal: %v", err)
    }

    return h.Signal(sig)
}

func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
    h, ok := d.tasks.Get(taskID)
    if !ok {
        return nil, drivers.ErrTaskNotFound
    }

    if len(cmd) == 0 {
        return nil, fmt.Errorf("cmd is required, but was empty")
    }

    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    return h.Exec(ctx, cmd[0], cmd[1:])
}

var _ drivers.ExecTaskStreamingDriver = (*Driver)(nil)

func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, opts *drivers.ExecOptions) (*drivers.ExitResult, error) {
    defer opts.Stdout.Close()
    defer opts.Stderr.Close()

    done := make(chan interface{})
    defer close(done)

    h, ok := d.tasks.Get(taskID)
    if !ok {
        return nil, drivers.ErrTaskNotFound
    }

    if len(opts.Command) == 0 {
        return nil, fmt.Errorf("command is required but was empty")
    }

    createExecOpts := docker.CreateExecOptions{
        AttachStdin:  true,
        AttachStdout: true,
        AttachStderr: true,
        Tty:          opts.Tty,
        Cmd:          opts.Command,
        Container:    h.containerID,
        Context:      ctx,
    }
    exec, err := h.client.CreateExec(createExecOpts)
    if err != nil {
        return nil, fmt.Errorf("failed to create exec object: %v", err)
    }

    go func() {
        for {
            select {
            case <-ctx.Done():
                return
            case <-done:
                return
            case s, ok := <-opts.ResizeCh:
                if !ok {
                    return
                }
                client.ResizeExecTTY(exec.ID, s.Height, s.Width)
            }
        }
    }()
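    // The goroutine above forwards terminal-resize events from opts.ResizeCh
    // to the exec instance for as long as the exec runs; errors from
    // ResizeExecTTY are ignored here since a failed resize only affects
    // rendering.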
    startOpts := docker.StartExecOptions{
        Detach: false,

        // When running in TTY mode we must use a raw terminal. If not, we set
        // RawTerminal to false to allow the docker client to interpret the
        // special stdout/stderr message framing.
        Tty:         opts.Tty,
        RawTerminal: opts.Tty,

        InputStream:  opts.Stdin,
        OutputStream: opts.Stdout,
        ErrorStream:  opts.Stderr,
        Context:      ctx,
    }
    if err := client.StartExec(exec.ID, startOpts); err != nil {
        return nil, fmt.Errorf("failed to start exec: %v", err)
    }

    // StartExec returns after the process completes, but InspectExec seems to
    // have a delay in reporting the status code, so poll for a short period.
    const execTerminatingTimeout = 3 * time.Second
    start := time.Now()
    var res *docker.ExecInspect
    for (res == nil || res.Running) && time.Since(start) <= execTerminatingTimeout {
        res, err = client.InspectExec(exec.ID)
        if err != nil {
            return nil, fmt.Errorf("failed to inspect exec result: %v", err)
        }
        time.Sleep(50 * time.Millisecond)
    }

    if res == nil || res.Running {
        return nil, fmt.Errorf("failed to retrieve exec result")
    }

    return &drivers.ExitResult{
        ExitCode: res.ExitCode,
    }, nil
}

// dockerClients creates two *docker.Client, one for long-running operations
// and the other for shorter ones. In test / dev mode we can use ENV vars to
// connect to the docker daemon. In production mode we will read
// docker.endpoint from the config file.
func (d *Driver) dockerClients() (*docker.Client, *docker.Client, error) {
    createClientsLock.Lock()
    defer createClientsLock.Unlock()

    if client != nil && waitClient != nil {
        return client, waitClient, nil
    }

    var err error

    // Only initialize the client if it hasn't yet been done
    if client == nil {
        client, err = d.newDockerClient(dockerTimeout)
        if err != nil {
            return nil, nil, err
        }
    }

    // Only initialize the waitClient if it hasn't yet been done
    if waitClient == nil {
        waitClient, err = d.newDockerClient(0 * time.Minute)
        if err != nil {
            return nil, nil, err
        }
    }

    return client, waitClient, nil
}
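// For example (assumed values), with no docker.endpoint configured and
//
//	DOCKER_HOST=tcp://127.0.0.1:2376
//	DOCKER_TLS_VERIFY=1
//	DOCKER_CERT_PATH=$HOME/.docker
//
// set in the agent's environment, NewClientFromEnv builds a TLS client from
// those variables.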
// newDockerClient creates a new *docker.Client with a configurable timeout
func (d *Driver) newDockerClient(timeout time.Duration) (*docker.Client, error) {
    var err error
    var merr multierror.Error
    var newClient *docker.Client

    // Default to using whatever is configured in docker.endpoint. If this is
    // not specified we'll fall back on NewClientFromEnv which reads config
    // from the DOCKER_* environment variables DOCKER_HOST, DOCKER_TLS_VERIFY,
    // and DOCKER_CERT_PATH. This allows us to lock down the config in
    // production but also accept the standard ENV configs for dev and test.
    dockerEndpoint := d.config.Endpoint
    if dockerEndpoint != "" {
        cert := d.config.TLS.Cert
        key := d.config.TLS.Key
        ca := d.config.TLS.CA

        if cert+key+ca != "" {
            d.logger.Debug("using TLS client connection", "endpoint", dockerEndpoint)
            newClient, err = docker.NewTLSClient(dockerEndpoint, cert, key, ca)
            if err != nil {
                merr.Errors = append(merr.Errors, err)
            }
        } else {
            d.logger.Debug("using standard client connection", "endpoint", dockerEndpoint)
            newClient, err = docker.NewClient(dockerEndpoint)
            if err != nil {
                merr.Errors = append(merr.Errors, err)
            }
        }
    } else {
        d.logger.Debug("using client connection initialized from environment")
        newClient, err = docker.NewClientFromEnv()
        if err != nil {
            merr.Errors = append(merr.Errors, err)
        }
    }

    if timeout != 0 && newClient != nil {
        newClient.SetTimeout(timeout)
    }
    return newClient, merr.ErrorOrNil()
}

func sliceMergeUlimit(ulimitsRaw map[string]string) ([]docker.ULimit, error) {
    var ulimits []docker.ULimit

    for name, ulimitRaw := range ulimitsRaw {
        if len(ulimitRaw) == 0 {
            return []docker.ULimit{}, fmt.Errorf("Malformed ulimit specification %v: %q, cannot be empty", name, ulimitRaw)
        }
        // hard limit is optional
        if !strings.Contains(ulimitRaw, ":") {
            ulimitRaw = ulimitRaw + ":" + ulimitRaw
        }

        split := strings.SplitN(ulimitRaw, ":", 2)
        if len(split) < 2 {
            return []docker.ULimit{}, fmt.Errorf("Malformed ulimit specification %v: %v", name, ulimitRaw)
        }
        soft, err := strconv.Atoi(split[0])
        if err != nil {
            return []docker.ULimit{}, fmt.Errorf("Malformed soft ulimit %v: %v", name, ulimitRaw)
        }
        hard, err := strconv.Atoi(split[1])
        if err != nil {
            return []docker.ULimit{}, fmt.Errorf("Malformed hard ulimit %v: %v", name, ulimitRaw)
        }

        ulimit := docker.ULimit{
            Name: name,
            Soft: int64(soft),
            Hard: int64(hard),
        }
        ulimits = append(ulimits, ulimit)
    }
    return ulimits, nil
}

func (d *Driver) Shutdown() {
    d.signalShutdown()
}

func isDockerTransientError(err error) bool {
    if err == nil {
        return false
    }

    errMsg := err.Error()
    for _, te := range dockerTransientErrs {
        if strings.Contains(errMsg, te) {
            return true
        }
    }

    return false
}
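// Example: isDockerTransientError(fmt.Errorf("API error (500): server error"))
// returns true, which createContainer and startContainer use to retry with
// nextBackoff before surfacing the error.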