package cluster // import "github.com/docker/docker/daemon/cluster"

//
// ## Swarmkit integration
//
// Cluster - static configurable object for accessing everything swarm-related.
// Contains methods for connecting to and controlling the cluster. Exists
// always, even if swarm mode is not enabled.
//
// NodeRunner - Manager for starting the swarmkit node. It is present if and
// only if swarm mode is enabled. Implements a backoff restart loop in case of
// errors.
//
// NodeState - Information about the current node status, including access to
// gRPC clients if a manager is active.
//
// ### Locking
//
// `cluster.controlMutex` - taken for the whole lifecycle of the processes that
// can reconfigure the cluster (init/join/leave, etc.). Ensures that one
// reconfiguration action has fully completed before another can start.
//
// `cluster.mu` - taken when the actual changes in cluster configuration
// happen. It differs from `controlMutex` because in some cases we need to
// access the current cluster state even while a long-running reconfiguration
// is in progress. For example, the network stack may ask for the current
// cluster state in the middle of a shutdown. Any time the current cluster
// state is queried, take the read lock of `cluster.mu`. If you are writing an
// API responder that returns synchronously, hold `cluster.mu.RLock()` for the
// duration of the whole handler function. That ensures the node will not be
// shut down until the handler has finished.
//
// NodeRunner implements its own internal locks that should not be used
// outside of the struct. Instead, call the `nodeRunner.State()` method to get
// the current state of the cluster (you still need `cluster.mu.RLock()` to
// access the `cluster.nr` reference itself). Most of the changes in NodeRunner
// happen because of an external event (network problem, unexpected swarmkit
// error) and Docker shouldn't take any locks that delay these changes from
// happening.
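//
// For example, a read-only API responder following this convention has the
// shape sketched below (hypothetical method shown for illustration only; the
// getters later in this file, such as GetLocalAddress, are real instances of
// the same pattern):
//
//	func (c *Cluster) exampleReadOnlyResponder() string {
//		c.mu.RLock()
//		defer c.mu.RUnlock()
//		// Safe to read c.nr and c.currentNodeState() while the read lock is held.
//		return c.currentNodeState().actualLocalAddr
//	}
//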

import (
	"context"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/docker/docker/api/types/network"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/controllers/plugin"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	"github.com/docker/docker/pkg/signal"
	lncluster "github.com/docker/libnetwork/cluster"
	swarmapi "github.com/docker/swarmkit/api"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

const swarmDirName = "swarm"
const controlSocket = "control.sock"
const swarmConnectTimeout = 20 * time.Second
const swarmRequestTimeout = 20 * time.Second
const stateFile = "docker-state.json"
const defaultAddr = "0.0.0.0:2377"

const (
	initialReconnectDelay = 100 * time.Millisecond
	maxReconnectDelay     = 30 * time.Second
	contextPrefix         = "com.docker.swarm"
)

// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
	Subnets() ([]net.IPNet, []net.IPNet)
}

// Config provides values for Cluster.
type Config struct {
	Root                   string
	Name                   string
	Backend                executorpkg.Backend
	ImageBackend           executorpkg.ImageBackend
	PluginBackend          plugin.Backend
	VolumeBackend          executorpkg.VolumeBackend
	NetworkSubnetsProvider NetworkSubnetsProvider

	// DefaultAdvertiseAddr is the default host/IP or network interface to use
	// if no AdvertiseAddr value is specified.
	DefaultAdvertiseAddr string

	// path to store runtime state, such as the swarm control socket
	RuntimeRoot string

	// WatchStream is a channel to pass watch API notifications to the daemon
	WatchStream chan *swarmapi.WatchMessage

	// RaftHeartbeatTick is the number of ticks for the heartbeat of quorum members
	RaftHeartbeatTick uint32

	// RaftElectionTick is the number of ticks to elapse before followers propose a new round of leader election.
	// This value should be 10x that of RaftHeartbeatTick.
	RaftElectionTick uint32
}

// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
type Cluster struct {
	mu           sync.RWMutex
	controlMutex sync.RWMutex // protect init/join/leave user operations
	nr           *nodeRunner
	root         string
	runtimeRoot  string
	config       Config
	configEvent  chan lncluster.ConfigEventType // todo: make this array and goroutine safe
	attachers    map[string]*attacher
	watchStream  chan *swarmapi.WatchMessage
}

// attacher manages the in-memory attachment state of a container attachment
// to a global-scope network managed by the swarm manager. It helps in
// identifying the attachment ID via the taskID and the corresponding
// attachment configuration obtained from the manager.
type attacher struct {
	taskID           string
	config           *network.NetworkingConfig
	inProgress       bool
	attachWaitCh     chan *network.NetworkingConfig
	attachCompleteCh chan struct{}
	detachWaitCh     chan struct{}
}

// New creates a new Cluster instance using the provided config.
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	if config.RuntimeRoot == "" {
		config.RuntimeRoot = root
	}
	if config.RaftHeartbeatTick == 0 {
		config.RaftHeartbeatTick = 1
	}
	if config.RaftElectionTick == 0 {
		// 10X heartbeat tick is the recommended ratio according to etcd docs.
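		// With the default RaftHeartbeatTick of 1 set above, this yields an
		// election tick of 10, i.e. followers wait for several missed
		// heartbeats before proposing a new leader election.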
		config.RaftElectionTick = 10 * config.RaftHeartbeatTick
	}

	if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
		return nil, err
	}
	c := &Cluster{
		root:        root,
		config:      config,
		configEvent: make(chan lncluster.ConfigEventType, 10),
		runtimeRoot: config.RuntimeRoot,
		attachers:   make(map[string]*attacher),
		watchStream: config.WatchStream,
	}
	return c, nil
}

// Start the Cluster instance
// TODO The split between New and Start can be joined again when the SendClusterEvent
// method is no longer required
func (c *Cluster) Start() error {
	root := filepath.Join(c.config.Root, swarmDirName)

	nodeConfig, err := loadPersistentState(root)
	if err != nil {
		if os.IsNotExist(err) {
			return nil
		}
		return err
	}

	nr, err := c.newNodeRunner(*nodeConfig)
	if err != nil {
		return err
	}
	c.nr = nr

	select {
	case <-time.After(swarmConnectTimeout):
		logrus.Error("swarm component could not be started before timeout was reached")
	case err := <-nr.Ready():
		if err != nil {
			logrus.WithError(err).Error("swarm component could not be started")
			return nil
		}
	}
	return nil
}

func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, err
	}

	actualLocalAddr := conf.LocalAddr
	if actualLocalAddr == "" {
		// If localAddr was not specified, resolve it automatically
		// based on the route to joinAddr. localAddr can only be left
		// empty on "join".
		listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
		if err != nil {
			return nil, fmt.Errorf("could not parse listen address: %v", err)
		}

		listenAddrIP := net.ParseIP(listenHost)
		if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
			actualLocalAddr = listenHost
		} else {
			if conf.RemoteAddr == "" {
				// Should never happen except when using swarms created by
				// old versions that didn't save remoteAddr.
				conf.RemoteAddr = "8.8.8.8:53"
			}
			conn, err := net.Dial("udp", conf.RemoteAddr)
			if err != nil {
				return nil, fmt.Errorf("could not find local IP address: %v", err)
			}
			localHostPort := conn.LocalAddr().String()
			actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
			conn.Close()
		}
	}

	nr := &nodeRunner{cluster: c}
	nr.actualLocalAddr = actualLocalAddr

	if err := nr.Start(conf); err != nil {
		return nil, err
	}

	c.config.Backend.DaemonJoinsCluster(c)

	return nr, nil
}

func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on quorum lost
	return context.WithTimeout(context.Background(), swarmRequestTimeout)
}

// IsManager returns true if Cluster is participating as a manager.
func (c *Cluster) IsManager() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().IsActiveManager()
}

// IsAgent returns true if Cluster is participating as a worker/agent.
func (c *Cluster) IsAgent() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().status == types.LocalNodeStateActive
}

// GetLocalAddress returns the local address.
func (c *Cluster) GetLocalAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().actualLocalAddr
}

// GetListenAddress returns the listen address.
func (c *Cluster) GetListenAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil {
		return c.nr.config.ListenAddr
	}
	return ""
}

// GetAdvertiseAddress returns the remotely reachable address of this node.
func (c *Cluster) GetAdvertiseAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil && c.nr.config.AdvertiseAddr != "" {
		advertiseHost, _, _ := net.SplitHostPort(c.nr.config.AdvertiseAddr)
		return advertiseHost
	}
	return c.currentNodeState().actualLocalAddr
}

// GetDataPathAddress returns the address to be used for the data path traffic, if specified.
func (c *Cluster) GetDataPathAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil {
		return c.nr.config.DataPathAddr
	}
	return ""
}

// GetRemoteAddressList returns the advertise address for each of the remote managers if
// available.
func (c *Cluster) GetRemoteAddressList() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.getRemoteAddressList()
}

// GetWatchStream returns the channel to pass changes from the store watch API
func (c *Cluster) GetWatchStream() chan *swarmapi.WatchMessage {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.watchStream
}

func (c *Cluster) getRemoteAddressList() []string {
	state := c.currentNodeState()
	if state.swarmNode == nil {
		return []string{}
	}

	nodeID := state.swarmNode.NodeID()
	remotes := state.swarmNode.Remotes()
	addressList := make([]string, 0, len(remotes))
	for _, r := range remotes {
		if r.NodeID != nodeID {
			addressList = append(addressList, r.Addr)
		}
	}
	return addressList
}

// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan lncluster.ConfigEventType {
	return c.configEvent
}

// currentNodeState should not be called without a read lock
func (c *Cluster) currentNodeState() nodeState {
	return c.nr.State()
}

// errNoManager returns an error describing why manager commands can't be used.
// Call with read lock.
func (c *Cluster) errNoManager(st nodeState) error {
	if st.swarmNode == nil {
		if errors.Cause(st.err) == errSwarmLocked {
			return errSwarmLocked
		}
		if st.err == errSwarmCertificatesExpired {
			return errSwarmCertificatesExpired
		}
		return errors.WithStack(notAvailableError("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again."))
	}
	if st.swarmNode.Manager() != nil {
		return errors.WithStack(notAvailableError("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster."))
	}
	return errors.WithStack(notAvailableError("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager."))
}

// Cleanup stops the active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	node := c.nr
	if node == nil {
		c.mu.Unlock()
		return
	}
	state := c.currentNodeState()
	c.mu.Unlock()

	if state.IsActiveManager() {
		active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
		if err == nil {
			singlenode := active && isLastManager(reachable, unreachable)
			if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
				logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}

	if err := node.Stop(); err != nil {
		logrus.Errorf("failed to shut down cluster node: %v", err)
		signal.DumpStacks("")
	}

	c.mu.Lock()
	c.nr = nil
	c.mu.Unlock()
}

func managerStats(client swarmapi.ControlClient, currentNodeID string) (current bool, reachable int, unreachable int, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	nodes, err := client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
	if err != nil {
		return false, 0, 0, err
	}
	for _, n := range nodes.Nodes {
		if n.ManagerStatus != nil {
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
				reachable++
				if n.ID == currentNodeID {
					current = true
				}
			}
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
				unreachable++
			}
		}
	}
	return
}

func detectLockedError(err error) error {
	if err == swarmnode.ErrInvalidUnlockKey {
		return errors.WithStack(errSwarmLocked)
	}
	return err
}

func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeState) error) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	return fn(ctx, state)
}

// SendClusterEvent allows sending cluster events on the configEvent channel
// TODO This method should not be exposed.
// Currently it is used to notify the network controller that the keys are
// available
func (c *Cluster) SendClusterEvent(event lncluster.ConfigEventType) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	c.configEvent <- event
}
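
// Manager-only operations in this package are expected to go through
// lockedManagerAction above, which handles the read lock, the active-manager
// check, and the request-scoped context in one place. A minimal sketch of
// that pattern (hypothetical method, shown for illustration only):
//
//	func (c *Cluster) exampleManagerCommand() error {
//		return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
//			// Talk to the manager through the gRPC control client.
//			_, err := state.controlClient.ListNodes(ctx, &swarmapi.ListNodesRequest{})
//			return err
//		})
//	}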