1package container 2 3import ( 4 "encoding/base64" 5 "encoding/json" 6 "fmt" 7 "io" 8 "strings" 9 "syscall" 10 "time" 11 12 "github.com/sirupsen/logrus" 13 "github.com/docker/distribution/digest" 14 "github.com/docker/docker/api/types" 15 "github.com/docker/docker/api/types/backend" 16 containertypes "github.com/docker/docker/api/types/container" 17 "github.com/docker/docker/api/types/events" 18 "github.com/docker/docker/daemon/cluster/convert" 19 executorpkg "github.com/docker/docker/daemon/cluster/executor" 20 "github.com/docker/docker/reference" 21 "github.com/docker/libnetwork" 22 "github.com/docker/swarmkit/agent/exec" 23 "github.com/docker/swarmkit/api" 24 "github.com/docker/swarmkit/log" 25 "github.com/docker/swarmkit/protobuf/ptypes" 26 "golang.org/x/net/context" 27 "golang.org/x/time/rate" 28) 29 30// containerAdapter conducts remote operations for a container. All calls 31// are mostly naked calls to the client API, seeded with information from 32// containerConfig. 33type containerAdapter struct { 34 backend executorpkg.Backend 35 container *containerConfig 36 secrets exec.SecretGetter 37} 38 39func newContainerAdapter(b executorpkg.Backend, task *api.Task, secrets exec.SecretGetter) (*containerAdapter, error) { 40 ctnr, err := newContainerConfig(task) 41 if err != nil { 42 return nil, err 43 } 44 45 return &containerAdapter{ 46 container: ctnr, 47 backend: b, 48 secrets: secrets, 49 }, nil 50} 51 52func (c *containerAdapter) pullImage(ctx context.Context) error { 53 spec := c.container.spec() 54 55 // Skip pulling if the image is referenced by image ID. 56 if _, err := digest.ParseDigest(spec.Image); err == nil { 57 return nil 58 } 59 60 // Skip pulling if the image is referenced by digest and already 61 // exists locally. 62 named, err := reference.ParseNamed(spec.Image) 63 if err == nil { 64 if _, ok := named.(reference.Canonical); ok { 65 _, err := c.backend.LookupImage(spec.Image) 66 if err == nil { 67 return nil 68 } 69 } 70 } 71 72 // if the image needs to be pulled, the auth config will be retrieved and updated 73 var encodedAuthConfig string 74 if spec.PullOptions != nil { 75 encodedAuthConfig = spec.PullOptions.RegistryAuth 76 } 77 78 authConfig := &types.AuthConfig{} 79 if encodedAuthConfig != "" { 80 if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil { 81 logrus.Warnf("invalid authconfig: %v", err) 82 } 83 } 84 85 pr, pw := io.Pipe() 86 metaHeaders := map[string][]string{} 87 go func() { 88 err := c.backend.PullImage(ctx, c.container.image(), "", metaHeaders, authConfig, pw) 89 pw.CloseWithError(err) 90 }() 91 92 dec := json.NewDecoder(pr) 93 dec.UseNumber() 94 m := map[string]interface{}{} 95 spamLimiter := rate.NewLimiter(rate.Every(time.Second), 1) 96 97 lastStatus := "" 98 for { 99 if err := dec.Decode(&m); err != nil { 100 if err == io.EOF { 101 break 102 } 103 return err 104 } 105 l := log.G(ctx) 106 // limit pull progress logs unless the status changes 107 if spamLimiter.Allow() || lastStatus != m["status"] { 108 // if we have progress details, we have everything we need 109 if progress, ok := m["progressDetail"].(map[string]interface{}); ok { 110 // first, log the image and status 111 l = l.WithFields(logrus.Fields{ 112 "image": c.container.image(), 113 "status": m["status"], 114 }) 115 // then, if we have progress, log the progress 116 if progress["current"] != nil && progress["total"] != nil { 117 l = l.WithFields(logrus.Fields{ 118 "current": progress["current"], 119 "total": progress["total"], 120 }) 121 } 122 } 123 l.Debug("pull in progress") 124 } 125 // sometimes, we get no useful information at all, and add no fields 126 if status, ok := m["status"].(string); ok { 127 lastStatus = status 128 } 129 } 130 131 // if the final stream object contained an error, return it 132 if errMsg, ok := m["error"]; ok { 133 return fmt.Errorf("%v", errMsg) 134 } 135 return nil 136} 137 138func (c *containerAdapter) createNetworks(ctx context.Context) error { 139 for _, network := range c.container.networks() { 140 ncr, err := c.container.networkCreateRequest(network) 141 if err != nil { 142 return err 143 } 144 145 if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing 146 if _, ok := err.(libnetwork.NetworkNameError); ok { 147 continue 148 } 149 150 return err 151 } 152 } 153 154 return nil 155} 156 157func (c *containerAdapter) removeNetworks(ctx context.Context) error { 158 for _, nid := range c.container.networks() { 159 if err := c.backend.DeleteManagedNetwork(nid); err != nil { 160 switch err.(type) { 161 case *libnetwork.ActiveEndpointsError: 162 continue 163 case libnetwork.ErrNoSuchNetwork: 164 continue 165 default: 166 log.G(ctx).Errorf("network %s remove failed: %v", nid, err) 167 return err 168 } 169 } 170 } 171 172 return nil 173} 174 175func (c *containerAdapter) networkAttach(ctx context.Context) error { 176 config := c.container.createNetworkingConfig() 177 178 var ( 179 networkName string 180 networkID string 181 ) 182 183 if config != nil { 184 for n, epConfig := range config.EndpointsConfig { 185 networkName = n 186 networkID = epConfig.NetworkID 187 break 188 } 189 } 190 191 return c.backend.UpdateAttachment(networkName, networkID, c.container.id(), config) 192} 193 194func (c *containerAdapter) waitForDetach(ctx context.Context) error { 195 config := c.container.createNetworkingConfig() 196 197 var ( 198 networkName string 199 networkID string 200 ) 201 202 if config != nil { 203 for n, epConfig := range config.EndpointsConfig { 204 networkName = n 205 networkID = epConfig.NetworkID 206 break 207 } 208 } 209 210 return c.backend.WaitForDetachment(ctx, networkName, networkID, c.container.taskID(), c.container.id()) 211} 212 213func (c *containerAdapter) create(ctx context.Context) error { 214 var cr containertypes.ContainerCreateCreatedBody 215 var err error 216 217 if cr, err = c.backend.CreateManagedContainer(types.ContainerCreateConfig{ 218 Name: c.container.name(), 219 Config: c.container.config(), 220 HostConfig: c.container.hostConfig(), 221 // Use the first network in container create 222 NetworkingConfig: c.container.createNetworkingConfig(), 223 }); err != nil { 224 return err 225 } 226 227 // Docker daemon currently doesn't support multiple networks in container create 228 // Connect to all other networks 229 nc := c.container.connectNetworkingConfig() 230 231 if nc != nil { 232 for n, ep := range nc.EndpointsConfig { 233 if err := c.backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil { 234 return err 235 } 236 } 237 } 238 239 container := c.container.task.Spec.GetContainer() 240 if container == nil { 241 return fmt.Errorf("unable to get container from task spec") 242 } 243 244 // configure secrets 245 if err := c.backend.SetContainerSecretStore(cr.ID, c.secrets); err != nil { 246 return err 247 } 248 249 refs := convert.SecretReferencesFromGRPC(container.Secrets) 250 if err := c.backend.SetContainerSecretReferences(cr.ID, refs); err != nil { 251 return err 252 } 253 254 if err := c.backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig()); err != nil { 255 return err 256 } 257 258 return nil 259} 260 261func (c *containerAdapter) start(ctx context.Context) error { 262 return c.backend.ContainerStart(c.container.name(), nil, "", "") 263} 264 265func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) { 266 cs, err := c.backend.ContainerInspectCurrent(c.container.name(), false) 267 if ctx.Err() != nil { 268 return types.ContainerJSON{}, ctx.Err() 269 } 270 if err != nil { 271 return types.ContainerJSON{}, err 272 } 273 return *cs, nil 274} 275 276// events issues a call to the events API and returns a channel with all 277// events. The stream of events can be shutdown by cancelling the context. 278func (c *containerAdapter) events(ctx context.Context) <-chan events.Message { 279 log.G(ctx).Debugf("waiting on events") 280 buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter()) 281 eventsq := make(chan events.Message, len(buffer)) 282 283 for _, event := range buffer { 284 eventsq <- event 285 } 286 287 go func() { 288 defer c.backend.UnsubscribeFromEvents(l) 289 290 for { 291 select { 292 case ev := <-l: 293 jev, ok := ev.(events.Message) 294 if !ok { 295 log.G(ctx).Warnf("unexpected event message: %q", ev) 296 continue 297 } 298 select { 299 case eventsq <- jev: 300 case <-ctx.Done(): 301 return 302 } 303 case <-ctx.Done(): 304 return 305 } 306 } 307 }() 308 309 return eventsq 310} 311 312func (c *containerAdapter) wait(ctx context.Context) error { 313 return c.backend.ContainerWaitWithContext(ctx, c.container.nameOrID()) 314} 315 316func (c *containerAdapter) shutdown(ctx context.Context) error { 317 // Default stop grace period to nil (daemon will use the stopTimeout of the container) 318 var stopgrace *int 319 spec := c.container.spec() 320 if spec.StopGracePeriod != nil { 321 stopgraceValue := int(spec.StopGracePeriod.Seconds) 322 stopgrace = &stopgraceValue 323 } 324 return c.backend.ContainerStop(c.container.name(), stopgrace) 325} 326 327func (c *containerAdapter) terminate(ctx context.Context) error { 328 return c.backend.ContainerKill(c.container.name(), uint64(syscall.SIGKILL)) 329} 330 331func (c *containerAdapter) remove(ctx context.Context) error { 332 return c.backend.ContainerRm(c.container.name(), &types.ContainerRmConfig{ 333 RemoveVolume: true, 334 ForceRemove: true, 335 }) 336} 337 338func (c *containerAdapter) createVolumes(ctx context.Context) error { 339 // Create plugin volumes that are embedded inside a Mount 340 for _, mount := range c.container.task.Spec.GetContainer().Mounts { 341 if mount.Type != api.MountTypeVolume { 342 continue 343 } 344 345 if mount.VolumeOptions == nil { 346 continue 347 } 348 349 if mount.VolumeOptions.DriverConfig == nil { 350 continue 351 } 352 353 req := c.container.volumeCreateRequest(&mount) 354 355 // Check if this volume exists on the engine 356 if _, err := c.backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels); err != nil { 357 // TODO(amitshukla): Today, volume create through the engine api does not return an error 358 // when the named volume with the same parameters already exists. 359 // It returns an error if the driver name is different - that is a valid error 360 return err 361 } 362 363 } 364 365 return nil 366} 367 368func (c *containerAdapter) activateServiceBinding() error { 369 return c.backend.ActivateContainerServiceBinding(c.container.name()) 370} 371 372func (c *containerAdapter) deactivateServiceBinding() error { 373 return c.backend.DeactivateContainerServiceBinding(c.container.name()) 374} 375 376func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (io.ReadCloser, error) { 377 reader, writer := io.Pipe() 378 379 apiOptions := &backend.ContainerLogsConfig{ 380 ContainerLogsOptions: types.ContainerLogsOptions{ 381 Follow: options.Follow, 382 383 // TODO(stevvooe): Parse timestamp out of message. This 384 // absolutely needs to be done before going to production with 385 // this, at it is completely redundant. 386 Timestamps: true, 387 Details: false, // no clue what to do with this, let's just deprecate it. 388 }, 389 OutStream: writer, 390 } 391 392 if options.Since != nil { 393 since, err := ptypes.Timestamp(options.Since) 394 if err != nil { 395 return nil, err 396 } 397 apiOptions.Since = since.Format(time.RFC3339Nano) 398 } 399 400 if options.Tail < 0 { 401 // See protobuf documentation for details of how this works. 402 apiOptions.Tail = fmt.Sprint(-options.Tail - 1) 403 } else if options.Tail > 0 { 404 return nil, fmt.Errorf("tail relative to start of logs not supported via docker API") 405 } 406 407 if len(options.Streams) == 0 { 408 // empty == all 409 apiOptions.ShowStdout, apiOptions.ShowStderr = true, true 410 } else { 411 for _, stream := range options.Streams { 412 switch stream { 413 case api.LogStreamStdout: 414 apiOptions.ShowStdout = true 415 case api.LogStreamStderr: 416 apiOptions.ShowStderr = true 417 } 418 } 419 } 420 421 chStarted := make(chan struct{}) 422 go func() { 423 defer writer.Close() 424 c.backend.ContainerLogs(ctx, c.container.name(), apiOptions, chStarted) 425 }() 426 427 return reader, nil 428} 429 430// todo: typed/wrapped errors 431func isContainerCreateNameConflict(err error) bool { 432 return strings.Contains(err.Error(), "Conflict. The name") 433} 434 435func isUnknownContainer(err error) bool { 436 return strings.Contains(err.Error(), "No such container:") 437} 438 439func isStoppedContainer(err error) bool { 440 return strings.Contains(err.Error(), "is already stopped") 441} 442