package cluster // import "github.com/docker/docker/daemon/cluster"

import (
	"context"
	"fmt"
	"net"
	"strings"
	"time"

	apitypes "github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/filters"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/convert"
	"github.com/docker/docker/errdefs"
	"github.com/docker/docker/opts"
	"github.com/docker/docker/pkg/signal"
	swarmapi "github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/encryption"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc"
)

// Init initializes a new cluster from a user-provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	if c.nr != nil {
		if req.ForceNewCluster {

			// Take c.mu temporarily to wait for presently running
			// API handlers to finish before shutting down the node.
			c.mu.Lock()
			if !c.nr.nodeState.IsManager() {
				c.mu.Unlock()
				return "", errSwarmNotManager
			}
			c.mu.Unlock()

			if err := c.nr.Stop(); err != nil {
				return "", err
			}
		} else {
			return "", errSwarmExists
		}
	}

	if err := validateAndSanitizeInitRequest(&req); err != nil {
		return "", errdefs.InvalidParameter(err)
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return "", err
	}

	advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
	if err != nil {
		return "", err
	}

	dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
	if err != nil {
		return "", err
	}

	localAddr := listenHost

	// If the local address is undetermined, the advertise address
	// will be used as the local address, if it belongs to this system.
	// If the advertise address is not local, then we try to find
	// a system address to use as the local address. If this fails,
	// we give up and ask the user to pass the listen address.
	if net.ParseIP(localAddr).IsUnspecified() {
		advertiseIP := net.ParseIP(advertiseHost)

		found := false
		for _, systemIP := range listSystemIPs() {
			if systemIP.Equal(advertiseIP) {
				localAddr = advertiseIP.String()
				found = true
				break
			}
		}

		if !found {
			ip, err := c.resolveSystemAddr()
			if err != nil {
				logrus.Warnf("Could not find a local address: %v", err)
				return "", errMustSpecifyListenAddr
			}
			localAddr = ip.String()
		}
	}

	// Validate the default address pool input.
	if err := validateDefaultAddrPool(req.DefaultAddrPool, req.SubnetSize); err != nil {
		return "", err
	}

	port, err := getDataPathPort(req.DataPathPort)
	if err != nil {
		return "", err
	}

	nr, err := c.newNodeRunner(nodeStartConfig{
		forceNewCluster:    req.ForceNewCluster,
		autolock:           req.AutoLockManagers,
		LocalAddr:          localAddr,
		ListenAddr:         net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr:      net.JoinHostPort(advertiseHost, advertisePort),
		DataPathAddr:       dataPathAddr,
		DefaultAddressPool: req.DefaultAddrPool,
		SubnetSize:         req.SubnetSize,
		availability:       req.Availability,
		DataPathPort:       port,
	})
	if err != nil {
		return "", err
	}
	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	if err := <-nr.Ready(); err != nil {
		c.mu.Lock()
		c.nr = nil
		c.mu.Unlock()
		if !req.ForceNewCluster { // if the first attempt fails, don't keep state
			if err := clearPersistentState(c.root); err != nil {
				return "", err
			}
		}
		return "", err
	}
	state := nr.State()
	if state.swarmNode == nil { // should never happen but protect from panic
		return "", errors.New("invalid cluster state for spec initialization")
	}
	if err := initClusterSpec(state.swarmNode, req.Spec); err != nil {
		return "", err
	}
	return state.NodeID(), nil
}

// Join makes the current Cluster part of an existing swarm cluster.
func (c *Cluster) Join(req types.JoinRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	c.mu.Lock()
	if c.nr != nil {
		c.mu.Unlock()
		return errors.WithStack(errSwarmExists)
	}
	c.mu.Unlock()

	if err := validateAndSanitizeJoinRequest(&req); err != nil {
		return errdefs.InvalidParameter(err)
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return err
	}

	var advertiseAddr string
	if req.AdvertiseAddr != "" {
		advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
		// For joining, we don't need to provide an advertise address,
		// since the remote side can detect it.
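		// Resolution errors are therefore ignored here and the advertise
		// address is simply left empty.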
		if err == nil {
			advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
		}
	}

	dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
	if err != nil {
		return err
	}

	nr, err := c.newNodeRunner(nodeStartConfig{
		RemoteAddr:    req.RemoteAddrs[0],
		ListenAddr:    net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr: advertiseAddr,
		DataPathAddr:  dataPathAddr,
		joinAddr:      req.RemoteAddrs[0],
		joinToken:     req.JoinToken,
		availability:  req.Availability,
	})
	if err != nil {
		return err
	}

	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	select {
	case <-time.After(swarmConnectTimeout):
		return errSwarmJoinTimeoutReached
	case err := <-nr.Ready():
		if err != nil {
			c.mu.Lock()
			c.nr = nil
			c.mu.Unlock()
			if err := clearPersistentState(c.root); err != nil {
				return err
			}
		}
		return err
	}
}

// Inspect retrieves the configuration properties of a managed swarm cluster.
func (c *Cluster) Inspect() (types.Swarm, error) {
	var swarm types.Swarm
	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		s, err := c.inspect(ctx, state)
		if err != nil {
			return err
		}
		swarm = s
		return nil
	}); err != nil {
		return types.Swarm{}, err
	}
	return swarm, nil
}

func (c *Cluster) inspect(ctx context.Context, state nodeState) (types.Swarm, error) {
	s, err := getSwarm(ctx, state.controlClient)
	if err != nil {
		return types.Swarm{}, err
	}
	return convert.SwarmFromGRPC(*s), nil
}

// Update updates the configuration of a managed swarm cluster.
func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
	return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		swarm, err := getSwarm(ctx, state.controlClient)
		if err != nil {
			return err
		}

		// Validate the spec name.
		if spec.Annotations.Name == "" {
			spec.Annotations.Name = "default"
		} else if spec.Annotations.Name != "default" {
			return errdefs.InvalidParameter(errors.New(`swarm spec must be named "default"`))
		}

		// In Update, the client should provide the complete spec of the swarm,
		// including Name and Labels. If a field is specified as 0 or nil, then
		// swarmkit will use the default value.
		clusterSpec, err := convert.SwarmSpecToGRPC(spec)
		if err != nil {
			return errdefs.InvalidParameter(err)
		}

		_, err = state.controlClient.UpdateCluster(
			ctx,
			&swarmapi.UpdateClusterRequest{
				ClusterID: swarm.ID,
				Spec:      &clusterSpec,
				ClusterVersion: &swarmapi.Version{
					Index: version,
				},
				Rotation: swarmapi.KeyRotation{
					WorkerJoinToken:  flags.RotateWorkerToken,
					ManagerJoinToken: flags.RotateManagerToken,
					ManagerUnlockKey: flags.RotateManagerUnlockKey,
				},
			},
		)
		return err
	})
}

// GetUnlockKey returns the unlock key for the swarm.
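// The key is returned in its human-readable form; an empty string is
// returned when the swarm has no unlock key.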
func (c *Cluster) GetUnlockKey() (string, error) {
	var resp *swarmapi.GetUnlockKeyResponse
	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		client := swarmapi.NewCAClient(state.grpcConn)

		r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
		if err != nil {
			return err
		}
		resp = r
		return nil
	}); err != nil {
		return "", err
	}
	if len(resp.UnlockKey) == 0 {
		// no key
		return "", nil
	}
	return encryption.HumanReadableKey(resp.UnlockKey), nil
}

// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.RLock()
	state := c.currentNodeState()

	if !state.IsActiveManager() {
		// If the manager is not active, return an error unless the
		// swarm is locked.
		if err := c.errNoManager(state); err != errSwarmLocked {
			c.mu.RUnlock()
			return err
		}
	} else {
		// If the manager is active, the swarm is not locked; return an error.
		c.mu.RUnlock()
		return notLockedError{}
	}

	// We only reach this point when the swarm is locked.
	nr := c.nr
	c.mu.RUnlock()

	key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
	if err != nil {
		return errdefs.InvalidParameter(err)
	}

	config := nr.config
	config.lockKey = key
	if err := nr.Stop(); err != nil {
		return err
	}
	nr, err = c.newNodeRunner(config)
	if err != nil {
		return err
	}

	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	if err := <-nr.Ready(); err != nil {
		if errors.Cause(err) == errSwarmLocked {
			return invalidUnlockKey{}
		}
		return errors.Errorf("swarm component could not be started: %v", err)
	}
	return nil
}

// Leave shuts down the Cluster and removes the current state.
func (c *Cluster) Leave(force bool) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	nr := c.nr
	if nr == nil {
		c.mu.Unlock()
		return errors.WithStack(errNoSwarm)
	}

	state := c.currentNodeState()

	c.mu.Unlock()

	if errors.Cause(state.err) == errSwarmLocked && !force {
		// leaving a locked swarm without --force is not allowed
		return errors.WithStack(notAvailableError("Swarm is encrypted and locked. Please unlock it first or use `--force` to ignore this message."))
	}

	if state.IsManager() && !force {
		msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
		if state.IsActiveManager() {
			active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
			if err == nil {
				if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
					if isLastManager(reachable, unreachable) {
						msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
						return errors.WithStack(notAvailableError(msg))
					}
					msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
				}
			}
		} else {
			msg += "Doing so may lose the consensus of your cluster. "
		}

		msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
		return errors.WithStack(notAvailableError(msg))
	}
	// release readers in here
	if err := nr.Stop(); err != nil {
		logrus.Errorf("failed to shut down cluster node: %v", err)
		signal.DumpStacks("")
		return err
	}

	c.mu.Lock()
	c.nr = nil
	c.mu.Unlock()

	if nodeID := state.NodeID(); nodeID != "" {
		nodeContainers, err := c.listContainerForNode(nodeID)
		if err != nil {
			return err
		}
		for _, id := range nodeContainers {
			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
				logrus.Errorf("error removing %v: %v", id, err)
			}
		}
	}

	// todo: cleanup optional?
	if err := clearPersistentState(c.root); err != nil {
		return err
	}
	c.config.Backend.DaemonLeavesCluster()
	return nil
}

// Info returns information about the current cluster state.
func (c *Cluster) Info() types.Info {
	info := types.Info{
		NodeAddr: c.GetAdvertiseAddress(),
	}
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	info.LocalNodeState = state.status
	if state.err != nil {
		info.Error = state.err.Error()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	if state.IsActiveManager() {
		info.ControlAvailable = true
		swarm, err := c.inspect(ctx, state)
		if err != nil {
			info.Error = err.Error()
		}

		info.Cluster = &swarm.ClusterInfo

		if r, err := state.controlClient.ListNodes(
			ctx, &swarmapi.ListNodesRequest{},
			grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
		); err != nil {
			info.Error = err.Error()
		} else {
			info.Nodes = len(r.Nodes)
			for _, n := range r.Nodes {
				if n.ManagerStatus != nil {
					info.Managers = info.Managers + 1
				}
			}
		}

		switch info.LocalNodeState {
		case types.LocalNodeStateInactive, types.LocalNodeStateLocked, types.LocalNodeStateError:
			// nothing to do
		default:
			if info.Managers == 2 {
				const warn string = `WARNING: Running Swarm in a two-manager configuration. This configuration provides
	no fault tolerance, and poses a high risk of losing control over the cluster.
	Refer to https://docs.docker.com/engine/swarm/admin_guide/ to configure the
	Swarm for fault-tolerance.`

				info.Warnings = append(info.Warnings, warn)
			}
		}
	}

	if state.swarmNode != nil {
		for _, r := range state.swarmNode.Remotes() {
			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
		}
		info.NodeID = state.swarmNode.NodeID()
	}

	return info
}

func validateAndSanitizeInitRequest(req *types.InitRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}

	if req.Spec.Annotations.Name == "" {
		req.Spec.Annotations.Name = "default"
	} else if req.Spec.Annotations.Name != "default" {
		return errors.New(`swarm spec must be named "default"`)
	}

	return nil
}

func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}
	if len(req.RemoteAddrs) == 0 {
		return errors.New("at least 1 RemoteAddr is required to join")
	}
	for i := range req.RemoteAddrs {
		req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
		if err != nil {
			return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
		}
	}
	return nil
}

func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, errors.New("invalid empty address")
	}
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
		return addr, nil
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}

func initClusterSpec(node *swarmnode.Node, spec types.Spec) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for conn := range node.ListenControlSocket(ctx) {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if conn != nil {
			client := swarmapi.NewControlClient(conn)
			var cluster *swarmapi.Cluster
			for i := 0; ; i++ {
				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
				if err != nil {
					return fmt.Errorf("error on listing clusters: %v", err)
				}
				if len(lcr.Clusters) == 0 {
					if i < 10 {
						time.Sleep(200 * time.Millisecond)
						continue
					}
					return errors.New("empty list of clusters was returned")
				}
				cluster = lcr.Clusters[0]
				break
			}
			// In init, we take the initial default values from swarmkit, and merge
			// any non-nil or non-zero value from the spec into the GRPC spec. This
			// leaves the default values alone.
			// Note that this is different from Update(): in Update() we expect the
			// user to specify the complete spec of the cluster (as they already know
			// the existing one and know which fields to update).
			clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
				ClusterID:      cluster.ID,
				ClusterVersion: &cluster.Meta.Version,
				Spec:           &clusterSpec,
			})
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			return nil
		}
	}
	return ctx.Err()
}

func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
	var ids []string
	filters := filters.NewArgs()
	filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
	containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
		Filters: filters,
	})
	if err != nil {
		return []string{}, err
	}
	for _, c := range containers {
		ids = append(ids, c.ID)
	}
	return ids, nil
}