package cluster // import "github.com/docker/docker/daemon/cluster"

import (
	"context"
	"fmt"
	"net"
	"strings"
	"time"

	apitypes "github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/filters"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/convert"
	"github.com/docker/docker/errdefs"
	"github.com/docker/docker/opts"
	"github.com/docker/docker/pkg/signal"
	swarmapi "github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/encryption"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc"
)

// Init initializes a new cluster from the user-provided request.
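//
// A minimal usage sketch; the addresses shown are illustrative, not defaults:
//
//	id, err := c.Init(types.InitRequest{
//		ListenAddr:    "0.0.0.0:2377",
//		AdvertiseAddr: "192.168.1.10:2377",
//	})
//	if err != nil {
//		// handle the error
//	}
//	_ = id // ID of this node in the newly created swarm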
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	if c.nr != nil {
		if req.ForceNewCluster {

			// Take c.mu temporarily to wait for presently running
			// API handlers to finish before shutting down the node.
			c.mu.Lock()
			if !c.nr.nodeState.IsManager() {
				c.mu.Unlock()
				return "", errSwarmNotManager
			}
			c.mu.Unlock()

			if err := c.nr.Stop(); err != nil {
				return "", err
			}
		} else {
			return "", errSwarmExists
		}
	}

	if err := validateAndSanitizeInitRequest(&req); err != nil {
		return "", errdefs.InvalidParameter(err)
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return "", err
	}

	advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
	if err != nil {
		return "", err
	}

	dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
	if err != nil {
		return "", err
	}

	localAddr := listenHost

	// If the local address is undetermined, the advertise address is
	// used as the local address, provided it belongs to this system.
	// If the advertise address is not local, we try to find a system
	// address to use as the local address. If that fails, we give up
	// and ask the user to pass the listen address explicitly.
	if net.ParseIP(localAddr).IsUnspecified() {
		advertiseIP := net.ParseIP(advertiseHost)

		found := false
		for _, systemIP := range listSystemIPs() {
			if systemIP.Equal(advertiseIP) {
				localAddr = advertiseIP.String()
				found = true
				break
			}
		}

		if !found {
			ip, err := c.resolveSystemAddr()
			if err != nil {
				logrus.Warnf("Could not find a local address: %v", err)
				return "", errMustSpecifyListenAddr
			}
			localAddr = ip.String()
		}
	}

	// Validate Default Address Pool input
	if err := validateDefaultAddrPool(req.DefaultAddrPool, req.SubnetSize); err != nil {
		return "", err
	}

	port, err := getDataPathPort(req.DataPathPort)
	if err != nil {
		return "", err
	}

	nr, err := c.newNodeRunner(nodeStartConfig{
		forceNewCluster:    req.ForceNewCluster,
		autolock:           req.AutoLockManagers,
		LocalAddr:          localAddr,
		ListenAddr:         net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr:      net.JoinHostPort(advertiseHost, advertisePort),
		DataPathAddr:       dataPathAddr,
		DefaultAddressPool: req.DefaultAddrPool,
		SubnetSize:         req.SubnetSize,
		availability:       req.Availability,
		DataPathPort:       port,
	})
	if err != nil {
		return "", err
	}
	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	if err := <-nr.Ready(); err != nil {
		c.mu.Lock()
		c.nr = nil
		c.mu.Unlock()
		if !req.ForceNewCluster { // if failure on first attempt don't keep state
			if err := clearPersistentState(c.root); err != nil {
				return "", err
			}
		}
		return "", err
	}
	state := nr.State()
	if state.swarmNode == nil { // should never happen but protect from panic
		return "", errors.New("invalid cluster state for spec initialization")
	}
	if err := initClusterSpec(state.swarmNode, req.Spec); err != nil {
		return "", err
	}
	return state.NodeID(), nil
}

// Join makes the current Cluster part of an existing swarm cluster.
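//
// A minimal usage sketch; the addresses and token are illustrative:
//
//	err := c.Join(types.JoinRequest{
//		RemoteAddrs: []string{"192.168.1.10:2377"},
//		JoinToken:   "SWMTKN-1-...",
//		ListenAddr:  "0.0.0.0:2377",
//	})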
func (c *Cluster) Join(req types.JoinRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	c.mu.Lock()
	if c.nr != nil {
		c.mu.Unlock()
		return errors.WithStack(errSwarmExists)
	}
	c.mu.Unlock()

	if err := validateAndSanitizeJoinRequest(&req); err != nil {
		return errdefs.InvalidParameter(err)
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return err
	}

	var advertiseAddr string
	if req.AdvertiseAddr != "" {
		advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
		// For joining, we don't need to provide an advertise address,
		// since the remote side can detect it.
		if err == nil {
			advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
		}
	}

	dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
	if err != nil {
		return err
	}

	nr, err := c.newNodeRunner(nodeStartConfig{
		RemoteAddr:    req.RemoteAddrs[0],
		ListenAddr:    net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr: advertiseAddr,
		DataPathAddr:  dataPathAddr,
		joinAddr:      req.RemoteAddrs[0],
		joinToken:     req.JoinToken,
		availability:  req.Availability,
	})
	if err != nil {
		return err
	}

	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	select {
	case <-time.After(swarmConnectTimeout):
		return errSwarmJoinTimeoutReached
	case err := <-nr.Ready():
		if err != nil {
			c.mu.Lock()
			c.nr = nil
			c.mu.Unlock()
			if err := clearPersistentState(c.root); err != nil {
				return err
			}
		}
		return err
	}
}

// Inspect retrieves the configuration properties of a managed swarm cluster.
func (c *Cluster) Inspect() (types.Swarm, error) {
	var swarm types.Swarm
	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		s, err := c.inspect(ctx, state)
		if err != nil {
			return err
		}
		swarm = s
		return nil
	}); err != nil {
		return types.Swarm{}, err
	}
	return swarm, nil
}

func (c *Cluster) inspect(ctx context.Context, state nodeState) (types.Swarm, error) {
	s, err := getSwarm(ctx, state.controlClient)
	if err != nil {
		return types.Swarm{}, err
	}
	return convert.SwarmFromGRPC(*s), nil
}

// Update updates the configuration of a managed swarm cluster.
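//
// Callers are expected to send back the complete spec together with the
// version reported by Inspect, for example (illustrative only):
//
//	sw, err := c.Inspect()
//	if err != nil {
//		// handle the error
//	}
//	sw.Spec.Raft.SnapshotInterval = 5000
//	err = c.Update(sw.Version.Index, sw.Spec, types.UpdateFlags{})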
func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
	return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		swarm, err := getSwarm(ctx, state.controlClient)
		if err != nil {
			return err
		}

		// Validate spec name.
		if spec.Annotations.Name == "" {
			spec.Annotations.Name = "default"
		} else if spec.Annotations.Name != "default" {
			return errdefs.InvalidParameter(errors.New(`swarm spec must be named "default"`))
		}

		// In an update, the client should provide the complete spec of the swarm,
		// including Name and Labels. If a field is set to 0 or nil, swarmkit
		// applies its default value.
		clusterSpec, err := convert.SwarmSpecToGRPC(spec)
		if err != nil {
			return errdefs.InvalidParameter(err)
		}

		_, err = state.controlClient.UpdateCluster(
			ctx,
			&swarmapi.UpdateClusterRequest{
				ClusterID: swarm.ID,
				Spec:      &clusterSpec,
				ClusterVersion: &swarmapi.Version{
					Index: version,
				},
				Rotation: swarmapi.KeyRotation{
					WorkerJoinToken:  flags.RotateWorkerToken,
					ManagerJoinToken: flags.RotateManagerToken,
					ManagerUnlockKey: flags.RotateManagerUnlockKey,
				},
			},
		)
		return err
	})
}

// GetUnlockKey returns the unlock key for the swarm.
func (c *Cluster) GetUnlockKey() (string, error) {
	var resp *swarmapi.GetUnlockKeyResponse
	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		client := swarmapi.NewCAClient(state.grpcConn)

		r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
		if err != nil {
			return err
		}
		resp = r
		return nil
	}); err != nil {
		return "", err
	}
	if len(resp.UnlockKey) == 0 {
		// no key
		return "", nil
	}
	return encryption.HumanReadableKey(resp.UnlockKey), nil
}

// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
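//
// The key is the human-readable unlock key, as returned by GetUnlockKey; the
// value shown below is illustrative:
//
//	err := c.UnlockSwarm(types.UnlockRequest{UnlockKey: "SWMKEY-1-..."})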
func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.RLock()
	state := c.currentNodeState()

	if !state.IsActiveManager() {
		// When the manager is not active, return the error unless it
		// indicates that the swarm is locked.
		if err := c.errNoManager(state); err != errSwarmLocked {
			c.mu.RUnlock()
			return err
		}
	} else {
		// When the manager is active, the swarm is not locked; return notLockedError.
		c.mu.RUnlock()
		return notLockedError{}
	}

	// We only reach this point if the swarm is locked.
	nr := c.nr
	c.mu.RUnlock()

	key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
	if err != nil {
		return errdefs.InvalidParameter(err)
	}

	config := nr.config
	config.lockKey = key
	if err := nr.Stop(); err != nil {
		return err
	}
	nr, err = c.newNodeRunner(config)
	if err != nil {
		return err
	}

	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	if err := <-nr.Ready(); err != nil {
		if errors.Cause(err) == errSwarmLocked {
			return invalidUnlockKey{}
		}
		return errors.Errorf("swarm component could not be started: %v", err)
	}
	return nil
}

// Leave shuts down the Cluster and removes its current state.
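// For manager nodes, and for nodes whose swarm state is locked, force must be
// true; otherwise an error describing the consequences of leaving is returned.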
func (c *Cluster) Leave(force bool) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	nr := c.nr
	if nr == nil {
		c.mu.Unlock()
		return errors.WithStack(errNoSwarm)
	}

	state := c.currentNodeState()

	c.mu.Unlock()

	if errors.Cause(state.err) == errSwarmLocked && !force {
		// leaving a locked swarm without --force is not allowed
		return errors.WithStack(notAvailableError("Swarm is encrypted and locked. Please unlock it first or use `--force` to ignore this message."))
	}

	if state.IsManager() && !force {
		msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
		if state.IsActiveManager() {
			active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
			if err == nil {
				if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
					if isLastManager(reachable, unreachable) {
						msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
						return errors.WithStack(notAvailableError(msg))
					}
					msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
				}
			}
		} else {
			msg += "Doing so may lose the consensus of your cluster. "
		}

		msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
		return errors.WithStack(notAvailableError(msg))
	}
	// release readers in here
	if err := nr.Stop(); err != nil {
		logrus.Errorf("failed to shut down cluster node: %v", err)
		signal.DumpStacks("")
		return err
	}

	c.mu.Lock()
	c.nr = nil
	c.mu.Unlock()

	if nodeID := state.NodeID(); nodeID != "" {
		nodeContainers, err := c.listContainerForNode(nodeID)
		if err != nil {
			return err
		}
		for _, id := range nodeContainers {
			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
				logrus.Errorf("error removing %v: %v", id, err)
			}
		}
	}

	// todo: cleanup optional?
	if err := clearPersistentState(c.root); err != nil {
		return err
	}
	c.config.Backend.DaemonLeavesCluster()
	return nil
}

// Info returns information about the current cluster state.
func (c *Cluster) Info() types.Info {
	info := types.Info{
		NodeAddr: c.GetAdvertiseAddress(),
	}
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	info.LocalNodeState = state.status
	if state.err != nil {
		info.Error = state.err.Error()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	if state.IsActiveManager() {
		info.ControlAvailable = true
		swarm, err := c.inspect(ctx, state)
		if err != nil {
			info.Error = err.Error()
		}

		info.Cluster = &swarm.ClusterInfo

		if r, err := state.controlClient.ListNodes(
			ctx, &swarmapi.ListNodesRequest{},
			grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
		); err != nil {
			info.Error = err.Error()
		} else {
			info.Nodes = len(r.Nodes)
			for _, n := range r.Nodes {
				if n.ManagerStatus != nil {
					info.Managers = info.Managers + 1
				}
			}
		}

		switch info.LocalNodeState {
		case types.LocalNodeStateInactive, types.LocalNodeStateLocked, types.LocalNodeStateError:
			// nothing to do
		default:
			if info.Managers == 2 {
				const warn string = `WARNING: Running Swarm in a two-manager configuration. This configuration provides
         no fault tolerance, and poses a high risk to lose control over the cluster.
         Refer to https://docs.docker.com/engine/swarm/admin_guide/ to configure the
         Swarm for fault-tolerance.`

				info.Warnings = append(info.Warnings, warn)
			}
		}
	}

	if state.swarmNode != nil {
		for _, r := range state.swarmNode.Remotes() {
			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
		}
		info.NodeID = state.swarmNode.NodeID()
	}

	return info
}

func validateAndSanitizeInitRequest(req *types.InitRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}

	if req.Spec.Annotations.Name == "" {
		req.Spec.Annotations.Name = "default"
	} else if req.Spec.Annotations.Name != "default" {
		return errors.New(`swarm spec must be named "default"`)
	}

	return nil
}

func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}
	if len(req.RemoteAddrs) == 0 {
		return errors.New("at least 1 RemoteAddr is required to join")
	}
	for i := range req.RemoteAddrs {
		req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
		if err != nil {
			return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
		}
	}
	return nil
}

func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, errors.New("invalid empty address")
	}
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
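		// If the address cannot be parsed as a TCP address, fall back to
		// returning it unchanged rather than reporting an error.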
		return addr, nil
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}

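// initClusterSpec waits for the control socket of the freshly started node,
// merges the user-provided spec into the cluster object created by swarmkit,
// and submits the result via UpdateCluster.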
func initClusterSpec(node *swarmnode.Node, spec types.Spec) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for conn := range node.ListenControlSocket(ctx) {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if conn != nil {
			client := swarmapi.NewControlClient(conn)
			var cluster *swarmapi.Cluster
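			// The cluster object may not exist immediately after the node
			// starts, so poll for it (up to 10 attempts, 200ms apart)
			// before giving up.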
			for i := 0; ; i++ {
				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
				if err != nil {
					return fmt.Errorf("error on listing clusters: %v", err)
				}
				if len(lcr.Clusters) == 0 {
					if i < 10 {
						time.Sleep(200 * time.Millisecond)
						continue
					}
					return errors.New("empty list of clusters was returned")
				}
				cluster = lcr.Clusters[0]
				break
			}
			// In init, we take the initial default values from swarmkit and merge
			// any non-nil, non-zero value from spec into the GRPC spec, leaving the
			// defaults in place for everything else.
			// Note that this is different from Update(), where we expect the user to
			// specify the complete spec of the cluster (since they already know the
			// existing one and which fields to update).
			clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
				ClusterID:      cluster.ID,
				ClusterVersion: &cluster.Meta.Version,
				Spec:           &clusterSpec,
			})
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			return nil
		}
	}
	return ctx.Err()
}

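// listContainerForNode returns the IDs of all local containers labeled as
// belonging to the given swarm node.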
func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
	var ids []string
	filterArgs := filters.NewArgs()
	filterArgs.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
	containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
		Filters: filterArgs,
	})
	if err != nil {
		return []string{}, err
	}
	for _, c := range containers {
		ids = append(ids, c.ID)
	}
	return ids, nil
}