package cluster // import "github.com/docker/docker/daemon/cluster"

import (
	"context"
	"fmt"
	"net"
	"strings"
	"time"

	apitypes "github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/filters"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/convert"
	"github.com/docker/docker/errdefs"
	"github.com/docker/docker/opts"
	"github.com/docker/docker/pkg/signal"
	swarmapi "github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/encryption"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

// Init initializes a new cluster from the user-provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	if c.nr != nil {
		if req.ForceNewCluster {

			// Take c.mu temporarily to wait for presently running
			// API handlers to finish before shutting down the node.
			c.mu.Lock()
			if !c.nr.nodeState.IsManager() {
				c.mu.Unlock()
				return "", errSwarmNotManager
			}
			c.mu.Unlock()

			if err := c.nr.Stop(); err != nil {
				return "", err
			}
		} else {
			return "", errSwarmExists
		}
	}

	if err := validateAndSanitizeInitRequest(&req); err != nil {
		return "", errdefs.InvalidParameter(err)
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return "", err
	}

	advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
	if err != nil {
		return "", err
	}

	dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
	if err != nil {
		return "", err
	}

	localAddr := listenHost

	// If the local address is undetermined, the advertise address
	// will be used as the local address, if it belongs to this system.
	// If the advertise address is not local, then we try to find
	// a system address to use as the local address. If this fails,
	// we give up and ask the user to pass the listen address.
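	//
	// For example (addresses illustrative): with ListenAddr
	// "0.0.0.0:2377" and AdvertiseAddr "192.168.1.10:2377", localAddr
	// starts out as the unspecified "0.0.0.0" and is replaced by
	// "192.168.1.10" if that IP is assigned to one of this machine's
	// interfaces.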
	if net.ParseIP(localAddr).IsUnspecified() {
		advertiseIP := net.ParseIP(advertiseHost)

		found := false
		for _, systemIP := range listSystemIPs() {
			if systemIP.Equal(advertiseIP) {
				localAddr = advertiseIP.String()
				found = true
				break
			}
		}

		if !found {
			ip, err := c.resolveSystemAddr()
			if err != nil {
				logrus.Warnf("Could not find a local address: %v", err)
				return "", errMustSpecifyListenAddr
			}
			localAddr = ip.String()
		}
	}

	nr, err := c.newNodeRunner(nodeStartConfig{
		forceNewCluster:    req.ForceNewCluster,
		autolock:           req.AutoLockManagers,
		LocalAddr:          localAddr,
		ListenAddr:         net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr:      net.JoinHostPort(advertiseHost, advertisePort),
		DataPathAddr:       dataPathAddr,
		DefaultAddressPool: req.DefaultAddrPool,
		SubnetSize:         req.SubnetSize,
		availability:       req.Availability,
	})
	if err != nil {
		return "", err
	}
	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	if err := <-nr.Ready(); err != nil {
		c.mu.Lock()
		c.nr = nil
		c.mu.Unlock()
		if !req.ForceNewCluster { // if the first attempt fails, don't keep state
			if err := clearPersistentState(c.root); err != nil {
				return "", err
			}
		}
		return "", err
	}
	state := nr.State()
	if state.swarmNode == nil { // should never happen but protect from panic
		return "", errors.New("invalid cluster state for spec initialization")
	}
	if err := initClusterSpec(state.swarmNode, req.Spec); err != nil {
		return "", err
	}
	return state.NodeID(), nil
}

// Join makes the current Cluster part of an existing swarm cluster.
func (c *Cluster) Join(req types.JoinRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	c.mu.Lock()
	if c.nr != nil {
		c.mu.Unlock()
		return errors.WithStack(errSwarmExists)
	}
	c.mu.Unlock()

	if err := validateAndSanitizeJoinRequest(&req); err != nil {
		return errdefs.InvalidParameter(err)
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return err
	}

	var advertiseAddr string
	if req.AdvertiseAddr != "" {
		advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
		// For joining, we don't need to provide an advertise address,
		// since the remote side can detect it, so resolution errors
		// are ignored here.
		if err == nil {
			advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
		}
	}

	dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
	if err != nil {
		return err
	}

	nr, err := c.newNodeRunner(nodeStartConfig{
		RemoteAddr:    req.RemoteAddrs[0],
		ListenAddr:    net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr: advertiseAddr,
		DataPathAddr:  dataPathAddr,
		joinAddr:      req.RemoteAddrs[0],
		joinToken:     req.JoinToken,
		availability:  req.Availability,
	})
	if err != nil {
		return err
	}

	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	select {
	case <-time.After(swarmConnectTimeout):
		return errSwarmJoinTimeoutReached
	case err := <-nr.Ready():
		if err != nil {
			c.mu.Lock()
			c.nr = nil
			c.mu.Unlock()
			if err := clearPersistentState(c.root); err != nil {
				return err
			}
		}
		return err
	}
}

// Inspect retrieves the configuration properties of a managed swarm cluster.
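// It goes through lockedManagerAction, so it is expected to return an
// error when this node is not an active manager.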
func (c *Cluster) Inspect() (types.Swarm, error) {
	var swarm types.Swarm
	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		s, err := c.inspect(ctx, state)
		if err != nil {
			return err
		}
		swarm = s
		return nil
	}); err != nil {
		return types.Swarm{}, err
	}
	return swarm, nil
}

func (c *Cluster) inspect(ctx context.Context, state nodeState) (types.Swarm, error) {
	s, err := getSwarm(ctx, state.controlClient)
	if err != nil {
		return types.Swarm{}, err
	}
	return convert.SwarmFromGRPC(*s), nil
}

// Update updates the configuration of a managed swarm cluster.
func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
	return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		swarm, err := getSwarm(ctx, state.controlClient)
		if err != nil {
			return err
		}

		// Validate spec name.
		if spec.Annotations.Name == "" {
			spec.Annotations.Name = "default"
		} else if spec.Annotations.Name != "default" {
			return errdefs.InvalidParameter(errors.New(`swarm spec must be named "default"`))
		}

		// In update, the client should provide the complete spec of the
		// swarm, including Name and Labels. If a field is specified as 0
		// or nil, swarmkit falls back to its default value.
		clusterSpec, err := convert.SwarmSpecToGRPC(spec)
		if err != nil {
			return errdefs.InvalidParameter(err)
		}

		_, err = state.controlClient.UpdateCluster(
			ctx,
			&swarmapi.UpdateClusterRequest{
				ClusterID: swarm.ID,
				Spec:      &clusterSpec,
				ClusterVersion: &swarmapi.Version{
					Index: version,
				},
				Rotation: swarmapi.KeyRotation{
					WorkerJoinToken:  flags.RotateWorkerToken,
					ManagerJoinToken: flags.RotateManagerToken,
					ManagerUnlockKey: flags.RotateManagerUnlockKey,
				},
			},
		)
		return err
	})
}

// GetUnlockKey returns the unlock key for the swarm.
func (c *Cluster) GetUnlockKey() (string, error) {
	var resp *swarmapi.GetUnlockKeyResponse
	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		client := swarmapi.NewCAClient(state.grpcConn)

		r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
		if err != nil {
			return err
		}
		resp = r
		return nil
	}); err != nil {
		return "", err
	}
	if len(resp.UnlockKey) == 0 {
		// no key
		return "", nil
	}
	return encryption.HumanReadableKey(resp.UnlockKey), nil
}

// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.RLock()
	state := c.currentNodeState()

	if !state.IsActiveManager() {
		// The manager is inactive: proceed only if the cause is that
		// the swarm is locked; otherwise return the error.
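		// (errNoManager is expected to surface errSwarmLocked when the
		// on-disk swarm state exists but is encrypted with an unknown key.)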
		if err := c.errNoManager(state); err != errSwarmLocked {
			c.mu.RUnlock()
			return err
		}
	} else {
		// The manager is active, so the swarm cannot be locked.
		c.mu.RUnlock()
		return notLockedError{}
	}

	// At this point the swarm is known to be locked.
	nr := c.nr
	c.mu.RUnlock()

	key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
	if err != nil {
		return errdefs.InvalidParameter(err)
	}

	config := nr.config
	config.lockKey = key
	if err := nr.Stop(); err != nil {
		return err
	}
	nr, err = c.newNodeRunner(config)
	if err != nil {
		return err
	}

	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	if err := <-nr.Ready(); err != nil {
		if errors.Cause(err) == errSwarmLocked {
			return invalidUnlockKey{}
		}
		return errors.Errorf("swarm component could not be started: %v", err)
	}
	return nil
}

// Leave shuts down Cluster and removes current state.
func (c *Cluster) Leave(force bool) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	nr := c.nr
	if nr == nil {
		c.mu.Unlock()
		return errors.WithStack(errNoSwarm)
	}

	state := c.currentNodeState()

	c.mu.Unlock()

	if errors.Cause(state.err) == errSwarmLocked && !force {
		// leaving a locked swarm without --force is not allowed
		return errors.WithStack(notAvailableError("Swarm is encrypted and locked. Please unlock it first or use `--force` to ignore this message."))
	}

	if state.IsManager() && !force {
		msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
		if state.IsActiveManager() {
			active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
			if err == nil {
				if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
					if isLastManager(reachable, unreachable) {
						msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
						return errors.WithStack(notAvailableError(msg))
					}
					msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
				}
			}
		} else {
			msg += "Doing so may lose the consensus of your cluster. "
		}

		msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
		return errors.WithStack(notAvailableError(msg))
	}
	// release readers in here
	if err := nr.Stop(); err != nil {
		logrus.Errorf("failed to shut down cluster node: %v", err)
		signal.DumpStacks("")
		return err
	}

	c.mu.Lock()
	c.nr = nil
	c.mu.Unlock()

	if nodeID := state.NodeID(); nodeID != "" {
		nodeContainers, err := c.listContainerForNode(nodeID)
		if err != nil {
			return err
		}
		for _, id := range nodeContainers {
			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
				logrus.Errorf("error removing %v: %v", id, err)
			}
		}
	}

	// todo: cleanup optional?
	if err := clearPersistentState(c.root); err != nil {
		return err
	}
	c.config.Backend.DaemonLeavesCluster()
	return nil
}

// Info returns information about the current cluster state.
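// Cluster-wide fields (Cluster, Nodes, Managers) are only populated when
// this node is an active manager; on other nodes they are left at their
// zero values.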
func (c *Cluster) Info() types.Info {
	info := types.Info{
		NodeAddr: c.GetAdvertiseAddress(),
	}
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	info.LocalNodeState = state.status
	if state.err != nil {
		info.Error = state.err.Error()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	if state.IsActiveManager() {
		info.ControlAvailable = true
		swarm, err := c.inspect(ctx, state)
		if err != nil {
			info.Error = err.Error()
		}

		info.Cluster = &swarm.ClusterInfo

		if r, err := state.controlClient.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err != nil {
			info.Error = err.Error()
		} else {
			info.Nodes = len(r.Nodes)
			for _, n := range r.Nodes {
				if n.ManagerStatus != nil {
					info.Managers++
				}
			}
		}
	}

	if state.swarmNode != nil {
		for _, r := range state.swarmNode.Remotes() {
			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
		}
		info.NodeID = state.swarmNode.NodeID()
	}

	return info
}

func validateAndSanitizeInitRequest(req *types.InitRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}

	if req.Spec.Annotations.Name == "" {
		req.Spec.Annotations.Name = "default"
	} else if req.Spec.Annotations.Name != "default" {
		return errors.New(`swarm spec must be named "default"`)
	}

	return nil
}

func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}
	if len(req.RemoteAddrs) == 0 {
		return errors.New("at least 1 RemoteAddr is required to join")
	}
	for i := range req.RemoteAddrs {
		req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
		if err != nil {
			return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
		}
	}
	return nil
}

func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, errors.New("invalid empty address")
	}
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
		// Addresses that do not parse as TCP (e.g. an interface name
		// such as "eth0") are passed through unchanged and resolved
		// later.
		return addr, nil
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}

func initClusterSpec(node *swarmnode.Node, spec types.Spec) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for conn := range node.ListenControlSocket(ctx) {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if conn != nil {
			client := swarmapi.NewControlClient(conn)
			var cluster *swarmapi.Cluster
			for i := 0; ; i++ {
				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
				if err != nil {
					return fmt.Errorf("error on listing clusters: %v", err)
				}
				if len(lcr.Clusters) == 0 {
					if i < 10 {
						time.Sleep(200 * time.Millisecond)
						continue
					}
					return errors.New("empty list of clusters was returned")
				}
				cluster = lcr.Clusters[0]
				break
			}
			// In init, we take the initial default values from swarmkit,
			// and merge any non-nil or non-zero value from spec into the
			// GRPC spec. This leaves the default values alone.
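			// For example (illustrative): if spec leaves
			// Orchestration.TaskHistoryRetentionLimit unset, the merged
			// spec keeps swarmkit's built-in default instead of
			// overwriting it with zero.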
			// Note that this is different from Update(): in Update() the
			// user is expected to specify the complete spec of the cluster
			// (since they already know the existing one and which fields
			// to update).
			clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
				ClusterID:      cluster.ID,
				ClusterVersion: &cluster.Meta.Version,
				Spec:           &clusterSpec,
			})
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			return nil
		}
	}
	return ctx.Err()
}

// listContainerForNode returns the IDs of all containers labeled as
// belonging to tasks of the given swarm node.
func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
	var ids []string
	filters := filters.NewArgs()
	filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
	containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
		Filters: filters,
	})
	if err != nil {
		return []string{}, err
	}
	for _, c := range containers {
		ids = append(ids, c.ID)
	}
	return ids, nil
}