1// Copyright 2016 CoreOS, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package clientv3 16 17import ( 18 "sync" 19 20 "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" 21 "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" 22 pb "github.com/coreos/etcd/etcdserver/etcdserverpb" 23) 24 25type ( 26 Member pb.Member 27 MemberListResponse pb.MemberListResponse 28 MemberAddResponse pb.MemberAddResponse 29 MemberRemoveResponse pb.MemberRemoveResponse 30 MemberUpdateResponse pb.MemberUpdateResponse 31) 32 33type Cluster interface { 34 // MemberList lists the current cluster membership. 35 MemberList(ctx context.Context) (*MemberListResponse, error) 36 37 // MemberLeader returns the current leader member. 38 MemberLeader(ctx context.Context) (*Member, error) 39 40 // MemberAdd adds a new member into the cluster. 41 MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) 42 43 // MemberRemove removes an existing member from the cluster. 44 MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) 45 46 // MemberUpdate updates the peer addresses of the member. 47 MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) 48} 49 50type cluster struct { 51 c *Client 52 53 mu sync.Mutex 54 conn *grpc.ClientConn // conn in-use 55 remote pb.ClusterClient 56} 57 58func NewCluster(c *Client) Cluster { 59 conn := c.ActiveConnection() 60 61 return &cluster{ 62 c: c, 63 64 conn: conn, 65 remote: pb.NewClusterClient(conn), 66 } 67} 68 69func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) { 70 r := &pb.MemberAddRequest{PeerURLs: peerAddrs} 71 resp, err := c.getRemote().MemberAdd(ctx, r) 72 if err == nil { 73 return (*MemberAddResponse)(resp), nil 74 } 75 76 if isHalted(ctx, err) { 77 return nil, err 78 } 79 80 go c.switchRemote(err) 81 return nil, err 82} 83 84func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) { 85 r := &pb.MemberRemoveRequest{ID: id} 86 resp, err := c.getRemote().MemberRemove(ctx, r) 87 if err == nil { 88 return (*MemberRemoveResponse)(resp), nil 89 } 90 91 if isHalted(ctx, err) { 92 return nil, err 93 } 94 95 go c.switchRemote(err) 96 return nil, err 97} 98 99func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) { 100 // it is safe to retry on update. 101 for { 102 r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs} 103 resp, err := c.getRemote().MemberUpdate(ctx, r) 104 if err == nil { 105 return (*MemberUpdateResponse)(resp), nil 106 } 107 108 if isHalted(ctx, err) { 109 return nil, err 110 } 111 112 err = c.switchRemote(err) 113 if err != nil { 114 return nil, err 115 } 116 } 117} 118 119func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) { 120 // it is safe to retry on list. 121 for { 122 resp, err := c.getRemote().MemberList(ctx, &pb.MemberListRequest{}) 123 if err == nil { 124 return (*MemberListResponse)(resp), nil 125 } 126 127 if isHalted(ctx, err) { 128 return nil, err 129 } 130 131 err = c.switchRemote(err) 132 if err != nil { 133 return nil, err 134 } 135 } 136} 137 138func (c *cluster) MemberLeader(ctx context.Context) (*Member, error) { 139 resp, err := c.MemberList(ctx) 140 if err != nil { 141 return nil, err 142 } 143 for _, m := range resp.Members { 144 if m.IsLeader { 145 return (*Member)(m), nil 146 } 147 } 148 return nil, nil 149} 150 151func (c *cluster) getRemote() pb.ClusterClient { 152 c.mu.Lock() 153 defer c.mu.Unlock() 154 155 return c.remote 156} 157 158func (c *cluster) switchRemote(prevErr error) error { 159 newConn, err := c.c.retryConnection(c.conn, prevErr) 160 if err != nil { 161 return err 162 } 163 164 c.mu.Lock() 165 defer c.mu.Unlock() 166 167 c.conn = newConn 168 c.remote = pb.NewClusterClient(c.conn) 169 return nil 170} 171