1package etcdv3 2 3import ( 4 "context" 5 "crypto/tls" 6 "errors" 7 "time" 8 9 "go.etcd.io/etcd/client/pkg/v3/transport" 10 clientv3 "go.etcd.io/etcd/client/v3" 11 "google.golang.org/grpc" 12) 13 14var ( 15 // ErrNoKey indicates a client method needs a key but receives none. 16 ErrNoKey = errors.New("no key provided") 17 18 // ErrNoValue indicates a client method needs a value but receives none. 19 ErrNoValue = errors.New("no value provided") 20) 21 22// Client is a wrapper around the etcd client. 23type Client interface { 24 // GetEntries queries the given prefix in etcd and returns a slice 25 // containing the values of all keys found, recursively, underneath that 26 // prefix. 27 GetEntries(prefix string) ([]string, error) 28 29 // WatchPrefix watches the given prefix in etcd for changes. When a change 30 // is detected, it will signal on the passed channel. Clients are expected 31 // to call GetEntries to update themselves with the latest set of complete 32 // values. WatchPrefix will always send an initial sentinel value on the 33 // channel after establishing the watch, to ensure that clients always 34 // receive the latest set of values. WatchPrefix will block until the 35 // context passed to the NewClient constructor is terminated. 36 WatchPrefix(prefix string, ch chan struct{}) 37 38 // Register a service with etcd. 39 Register(s Service) error 40 41 // Deregister a service with etcd. 42 Deregister(s Service) error 43 44 // LeaseID returns the lease id created for this service instance 45 LeaseID() int64 46} 47 48type client struct { 49 cli *clientv3.Client 50 ctx context.Context 51 52 kv clientv3.KV 53 54 // Watcher interface instance, used to leverage Watcher.Close() 55 watcher clientv3.Watcher 56 // watcher context 57 wctx context.Context 58 // watcher cancel func 59 wcf context.CancelFunc 60 61 // leaseID will be 0 (clientv3.NoLease) if a lease was not created 62 leaseID clientv3.LeaseID 63 64 hbch <-chan *clientv3.LeaseKeepAliveResponse 65 // Lease interface instance, used to leverage Lease.Close() 66 leaser clientv3.Lease 67} 68 69// ClientOptions defines options for the etcd client. All values are optional. 70// If any duration is not specified, a default of 3 seconds will be used. 71type ClientOptions struct { 72 Cert string 73 Key string 74 CACert string 75 DialTimeout time.Duration 76 DialKeepAlive time.Duration 77 78 // DialOptions is a list of dial options for the gRPC client (e.g., for interceptors). 79 // For example, pass grpc.WithBlock() to block until the underlying connection is up. 80 // Without this, Dial returns immediately and connecting the server happens in background. 81 DialOptions []grpc.DialOption 82 83 Username string 84 Password string 85} 86 87// NewClient returns Client with a connection to the named machines. It will 88// return an error if a connection to the cluster cannot be made. 89func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) { 90 if options.DialTimeout == 0 { 91 options.DialTimeout = 3 * time.Second 92 } 93 if options.DialKeepAlive == 0 { 94 options.DialKeepAlive = 3 * time.Second 95 } 96 97 var err error 98 var tlscfg *tls.Config 99 100 if options.Cert != "" && options.Key != "" { 101 tlsInfo := transport.TLSInfo{ 102 CertFile: options.Cert, 103 KeyFile: options.Key, 104 TrustedCAFile: options.CACert, 105 } 106 tlscfg, err = tlsInfo.ClientConfig() 107 if err != nil { 108 return nil, err 109 } 110 } 111 112 cli, err := clientv3.New(clientv3.Config{ 113 Context: ctx, 114 Endpoints: machines, 115 DialTimeout: options.DialTimeout, 116 DialKeepAliveTime: options.DialKeepAlive, 117 DialOptions: options.DialOptions, 118 TLS: tlscfg, 119 Username: options.Username, 120 Password: options.Password, 121 }) 122 if err != nil { 123 return nil, err 124 } 125 126 return &client{ 127 cli: cli, 128 ctx: ctx, 129 kv: clientv3.NewKV(cli), 130 }, nil 131} 132 133func (c *client) LeaseID() int64 { return int64(c.leaseID) } 134 135// GetEntries implements the etcd Client interface. 136func (c *client) GetEntries(key string) ([]string, error) { 137 resp, err := c.kv.Get(c.ctx, key, clientv3.WithPrefix()) 138 if err != nil { 139 return nil, err 140 } 141 142 entries := make([]string, len(resp.Kvs)) 143 for i, kv := range resp.Kvs { 144 entries[i] = string(kv.Value) 145 } 146 147 return entries, nil 148} 149 150// WatchPrefix implements the etcd Client interface. 151func (c *client) WatchPrefix(prefix string, ch chan struct{}) { 152 c.wctx, c.wcf = context.WithCancel(c.ctx) 153 c.watcher = clientv3.NewWatcher(c.cli) 154 155 wch := c.watcher.Watch(c.wctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(0)) 156 ch <- struct{}{} 157 for wr := range wch { 158 if wr.Canceled { 159 return 160 } 161 ch <- struct{}{} 162 } 163} 164 165func (c *client) Register(s Service) error { 166 var err error 167 168 if s.Key == "" { 169 return ErrNoKey 170 } 171 if s.Value == "" { 172 return ErrNoValue 173 } 174 175 if c.leaser != nil { 176 c.leaser.Close() 177 } 178 c.leaser = clientv3.NewLease(c.cli) 179 180 if c.watcher != nil { 181 c.watcher.Close() 182 } 183 c.watcher = clientv3.NewWatcher(c.cli) 184 if c.kv == nil { 185 c.kv = clientv3.NewKV(c.cli) 186 } 187 188 if s.TTL == nil { 189 s.TTL = NewTTLOption(time.Second*3, time.Second*10) 190 } 191 192 grantResp, err := c.leaser.Grant(c.ctx, int64(s.TTL.ttl.Seconds())) 193 if err != nil { 194 return err 195 } 196 c.leaseID = grantResp.ID 197 198 _, err = c.kv.Put( 199 c.ctx, 200 s.Key, 201 s.Value, 202 clientv3.WithLease(c.leaseID), 203 ) 204 if err != nil { 205 return err 206 } 207 208 // this will keep the key alive 'forever' or until we revoke it or 209 // the context is canceled 210 c.hbch, err = c.leaser.KeepAlive(c.ctx, c.leaseID) 211 if err != nil { 212 return err 213 } 214 215 // discard the keepalive response, make etcd library not to complain 216 // fix bug #799 217 go func() { 218 for { 219 select { 220 case r := <-c.hbch: 221 // avoid dead loop when channel was closed 222 if r == nil { 223 return 224 } 225 case <-c.ctx.Done(): 226 return 227 } 228 } 229 }() 230 231 return nil 232} 233 234func (c *client) Deregister(s Service) error { 235 defer c.close() 236 237 if s.Key == "" { 238 return ErrNoKey 239 } 240 if _, err := c.cli.Delete(c.ctx, s.Key, clientv3.WithIgnoreLease()); err != nil { 241 return err 242 } 243 244 return nil 245} 246 247// close will close any open clients and call 248// the watcher cancel func 249func (c *client) close() { 250 if c.leaser != nil { 251 c.leaser.Close() 252 } 253 if c.watcher != nil { 254 c.watcher.Close() 255 } 256 if c.wcf != nil { 257 c.wcf() 258 } 259} 260