1// Copyright 2016 CoreOS, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package clientv3
16
17import (
18	"sync"
19
20	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
21	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
22	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
23)
24
25type (
26	Member               pb.Member
27	MemberListResponse   pb.MemberListResponse
28	MemberAddResponse    pb.MemberAddResponse
29	MemberRemoveResponse pb.MemberRemoveResponse
30	MemberUpdateResponse pb.MemberUpdateResponse
31)
32
33type Cluster interface {
34	// MemberList lists the current cluster membership.
35	MemberList(ctx context.Context) (*MemberListResponse, error)
36
37	// MemberLeader returns the current leader member.
38	MemberLeader(ctx context.Context) (*Member, error)
39
40	// MemberAdd adds a new member into the cluster.
41	MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)
42
43	// MemberRemove removes an existing member from the cluster.
44	MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error)
45
46	// MemberUpdate updates the peer addresses of the member.
47	MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error)
48}
49
50type cluster struct {
51	c *Client
52
53	mu     sync.Mutex
54	conn   *grpc.ClientConn // conn in-use
55	remote pb.ClusterClient
56}
57
58func NewCluster(c *Client) Cluster {
59	conn := c.ActiveConnection()
60
61	return &cluster{
62		c: c,
63
64		conn:   conn,
65		remote: pb.NewClusterClient(conn),
66	}
67}
68
69func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
70	r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
71	resp, err := c.getRemote().MemberAdd(ctx, r)
72	if err == nil {
73		return (*MemberAddResponse)(resp), nil
74	}
75
76	if isHalted(ctx, err) {
77		return nil, err
78	}
79
80	go c.switchRemote(err)
81	return nil, err
82}
83
84func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
85	r := &pb.MemberRemoveRequest{ID: id}
86	resp, err := c.getRemote().MemberRemove(ctx, r)
87	if err == nil {
88		return (*MemberRemoveResponse)(resp), nil
89	}
90
91	if isHalted(ctx, err) {
92		return nil, err
93	}
94
95	go c.switchRemote(err)
96	return nil, err
97}
98
99func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
100	// it is safe to retry on update.
101	for {
102		r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
103		resp, err := c.getRemote().MemberUpdate(ctx, r)
104		if err == nil {
105			return (*MemberUpdateResponse)(resp), nil
106		}
107
108		if isHalted(ctx, err) {
109			return nil, err
110		}
111
112		err = c.switchRemote(err)
113		if err != nil {
114			return nil, err
115		}
116	}
117}
118
119func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
120	// it is safe to retry on list.
121	for {
122		resp, err := c.getRemote().MemberList(ctx, &pb.MemberListRequest{})
123		if err == nil {
124			return (*MemberListResponse)(resp), nil
125		}
126
127		if isHalted(ctx, err) {
128			return nil, err
129		}
130
131		err = c.switchRemote(err)
132		if err != nil {
133			return nil, err
134		}
135	}
136}
137
138func (c *cluster) MemberLeader(ctx context.Context) (*Member, error) {
139	resp, err := c.MemberList(ctx)
140	if err != nil {
141		return nil, err
142	}
143	for _, m := range resp.Members {
144		if m.IsLeader {
145			return (*Member)(m), nil
146		}
147	}
148	return nil, nil
149}
150
151func (c *cluster) getRemote() pb.ClusterClient {
152	c.mu.Lock()
153	defer c.mu.Unlock()
154
155	return c.remote
156}
157
158func (c *cluster) switchRemote(prevErr error) error {
159	newConn, err := c.c.retryConnection(c.conn, prevErr)
160	if err != nil {
161		return err
162	}
163
164	c.mu.Lock()
165	defer c.mu.Unlock()
166
167	c.conn = newConn
168	c.remote = pb.NewClusterClient(c.conn)
169	return nil
170}
171