package cluster // import "github.com/docker/docker/daemon/cluster"

//
// ## Swarmkit integration
//
// Cluster - static configurable object for accessing everything swarm-related.
// Contains methods for connecting to and controlling the cluster. Always
// exists, even if swarm mode is not enabled.
//
// NodeRunner - manager for starting the swarmkit node. Present if and only if
// swarm mode is enabled. Implements a backoff restart loop in case of errors.
//
// NodeState - information about the current node status, including access to
// gRPC clients if a manager is active.
//
// ### Locking
//
// `cluster.controlMutex` - taken for the whole lifecycle of the processes that
// can reconfigure the cluster (init/join/leave, etc.). Ensures that one
// reconfiguration action has fully completed before another can start.
//
// `cluster.mu` - taken when the actual changes in cluster configuration
// happen. Different from `controlMutex` because in some cases we need to
// access the current cluster state even while a long-running reconfiguration
// is going on. For example, the network stack may ask for the current cluster
// state in the middle of a shutdown. Any time the current cluster state is
// needed, take the read lock of `cluster.mu`. If you are writing an API
// responder that returns synchronously, hold `cluster.mu.RLock()` for the
// duration of the whole handler function. That ensures that the node will not
// be shut down until the handler has finished.
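//
// For example, a synchronous read-only accessor holds the read lock for its
// whole body, as `IsManager` in this file does:
//
//	func (c *Cluster) IsManager() bool {
//		c.mu.RLock()
//		defer c.mu.RUnlock()
//		return c.currentNodeState().IsActiveManager()
//	}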
//
// NodeRunner implements its own internal locks that should not be used outside
// of the struct. Instead, call the `nodeRunner.State()` method to get the
// current state of the cluster (you still need `cluster.mu.RLock()` to access
// the `cluster.nr` reference itself). Most of the changes in NodeRunner happen
// because of an external event (network problem, unexpected swarmkit error),
// and Docker shouldn't take any locks that delay these changes from happening.
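//
// Concretely, reading the state looks roughly like this (a minimal sketch;
// `cluster.mu.RLock()` must already be held to reach `c.nr`):
//
//	state := c.nr.State() // snapshot of the current node state
//	if state.IsActiveManager() {
//		// state.controlClient and other manager clients may be used here
//	}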
//

import (
	"context"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/docker/docker/api/types/network"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/controllers/plugin"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	"github.com/docker/docker/pkg/signal"
	lncluster "github.com/docker/libnetwork/cluster"
	swarmapi "github.com/docker/swarmkit/api"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

const swarmDirName = "swarm"
const controlSocket = "control.sock"
const swarmConnectTimeout = 20 * time.Second
const swarmRequestTimeout = 20 * time.Second
const stateFile = "docker-state.json"
const defaultAddr = "0.0.0.0:2377"

const (
	initialReconnectDelay = 100 * time.Millisecond
	maxReconnectDelay     = 30 * time.Second
	contextPrefix         = "com.docker.swarm"
)

// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
	Subnets() ([]net.IPNet, []net.IPNet)
}

// Config provides values for Cluster.
type Config struct {
	Root                   string
	Name                   string
	Backend                executorpkg.Backend
	ImageBackend           executorpkg.ImageBackend
	PluginBackend          plugin.Backend
	VolumeBackend          executorpkg.VolumeBackend
	NetworkSubnetsProvider NetworkSubnetsProvider

	// DefaultAdvertiseAddr is the default host/IP or network interface to use
	// if no AdvertiseAddr value is specified.
	DefaultAdvertiseAddr string

	// RuntimeRoot is the path to store runtime state, such as the swarm
	// control socket.
	RuntimeRoot string

	// WatchStream is a channel to pass watch API notifications to the daemon.
	WatchStream chan *swarmapi.WatchMessage

	// RaftHeartbeatTick is the number of ticks between heartbeats of quorum
	// members.
	RaftHeartbeatTick uint32

	// RaftElectionTick is the number of ticks that must elapse before
	// followers propose a new round of leader election. This value should be
	// 10x that of RaftHeartbeatTick.
	RaftElectionTick uint32
}

// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
type Cluster struct {
	mu           sync.RWMutex
	controlMutex sync.RWMutex // protects init/join/leave user operations
	nr           *nodeRunner
	root         string
	runtimeRoot  string
	config       Config
	configEvent  chan lncluster.ConfigEventType // todo: make this array and goroutine safe
	attachers    map[string]*attacher
	watchStream  chan *swarmapi.WatchMessage
}

// attacher manages the in-memory attachment state of a container attached
// to a global-scope network managed by the swarm manager. It helps in
// identifying the attachment ID via the taskID and the corresponding
// attachment configuration obtained from the manager.
type attacher struct {
	taskID           string
	config           *network.NetworkingConfig
	inProgress       bool
	attachWaitCh     chan *network.NetworkingConfig
	attachCompleteCh chan struct{}
	detachWaitCh     chan struct{}
}

// New creates a new Cluster instance using the provided config.
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	if config.RuntimeRoot == "" {
		config.RuntimeRoot = root
	}
	if config.RaftHeartbeatTick == 0 {
		config.RaftHeartbeatTick = 1
	}
	if config.RaftElectionTick == 0 {
		// 10X heartbeat tick is the recommended ratio according to etcd docs.
		config.RaftElectionTick = 10 * config.RaftHeartbeatTick
	}

	if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
		return nil, err
	}
	c := &Cluster{
		root:        root,
		config:      config,
		configEvent: make(chan lncluster.ConfigEventType, 10),
		runtimeRoot: config.RuntimeRoot,
		attachers:   make(map[string]*attacher),
		watchStream: config.WatchStream,
	}
	return c, nil
}
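
// A minimal sketch of the expected call sequence from the daemon side (the
// real daemon wires in concrete backends; the Config literal below is
// illustrative only):
//
//	c, err := cluster.New(cluster.Config{Root: "/var/lib/docker"})
//	if err != nil {
//		return err
//	}
//	if err := c.Start(); err != nil {
//		return err
//	}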

// Start starts the Cluster instance.
// TODO The split between New and Start can be joined again once the
// SendClusterEvent method is no longer required.
func (c *Cluster) Start() error {
	root := filepath.Join(c.config.Root, swarmDirName)

	nodeConfig, err := loadPersistentState(root)
	if err != nil {
		if os.IsNotExist(err) {
			return nil
		}
		return err
	}

	nr, err := c.newNodeRunner(*nodeConfig)
	if err != nil {
		return err
	}
	c.nr = nr

	select {
	case <-time.After(swarmConnectTimeout):
		logrus.Error("swarm component could not be started before timeout was reached")
	case err := <-nr.Ready():
		if err != nil {
			logrus.WithError(err).Error("swarm component could not be started")
			return nil
		}
	}
	return nil
}

func (c *Cluster) newNodeRunner(conf nodeStartConfig) (*nodeRunner, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, err
	}

	actualLocalAddr := conf.LocalAddr
	if actualLocalAddr == "" {
		// If localAddr was not specified, resolve it automatically
		// based on the route to joinAddr. localAddr can only be left
		// empty on "join".
		listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
		if err != nil {
			return nil, fmt.Errorf("could not parse listen address: %v", err)
		}

		listenAddrIP := net.ParseIP(listenHost)
		if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
			actualLocalAddr = listenHost
		} else {
			if conf.RemoteAddr == "" {
				// Should never happen except using swarms created by
				// old versions that didn't save remoteAddr.
				conf.RemoteAddr = "8.8.8.8:53"
			}
			conn, err := net.Dial("udp", conf.RemoteAddr)
			if err != nil {
				return nil, fmt.Errorf("could not find local IP address: %v", err)
			}
			localHostPort := conn.LocalAddr().String()
			actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
			conn.Close()
		}
	}

	nr := &nodeRunner{cluster: c}
	nr.actualLocalAddr = actualLocalAddr

	if err := nr.Start(conf); err != nil {
		return nil, err
	}

	c.config.Backend.DaemonJoinsCluster(c)

	return nr, nil
}

func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on quorum lost
	return context.WithTimeout(context.Background(), swarmRequestTimeout)
}

// IsManager returns true if Cluster is participating as a manager.
func (c *Cluster) IsManager() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().IsActiveManager()
}

// IsAgent returns true if Cluster is participating as a worker/agent.
func (c *Cluster) IsAgent() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().status == types.LocalNodeStateActive
}

// GetLocalAddress returns the local address.
func (c *Cluster) GetLocalAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.currentNodeState().actualLocalAddr
}

// GetListenAddress returns the listen address.
func (c *Cluster) GetListenAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil {
		return c.nr.config.ListenAddr
	}
	return ""
}

// GetAdvertiseAddress returns the remotely reachable address of this node.
func (c *Cluster) GetAdvertiseAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil && c.nr.config.AdvertiseAddr != "" {
		advertiseHost, _, _ := net.SplitHostPort(c.nr.config.AdvertiseAddr)
		return advertiseHost
	}
	return c.currentNodeState().actualLocalAddr
}

// GetDataPathAddress returns the address to be used for the data path traffic, if specified.
func (c *Cluster) GetDataPathAddress() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.nr != nil {
		return c.nr.config.DataPathAddr
	}
	return ""
}

// GetRemoteAddressList returns the advertise address for each of the remote managers if
// available.
func (c *Cluster) GetRemoteAddressList() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.getRemoteAddressList()
}

// GetWatchStream returns the channel used to pass changes from the store watch API.
func (c *Cluster) GetWatchStream() chan *swarmapi.WatchMessage {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.watchStream
}

func (c *Cluster) getRemoteAddressList() []string {
	state := c.currentNodeState()
	if state.swarmNode == nil {
		return []string{}
	}

	nodeID := state.swarmNode.NodeID()
	remotes := state.swarmNode.Remotes()
	addressList := make([]string, 0, len(remotes))
	for _, r := range remotes {
		if r.NodeID != nodeID {
			addressList = append(addressList, r.Addr)
		}
	}
	return addressList
}

// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan lncluster.ConfigEventType {
	return c.configEvent
}

// currentNodeState should not be called without holding the read lock on c.mu.
func (c *Cluster) currentNodeState() nodeState {
	return c.nr.State()
}

// errNoManager returns an error describing why manager commands can't be used.
// Call with the read lock held.
func (c *Cluster) errNoManager(st nodeState) error {
	if st.swarmNode == nil {
		if errors.Cause(st.err) == errSwarmLocked {
			return errSwarmLocked
		}
		if st.err == errSwarmCertificatesExpired {
			return errSwarmCertificatesExpired
		}
		return errors.WithStack(notAvailableError("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again."))
	}
	if st.swarmNode.Manager() != nil {
		return errors.WithStack(notAvailableError("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster."))
	}
	return errors.WithStack(notAvailableError("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager."))
}

// Cleanup stops the active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	node := c.nr
	if node == nil {
		c.mu.Unlock()
		return
	}
	state := c.currentNodeState()
	c.mu.Unlock()

	if state.IsActiveManager() {
		active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
		if err == nil {
			singlenode := active && isLastManager(reachable, unreachable)
			if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
				logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}

	if err := node.Stop(); err != nil {
		logrus.Errorf("failed to shut down cluster node: %v", err)
		signal.DumpStacks("")
	}

	c.mu.Lock()
	c.nr = nil
	c.mu.Unlock()
}

func managerStats(client swarmapi.ControlClient, currentNodeID string) (current bool, reachable int, unreachable int, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	nodes, err := client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
	if err != nil {
		return false, 0, 0, err
	}
	for _, n := range nodes.Nodes {
		if n.ManagerStatus != nil {
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
				reachable++
				if n.ID == currentNodeID {
					current = true
				}
			}
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
				unreachable++
			}
		}
	}
	return
}

func detectLockedError(err error) error {
	if err == swarmnode.ErrInvalidUnlockKey {
		return errors.WithStack(errSwarmLocked)
	}
	return err
}

func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeState) error) error {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return c.errNoManager(state)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	return fn(ctx, state)
}
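
// Manager-only commands in this package typically wrap their control-API call
// in lockedManagerAction, roughly like this (a minimal sketch; the ListNodes
// call is only illustrative):
//
//	err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
//		_, err := state.controlClient.ListNodes(ctx, &swarmapi.ListNodesRequest{})
//		return err
//	})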

// SendClusterEvent allows sending cluster events on the configEvent channel.
// TODO This method should not be exposed.
// Currently it is used to notify the network controller that the keys are
// available.
func (c *Cluster) SendClusterEvent(event lncluster.ConfigEventType) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	c.configEvent <- event
}
