1// Copyright 2018 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rpcpb
16
17import (
18	"context"
19	"crypto/tls"
20	"fmt"
21	"net/url"
22	"os"
23	"time"
24
25	"go.etcd.io/etcd/clientv3"
26	"go.etcd.io/etcd/clientv3/snapshot"
27	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
28	"go.etcd.io/etcd/pkg/logutil"
29	"go.etcd.io/etcd/pkg/transport"
30
31	"github.com/dustin/go-humanize"
32	"go.uber.org/zap"
33	grpc "google.golang.org/grpc"
34	"google.golang.org/grpc/credentials"
35)
36
37// ElectionTimeout returns an election timeout duration.
38func (m *Member) ElectionTimeout() time.Duration {
39	return time.Duration(m.Etcd.ElectionTimeoutMs) * time.Millisecond
40}
41
42// DialEtcdGRPCServer creates a raw gRPC connection to an etcd member.
43func (m *Member) DialEtcdGRPCServer(opts ...grpc.DialOption) (*grpc.ClientConn, error) {
44	dialOpts := []grpc.DialOption{
45		grpc.WithTimeout(5 * time.Second),
46		grpc.WithBlock(),
47	}
48
49	secure := false
50	for _, cu := range m.Etcd.AdvertiseClientURLs {
51		u, err := url.Parse(cu)
52		if err != nil {
53			return nil, err
54		}
55		if u.Scheme == "https" { // TODO: handle unix
56			secure = true
57		}
58	}
59
60	if secure {
61		// assume save TLS assets are already stord on disk
62		tlsInfo := transport.TLSInfo{
63			CertFile:      m.ClientCertPath,
64			KeyFile:       m.ClientKeyPath,
65			TrustedCAFile: m.ClientTrustedCAPath,
66
67			// TODO: remove this with generated certs
68			// only need it for auto TLS
69			InsecureSkipVerify: true,
70		}
71		tlsConfig, err := tlsInfo.ClientConfig()
72		if err != nil {
73			return nil, err
74		}
75		creds := credentials.NewTLS(tlsConfig)
76		dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
77	} else {
78		dialOpts = append(dialOpts, grpc.WithInsecure())
79	}
80	dialOpts = append(dialOpts, opts...)
81	return grpc.Dial(m.EtcdClientEndpoint, dialOpts...)
82}
83
84// CreateEtcdClientConfig creates a client configuration from member.
85func (m *Member) CreateEtcdClientConfig(opts ...grpc.DialOption) (cfg *clientv3.Config, err error) {
86	secure := false
87	for _, cu := range m.Etcd.AdvertiseClientURLs {
88		var u *url.URL
89		u, err = url.Parse(cu)
90		if err != nil {
91			return nil, err
92		}
93		if u.Scheme == "https" { // TODO: handle unix
94			secure = true
95		}
96	}
97
98	// TODO: make this configurable
99	level := "error"
100	if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
101		level = "debug"
102	}
103	lcfg := logutil.DefaultZapLoggerConfig
104	lcfg.Level = zap.NewAtomicLevelAt(logutil.ConvertToZapLevel(level))
105
106	cfg = &clientv3.Config{
107		Endpoints:   []string{m.EtcdClientEndpoint},
108		DialTimeout: 10 * time.Second,
109		DialOptions: opts,
110		LogConfig:   &lcfg,
111	}
112	if secure {
113		// assume save TLS assets are already stord on disk
114		tlsInfo := transport.TLSInfo{
115			CertFile:      m.ClientCertPath,
116			KeyFile:       m.ClientKeyPath,
117			TrustedCAFile: m.ClientTrustedCAPath,
118
119			// TODO: remove this with generated certs
120			// only need it for auto TLS
121			InsecureSkipVerify: true,
122		}
123		var tlsConfig *tls.Config
124		tlsConfig, err = tlsInfo.ClientConfig()
125		if err != nil {
126			return nil, err
127		}
128		cfg.TLS = tlsConfig
129	}
130	return cfg, err
131}
132
133// CreateEtcdClient creates a client from member.
134func (m *Member) CreateEtcdClient(opts ...grpc.DialOption) (*clientv3.Client, error) {
135	cfg, err := m.CreateEtcdClientConfig(opts...)
136	if err != nil {
137		return nil, err
138	}
139	return clientv3.New(*cfg)
140}
141
142// CheckCompact ensures that historical data before given revision has been compacted.
143func (m *Member) CheckCompact(rev int64) error {
144	cli, err := m.CreateEtcdClient()
145	if err != nil {
146		return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
147	}
148	defer cli.Close()
149
150	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
151	wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1))
152	wr, ok := <-wch
153	cancel()
154
155	if !ok {
156		return fmt.Errorf("watch channel terminated (endpoint %q)", m.EtcdClientEndpoint)
157	}
158	if wr.CompactRevision != rev {
159		return fmt.Errorf("got compact revision %v, wanted %v (endpoint %q)", wr.CompactRevision, rev, m.EtcdClientEndpoint)
160	}
161
162	return nil
163}
164
165// Defrag runs defragmentation on this member.
166func (m *Member) Defrag() error {
167	cli, err := m.CreateEtcdClient()
168	if err != nil {
169		return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
170	}
171	defer cli.Close()
172
173	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
174	_, err = cli.Defragment(ctx, m.EtcdClientEndpoint)
175	cancel()
176	return err
177}
178
179// RevHash fetches current revision and hash on this member.
180func (m *Member) RevHash() (int64, int64, error) {
181	conn, err := m.DialEtcdGRPCServer()
182	if err != nil {
183		return 0, 0, err
184	}
185	defer conn.Close()
186
187	mt := pb.NewMaintenanceClient(conn)
188	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
189	resp, err := mt.Hash(ctx, &pb.HashRequest{}, grpc.FailFast(false))
190	cancel()
191
192	if err != nil {
193		return 0, 0, err
194	}
195
196	return resp.Header.Revision, int64(resp.Hash), nil
197}
198
199// Rev fetches current revision on this member.
200func (m *Member) Rev(ctx context.Context) (int64, error) {
201	cli, err := m.CreateEtcdClient()
202	if err != nil {
203		return 0, fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
204	}
205	defer cli.Close()
206
207	resp, err := cli.Status(ctx, m.EtcdClientEndpoint)
208	if err != nil {
209		return 0, err
210	}
211	return resp.Header.Revision, nil
212}
213
214// Compact compacts member storage with given revision.
215// It blocks until it's physically done.
216func (m *Member) Compact(rev int64, timeout time.Duration) error {
217	cli, err := m.CreateEtcdClient()
218	if err != nil {
219		return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
220	}
221	defer cli.Close()
222
223	ctx, cancel := context.WithTimeout(context.Background(), timeout)
224	_, err = cli.Compact(ctx, rev, clientv3.WithCompactPhysical())
225	cancel()
226	return err
227}
228
229// IsLeader returns true if this member is the current cluster leader.
230func (m *Member) IsLeader() (bool, error) {
231	cli, err := m.CreateEtcdClient()
232	if err != nil {
233		return false, fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
234	}
235	defer cli.Close()
236
237	resp, err := cli.Status(context.Background(), m.EtcdClientEndpoint)
238	if err != nil {
239		return false, err
240	}
241	return resp.Header.MemberId == resp.Leader, nil
242}
243
244// WriteHealthKey writes a health key to this member.
245func (m *Member) WriteHealthKey() error {
246	cli, err := m.CreateEtcdClient()
247	if err != nil {
248		return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
249	}
250	defer cli.Close()
251
252	// give enough time-out in case expensive requests (range/delete) are pending
253	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
254	_, err = cli.Put(ctx, "health", "good")
255	cancel()
256	if err != nil {
257		return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
258	}
259	return nil
260}
261
262// SaveSnapshot downloads a snapshot file from this member, locally.
263// It's meant to requested remotely, so that local member can store
264// snapshot file on local disk.
265func (m *Member) SaveSnapshot(lg *zap.Logger) (err error) {
266	// remove existing snapshot first
267	if err = os.RemoveAll(m.SnapshotPath); err != nil {
268		return err
269	}
270
271	var ccfg *clientv3.Config
272	ccfg, err = m.CreateEtcdClientConfig()
273	if err != nil {
274		return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
275	}
276
277	lg.Info(
278		"snapshot save START",
279		zap.String("member-name", m.Etcd.Name),
280		zap.Strings("member-client-urls", m.Etcd.AdvertiseClientURLs),
281		zap.String("snapshot-path", m.SnapshotPath),
282	)
283	now := time.Now()
284	mgr := snapshot.NewV3(lg)
285	if err = mgr.Save(context.Background(), *ccfg, m.SnapshotPath); err != nil {
286		return err
287	}
288	took := time.Since(now)
289
290	var fi os.FileInfo
291	fi, err = os.Stat(m.SnapshotPath)
292	if err != nil {
293		return err
294	}
295	var st snapshot.Status
296	st, err = mgr.Status(m.SnapshotPath)
297	if err != nil {
298		return err
299	}
300	m.SnapshotInfo = &SnapshotInfo{
301		MemberName:        m.Etcd.Name,
302		MemberClientURLs:  m.Etcd.AdvertiseClientURLs,
303		SnapshotPath:      m.SnapshotPath,
304		SnapshotFileSize:  humanize.Bytes(uint64(fi.Size())),
305		SnapshotTotalSize: humanize.Bytes(uint64(st.TotalSize)),
306		SnapshotTotalKey:  int64(st.TotalKey),
307		SnapshotHash:      int64(st.Hash),
308		SnapshotRevision:  st.Revision,
309		Took:              fmt.Sprintf("%v", took),
310	}
311	lg.Info(
312		"snapshot save END",
313		zap.String("member-name", m.SnapshotInfo.MemberName),
314		zap.Strings("member-client-urls", m.SnapshotInfo.MemberClientURLs),
315		zap.String("snapshot-path", m.SnapshotPath),
316		zap.String("snapshot-file-size", m.SnapshotInfo.SnapshotFileSize),
317		zap.String("snapshot-total-size", m.SnapshotInfo.SnapshotTotalSize),
318		zap.Int64("snapshot-total-key", m.SnapshotInfo.SnapshotTotalKey),
319		zap.Int64("snapshot-hash", m.SnapshotInfo.SnapshotHash),
320		zap.Int64("snapshot-revision", m.SnapshotInfo.SnapshotRevision),
321		zap.String("took", m.SnapshotInfo.Took),
322	)
323	return nil
324}
325
326// RestoreSnapshot restores a cluster from a given snapshot file on disk.
327// It's meant to requested remotely, so that local member can load the
328// snapshot file from local disk.
329func (m *Member) RestoreSnapshot(lg *zap.Logger) (err error) {
330	if err = os.RemoveAll(m.EtcdOnSnapshotRestore.DataDir); err != nil {
331		return err
332	}
333	if err = os.RemoveAll(m.EtcdOnSnapshotRestore.WALDir); err != nil {
334		return err
335	}
336
337	lg.Info(
338		"snapshot restore START",
339		zap.String("member-name", m.Etcd.Name),
340		zap.Strings("member-client-urls", m.Etcd.AdvertiseClientURLs),
341		zap.String("snapshot-path", m.SnapshotPath),
342	)
343	now := time.Now()
344	mgr := snapshot.NewV3(lg)
345	err = mgr.Restore(snapshot.RestoreConfig{
346		SnapshotPath:        m.SnapshotInfo.SnapshotPath,
347		Name:                m.EtcdOnSnapshotRestore.Name,
348		OutputDataDir:       m.EtcdOnSnapshotRestore.DataDir,
349		OutputWALDir:        m.EtcdOnSnapshotRestore.WALDir,
350		PeerURLs:            m.EtcdOnSnapshotRestore.AdvertisePeerURLs,
351		InitialCluster:      m.EtcdOnSnapshotRestore.InitialCluster,
352		InitialClusterToken: m.EtcdOnSnapshotRestore.InitialClusterToken,
353		SkipHashCheck:       false,
354		// TODO: set SkipHashCheck it true, to recover from existing db file
355	})
356	took := time.Since(now)
357	lg.Info(
358		"snapshot restore END",
359		zap.String("member-name", m.SnapshotInfo.MemberName),
360		zap.Strings("member-client-urls", m.SnapshotInfo.MemberClientURLs),
361		zap.String("snapshot-path", m.SnapshotPath),
362		zap.String("snapshot-file-size", m.SnapshotInfo.SnapshotFileSize),
363		zap.String("snapshot-total-size", m.SnapshotInfo.SnapshotTotalSize),
364		zap.Int64("snapshot-total-key", m.SnapshotInfo.SnapshotTotalKey),
365		zap.Int64("snapshot-hash", m.SnapshotInfo.SnapshotHash),
366		zap.Int64("snapshot-revision", m.SnapshotInfo.SnapshotRevision),
367		zap.String("took", took.String()),
368		zap.Error(err),
369	)
370	return err
371}
372