1package etcdv3
2
3import (
4	"context"
5	"crypto/tls"
6	"errors"
7	"time"
8
9	"go.etcd.io/etcd/client/pkg/v3/transport"
10	clientv3 "go.etcd.io/etcd/client/v3"
11	"google.golang.org/grpc"
12)
13
14var (
15	// ErrNoKey indicates a client method needs a key but receives none.
16	ErrNoKey = errors.New("no key provided")
17
18	// ErrNoValue indicates a client method needs a value but receives none.
19	ErrNoValue = errors.New("no value provided")
20)
21
22// Client is a wrapper around the etcd client.
23type Client interface {
24	// GetEntries queries the given prefix in etcd and returns a slice
25	// containing the values of all keys found, recursively, underneath that
26	// prefix.
27	GetEntries(prefix string) ([]string, error)
28
29	// WatchPrefix watches the given prefix in etcd for changes. When a change
30	// is detected, it will signal on the passed channel. Clients are expected
31	// to call GetEntries to update themselves with the latest set of complete
32	// values. WatchPrefix will always send an initial sentinel value on the
33	// channel after establishing the watch, to ensure that clients always
34	// receive the latest set of values. WatchPrefix will block until the
35	// context passed to the NewClient constructor is terminated.
36	WatchPrefix(prefix string, ch chan struct{})
37
38	// Register a service with etcd.
39	Register(s Service) error
40
41	// Deregister a service with etcd.
42	Deregister(s Service) error
43
44	// LeaseID returns the lease id created for this service instance
45	LeaseID() int64
46}
47
48type client struct {
49	cli *clientv3.Client
50	ctx context.Context
51
52	kv clientv3.KV
53
54	// Watcher interface instance, used to leverage Watcher.Close()
55	watcher clientv3.Watcher
56	// watcher context
57	wctx context.Context
58	// watcher cancel func
59	wcf context.CancelFunc
60
61	// leaseID will be 0 (clientv3.NoLease) if a lease was not created
62	leaseID clientv3.LeaseID
63
64	hbch <-chan *clientv3.LeaseKeepAliveResponse
65	// Lease interface instance, used to leverage Lease.Close()
66	leaser clientv3.Lease
67}
68
69// ClientOptions defines options for the etcd client. All values are optional.
70// If any duration is not specified, a default of 3 seconds will be used.
71type ClientOptions struct {
72	Cert          string
73	Key           string
74	CACert        string
75	DialTimeout   time.Duration
76	DialKeepAlive time.Duration
77
78	// DialOptions is a list of dial options for the gRPC client (e.g., for interceptors).
79	// For example, pass grpc.WithBlock() to block until the underlying connection is up.
80	// Without this, Dial returns immediately and connecting the server happens in background.
81	DialOptions []grpc.DialOption
82
83	Username string
84	Password string
85}
86
87// NewClient returns Client with a connection to the named machines. It will
88// return an error if a connection to the cluster cannot be made.
89func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) {
90	if options.DialTimeout == 0 {
91		options.DialTimeout = 3 * time.Second
92	}
93	if options.DialKeepAlive == 0 {
94		options.DialKeepAlive = 3 * time.Second
95	}
96
97	var err error
98	var tlscfg *tls.Config
99
100	if options.Cert != "" && options.Key != "" {
101		tlsInfo := transport.TLSInfo{
102			CertFile:      options.Cert,
103			KeyFile:       options.Key,
104			TrustedCAFile: options.CACert,
105		}
106		tlscfg, err = tlsInfo.ClientConfig()
107		if err != nil {
108			return nil, err
109		}
110	}
111
112	cli, err := clientv3.New(clientv3.Config{
113		Context:           ctx,
114		Endpoints:         machines,
115		DialTimeout:       options.DialTimeout,
116		DialKeepAliveTime: options.DialKeepAlive,
117		DialOptions:       options.DialOptions,
118		TLS:               tlscfg,
119		Username:          options.Username,
120		Password:          options.Password,
121	})
122	if err != nil {
123		return nil, err
124	}
125
126	return &client{
127		cli: cli,
128		ctx: ctx,
129		kv:  clientv3.NewKV(cli),
130	}, nil
131}
132
133func (c *client) LeaseID() int64 { return int64(c.leaseID) }
134
135// GetEntries implements the etcd Client interface.
136func (c *client) GetEntries(key string) ([]string, error) {
137	resp, err := c.kv.Get(c.ctx, key, clientv3.WithPrefix())
138	if err != nil {
139		return nil, err
140	}
141
142	entries := make([]string, len(resp.Kvs))
143	for i, kv := range resp.Kvs {
144		entries[i] = string(kv.Value)
145	}
146
147	return entries, nil
148}
149
150// WatchPrefix implements the etcd Client interface.
151func (c *client) WatchPrefix(prefix string, ch chan struct{}) {
152	c.wctx, c.wcf = context.WithCancel(c.ctx)
153	c.watcher = clientv3.NewWatcher(c.cli)
154
155	wch := c.watcher.Watch(c.wctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(0))
156	ch <- struct{}{}
157	for wr := range wch {
158		if wr.Canceled {
159			return
160		}
161		ch <- struct{}{}
162	}
163}
164
165func (c *client) Register(s Service) error {
166	var err error
167
168	if s.Key == "" {
169		return ErrNoKey
170	}
171	if s.Value == "" {
172		return ErrNoValue
173	}
174
175	if c.leaser != nil {
176		c.leaser.Close()
177	}
178	c.leaser = clientv3.NewLease(c.cli)
179
180	if c.watcher != nil {
181		c.watcher.Close()
182	}
183	c.watcher = clientv3.NewWatcher(c.cli)
184	if c.kv == nil {
185		c.kv = clientv3.NewKV(c.cli)
186	}
187
188	if s.TTL == nil {
189		s.TTL = NewTTLOption(time.Second*3, time.Second*10)
190	}
191
192	grantResp, err := c.leaser.Grant(c.ctx, int64(s.TTL.ttl.Seconds()))
193	if err != nil {
194		return err
195	}
196	c.leaseID = grantResp.ID
197
198	_, err = c.kv.Put(
199		c.ctx,
200		s.Key,
201		s.Value,
202		clientv3.WithLease(c.leaseID),
203	)
204	if err != nil {
205		return err
206	}
207
208	// this will keep the key alive 'forever' or until we revoke it or
209	// the context is canceled
210	c.hbch, err = c.leaser.KeepAlive(c.ctx, c.leaseID)
211	if err != nil {
212		return err
213	}
214
215	// discard the keepalive response, make etcd library not to complain
216	// fix bug #799
217	go func() {
218		for {
219			select {
220			case r := <-c.hbch:
221				// avoid dead loop when channel was closed
222				if r == nil {
223					return
224				}
225			case <-c.ctx.Done():
226				return
227			}
228		}
229	}()
230
231	return nil
232}
233
234func (c *client) Deregister(s Service) error {
235	defer c.close()
236
237	if s.Key == "" {
238		return ErrNoKey
239	}
240	if _, err := c.cli.Delete(c.ctx, s.Key, clientv3.WithIgnoreLease()); err != nil {
241		return err
242	}
243
244	return nil
245}
246
247// close will close any open clients and call
248// the watcher cancel func
249func (c *client) close() {
250	if c.leaser != nil {
251		c.leaser.Close()
252	}
253	if c.watcher != nil {
254		c.watcher.Close()
255	}
256	if c.wcf != nil {
257		c.wcf()
258	}
259}
260