package cluster // import "github.com/docker/docker/daemon/cluster"

import (
	"context"
	"fmt"
	"net"
	"strings"
	"time"

	apitypes "github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/filters"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/convert"
	"github.com/docker/docker/errdefs"
	"github.com/docker/docker/opts"
	"github.com/docker/docker/pkg/signal"
	swarmapi "github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/encryption"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

// Init initializes a new cluster from a user-provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	if c.nr != nil {
		if req.ForceNewCluster {

			// Take c.mu temporarily to wait for presently running
			// API handlers to finish before shutting down the node.
			c.mu.Lock()
			if !c.nr.nodeState.IsManager() {
				c.mu.Unlock()
				return "", errSwarmNotManager
			}
			c.mu.Unlock()

			if err := c.nr.Stop(); err != nil {
				return "", err
			}
		} else {
			return "", errSwarmExists
		}
	}

	if err := validateAndSanitizeInitRequest(&req); err != nil {
		return "", errdefs.InvalidParameter(err)
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return "", err
	}

	advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
	if err != nil {
		return "", err
	}

	dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
	if err != nil {
		return "", err
	}

	localAddr := listenHost

	// If the local address is undetermined, the advertise address
	// will be used as local address, if it belongs to this system.
	// If the advertise address is not local, then we try to find
	// a system address to use as local address. If this fails,
	// we give up and ask the user to pass the listen address.
	if net.ParseIP(localAddr).IsUnspecified() {
		advertiseIP := net.ParseIP(advertiseHost)

		found := false
		for _, systemIP := range listSystemIPs() {
			if systemIP.Equal(advertiseIP) {
				localAddr = advertiseIP.String()
				found = true
				break
			}
		}

		if !found {
			ip, err := c.resolveSystemAddr()
			if err != nil {
				logrus.Warnf("Could not find a local address: %v", err)
				return "", errMustSpecifyListenAddr
			}
			localAddr = ip.String()
		}
	}

	nr, err := c.newNodeRunner(nodeStartConfig{
		forceNewCluster:    req.ForceNewCluster,
		autolock:           req.AutoLockManagers,
		LocalAddr:          localAddr,
		ListenAddr:         net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr:      net.JoinHostPort(advertiseHost, advertisePort),
		DataPathAddr:       dataPathAddr,
		DefaultAddressPool: req.DefaultAddrPool,
		SubnetSize:         req.SubnetSize,
		availability:       req.Availability,
	})
	if err != nil {
		return "", err
	}
	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

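	// Wait for the node to come up before initializing the cluster spec; if
	// a fresh init fails, wipe the partial state so a retry starts clean.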
	if err := <-nr.Ready(); err != nil {
		c.mu.Lock()
		c.nr = nil
		c.mu.Unlock()
		if !req.ForceNewCluster { // if failure on first attempt don't keep state
			if err := clearPersistentState(c.root); err != nil {
				return "", err
			}
		}
		return "", err
	}
	state := nr.State()
	if state.swarmNode == nil { // should never happen but protect from panic
		return "", errors.New("invalid cluster state for spec initialization")
	}
	if err := initClusterSpec(state.swarmNode, req.Spec); err != nil {
		return "", err
	}
	return state.NodeID(), nil
}

// Join makes the current Cluster part of an existing swarm cluster.
func (c *Cluster) Join(req types.JoinRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	c.mu.Lock()
	if c.nr != nil {
		c.mu.Unlock()
		return errors.WithStack(errSwarmExists)
	}
	c.mu.Unlock()

	if err := validateAndSanitizeJoinRequest(&req); err != nil {
		return errdefs.InvalidParameter(err)
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return err
	}

	var advertiseAddr string
	if req.AdvertiseAddr != "" {
		advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
		// For joining, we don't need to provide an advertise address,
		// since the remote side can detect it.
		if err == nil {
			advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
		}
	}

	dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
	if err != nil {
		return err
	}

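	// The first remote address serves both as the initial contact point
	// (RemoteAddr) and as the address to join through (joinAddr).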
	nr, err := c.newNodeRunner(nodeStartConfig{
		RemoteAddr:    req.RemoteAddrs[0],
		ListenAddr:    net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr: advertiseAddr,
		DataPathAddr:  dataPathAddr,
		joinAddr:      req.RemoteAddrs[0],
		joinToken:     req.JoinToken,
		availability:  req.Availability,
	})
	if err != nil {
		return err
	}

	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

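	// Wait for the node to become ready, giving up after swarmConnectTimeout.
	// On failure, clear the persisted state so a subsequent join starts fresh.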
	select {
	case <-time.After(swarmConnectTimeout):
		return errSwarmJoinTimeoutReached
	case err := <-nr.Ready():
		if err != nil {
			c.mu.Lock()
			c.nr = nil
			c.mu.Unlock()
			if err := clearPersistentState(c.root); err != nil {
				return err
			}
		}
		return err
	}
}

// Inspect retrieves the configuration properties of a managed swarm cluster.
func (c *Cluster) Inspect() (types.Swarm, error) {
	var swarm types.Swarm
	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		s, err := c.inspect(ctx, state)
		if err != nil {
			return err
		}
		swarm = s
		return nil
	}); err != nil {
		return types.Swarm{}, err
	}
	return swarm, nil
}

func (c *Cluster) inspect(ctx context.Context, state nodeState) (types.Swarm, error) {
	s, err := getSwarm(ctx, state.controlClient)
	if err != nil {
		return types.Swarm{}, err
	}
	return convert.SwarmFromGRPC(*s), nil
}

// Update updates the configuration of a managed swarm cluster.
func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
	return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		swarm, err := getSwarm(ctx, state.controlClient)
		if err != nil {
			return err
		}

		// Validate spec name.
		if spec.Annotations.Name == "" {
			spec.Annotations.Name = "default"
		} else if spec.Annotations.Name != "default" {
			return errdefs.InvalidParameter(errors.New(`swarm spec must be named "default"`))
		}

		// In Update, the client is expected to provide the complete spec of
		// the swarm, including Name and Labels. For any field left as 0 or
		// nil, swarmkit falls back to its default value.
		clusterSpec, err := convert.SwarmSpecToGRPC(spec)
		if err != nil {
			return errdefs.InvalidParameter(err)
		}

		_, err = state.controlClient.UpdateCluster(
			ctx,
			&swarmapi.UpdateClusterRequest{
				ClusterID: swarm.ID,
				Spec:      &clusterSpec,
				ClusterVersion: &swarmapi.Version{
					Index: version,
				},
				Rotation: swarmapi.KeyRotation{
					WorkerJoinToken:  flags.RotateWorkerToken,
					ManagerJoinToken: flags.RotateManagerToken,
					ManagerUnlockKey: flags.RotateManagerUnlockKey,
				},
			},
		)
		return err
	})
}

// GetUnlockKey returns the unlock key for the swarm.
func (c *Cluster) GetUnlockKey() (string, error) {
	var resp *swarmapi.GetUnlockKeyResponse
	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		client := swarmapi.NewCAClient(state.grpcConn)

		r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
		if err != nil {
			return err
		}
		resp = r
		return nil
	}); err != nil {
		return "", err
	}
	if len(resp.UnlockKey) == 0 {
		// no key
		return "", nil
	}
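	// Render the raw key in its human-readable form (a "SWMKEY-"-prefixed
	// base64 string), the format users pass back to unlock the swarm.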
	return encryption.HumanReadableKey(resp.UnlockKey), nil
}

// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.RLock()
	state := c.currentNodeState()

	if !state.IsActiveManager() {
		// The manager is not active: propagate the error unless the cause
		// is a locked swarm, which is the one state unlocking can fix.
		if err := c.errNoManager(state); err != errSwarmLocked {
			c.mu.RUnlock()
			return err
		}
	} else {
		// The manager is active, so the swarm cannot be locked.
		c.mu.RUnlock()
		return notLockedError{}
	}

	// Execution only reaches this point when the swarm is locked.
	nr := c.nr
	c.mu.RUnlock()

	key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
	if err != nil {
		return errdefs.InvalidParameter(err)
	}

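	// Restart the node runner with the same configuration plus the parsed
	// lock key, so swarmkit can decrypt its state on startup.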
	config := nr.config
	config.lockKey = key
	if err := nr.Stop(); err != nil {
		return err
	}
	nr, err = c.newNodeRunner(config)
	if err != nil {
		return err
	}

	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	if err := <-nr.Ready(); err != nil {
		if errors.Cause(err) == errSwarmLocked {
			return invalidUnlockKey{}
		}
		return errors.Errorf("swarm component could not be started: %v", err)
	}
	return nil
}

// Leave shuts down the Cluster and removes its current state.
func (c *Cluster) Leave(force bool) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	nr := c.nr
	if nr == nil {
		c.mu.Unlock()
		return errors.WithStack(errNoSwarm)
	}

	state := c.currentNodeState()

	c.mu.Unlock()

	if errors.Cause(state.err) == errSwarmLocked && !force {
		// leaving a locked swarm without --force is not allowed
		return errors.WithStack(notAvailableError("Swarm is encrypted and locked. Please unlock it first or use `--force` to ignore this message."))
	}

	if state.IsManager() && !force {
		msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
		if state.IsActiveManager() {
			active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
			if err == nil {
				if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
					if isLastManager(reachable, unreachable) {
						msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
						return errors.WithStack(notAvailableError(msg))
					}
					msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
				}
			}
		} else {
			msg += "Doing so may lose the consensus of your cluster. "
		}

		msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
		return errors.WithStack(notAvailableError(msg))
	}
	// release readers in here
	if err := nr.Stop(); err != nil {
		logrus.Errorf("failed to shut down cluster node: %v", err)
		signal.DumpStacks("")
		return err
	}

	c.mu.Lock()
	c.nr = nil
	c.mu.Unlock()

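	// Force-remove any containers that swarmkit scheduled onto this node;
	// they are identified by the com.docker.swarm.node.id label.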
	if nodeID := state.NodeID(); nodeID != "" {
		nodeContainers, err := c.listContainerForNode(nodeID)
		if err != nil {
			return err
		}
		for _, id := range nodeContainers {
			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
				logrus.Errorf("error removing %v: %v", id, err)
			}
		}
	}

	// todo: cleanup optional?
	if err := clearPersistentState(c.root); err != nil {
		return err
	}
	c.config.Backend.DaemonLeavesCluster()
	return nil
}

// Info returns information about the current cluster state.
func (c *Cluster) Info() types.Info {
	info := types.Info{
		NodeAddr: c.GetAdvertiseAddress(),
	}
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	info.LocalNodeState = state.status
	if state.err != nil {
		info.Error = state.err.Error()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

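	// Cluster-wide details (cluster info, node and manager counts) are only
	// available when this node is an active manager with a control client.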
	if state.IsActiveManager() {
		info.ControlAvailable = true
		swarm, err := c.inspect(ctx, state)
		if err != nil {
			info.Error = err.Error()
		}

		info.Cluster = &swarm.ClusterInfo

		if r, err := state.controlClient.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err != nil {
			info.Error = err.Error()
		} else {
			info.Nodes = len(r.Nodes)
			for _, n := range r.Nodes {
				if n.ManagerStatus != nil {
					info.Managers++
				}
			}
		}
	}

	if state.swarmNode != nil {
		for _, r := range state.swarmNode.Remotes() {
			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
		}
		info.NodeID = state.swarmNode.NodeID()
	}

	return info
}

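// validateAndSanitizeInitRequest checks the listen address of an init request
// and ensures the swarm spec is named "default", the only supported name.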
func validateAndSanitizeInitRequest(req *types.InitRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}

	if req.Spec.Annotations.Name == "" {
		req.Spec.Annotations.Name = "default"
	} else if req.Spec.Annotations.Name != "default" {
		return errors.New(`swarm spec must be named "default"`)
	}

	return nil
}

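// validateAndSanitizeJoinRequest checks the listen and remote addresses of a
// join request, normalizing each address in place.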
func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}
	if len(req.RemoteAddrs) == 0 {
		return errors.New("at least 1 RemoteAddr is required to join")
	}
	for i := range req.RemoteAddrs {
		req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
		if err != nil {
			return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
		}
	}
	return nil
}

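// validateAddr normalizes addr to host:port form, applying defaults for any
// missing parts, and strips the "tcp://" scheme prefix that
// opts.ParseTCPAddr adds.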
func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, errors.New("invalid empty address")
	}
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
		return addr, err
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}

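// initClusterSpec applies the user-provided spec to a freshly created
// cluster. It waits for the node's control socket, then merges the spec into
// the default cluster object created by swarmkit, retrying ListClusters
// briefly because the cluster object may not be visible immediately after init.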
func initClusterSpec(node *swarmnode.Node, spec types.Spec) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for conn := range node.ListenControlSocket(ctx) {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if conn != nil {
			client := swarmapi.NewControlClient(conn)
			var cluster *swarmapi.Cluster
			for i := 0; ; i++ {
				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
				if err != nil {
					return fmt.Errorf("error on listing clusters: %v", err)
				}
				if len(lcr.Clusters) == 0 {
					if i < 10 {
						time.Sleep(200 * time.Millisecond)
						continue
					}
					return errors.New("empty list of clusters was returned")
				}
				cluster = lcr.Clusters[0]
				break
			}
			// In init, we take the initial default values from swarmkit and
			// merge any non-nil, non-zero values from spec into the GRPC
			// spec; fields left unset keep their default values.
			// Note that this is different from Update(), where we expect the
			// user to specify the complete spec of the cluster (as they
			// already know the existing one and which fields to update).
			clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
				ClusterID:      cluster.ID,
				ClusterVersion: &cluster.Meta.Version,
				Spec:           &clusterSpec,
			})
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			return nil
		}
	}
	return ctx.Err()
}

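// listContainerForNode returns the IDs of all containers on this daemon that
// belong to tasks scheduled on the given swarm node, matched via the
// com.docker.swarm.node.id label.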
func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
	var ids []string
	filters := filters.NewArgs()
	filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
	containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
		Filters: filters,
	})
	if err != nil {
		return []string{}, err
	}
	for _, c := range containers {
		ids = append(ids, c.ID)
	}
	return ids, nil
}
