1// Copyright 2018 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package rpcpb 16 17import ( 18 "context" 19 "crypto/tls" 20 "fmt" 21 "net/url" 22 "os" 23 "time" 24 25 "go.etcd.io/etcd/clientv3" 26 "go.etcd.io/etcd/clientv3/snapshot" 27 pb "go.etcd.io/etcd/etcdserver/etcdserverpb" 28 "go.etcd.io/etcd/pkg/logutil" 29 "go.etcd.io/etcd/pkg/transport" 30 31 "github.com/dustin/go-humanize" 32 "go.uber.org/zap" 33 grpc "google.golang.org/grpc" 34 "google.golang.org/grpc/credentials" 35) 36 37// ElectionTimeout returns an election timeout duration. 38func (m *Member) ElectionTimeout() time.Duration { 39 return time.Duration(m.Etcd.ElectionTimeoutMs) * time.Millisecond 40} 41 42// DialEtcdGRPCServer creates a raw gRPC connection to an etcd member. 43func (m *Member) DialEtcdGRPCServer(opts ...grpc.DialOption) (*grpc.ClientConn, error) { 44 dialOpts := []grpc.DialOption{ 45 grpc.WithTimeout(5 * time.Second), 46 grpc.WithBlock(), 47 } 48 49 secure := false 50 for _, cu := range m.Etcd.AdvertiseClientURLs { 51 u, err := url.Parse(cu) 52 if err != nil { 53 return nil, err 54 } 55 if u.Scheme == "https" { // TODO: handle unix 56 secure = true 57 } 58 } 59 60 if secure { 61 // assume save TLS assets are already stord on disk 62 tlsInfo := transport.TLSInfo{ 63 CertFile: m.ClientCertPath, 64 KeyFile: m.ClientKeyPath, 65 TrustedCAFile: m.ClientTrustedCAPath, 66 67 // TODO: remove this with generated certs 68 // only need it for auto TLS 69 InsecureSkipVerify: true, 70 } 71 tlsConfig, err := tlsInfo.ClientConfig() 72 if err != nil { 73 return nil, err 74 } 75 creds := credentials.NewTLS(tlsConfig) 76 dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds)) 77 } else { 78 dialOpts = append(dialOpts, grpc.WithInsecure()) 79 } 80 dialOpts = append(dialOpts, opts...) 81 return grpc.Dial(m.EtcdClientEndpoint, dialOpts...) 82} 83 84// CreateEtcdClientConfig creates a client configuration from member. 85func (m *Member) CreateEtcdClientConfig(opts ...grpc.DialOption) (cfg *clientv3.Config, err error) { 86 secure := false 87 for _, cu := range m.Etcd.AdvertiseClientURLs { 88 var u *url.URL 89 u, err = url.Parse(cu) 90 if err != nil { 91 return nil, err 92 } 93 if u.Scheme == "https" { // TODO: handle unix 94 secure = true 95 } 96 } 97 98 // TODO: make this configurable 99 level := "error" 100 if os.Getenv("ETCD_CLIENT_DEBUG") != "" { 101 level = "debug" 102 } 103 lcfg := logutil.DefaultZapLoggerConfig 104 lcfg.Level = zap.NewAtomicLevelAt(logutil.ConvertToZapLevel(level)) 105 106 cfg = &clientv3.Config{ 107 Endpoints: []string{m.EtcdClientEndpoint}, 108 DialTimeout: 10 * time.Second, 109 DialOptions: opts, 110 LogConfig: &lcfg, 111 } 112 if secure { 113 // assume save TLS assets are already stord on disk 114 tlsInfo := transport.TLSInfo{ 115 CertFile: m.ClientCertPath, 116 KeyFile: m.ClientKeyPath, 117 TrustedCAFile: m.ClientTrustedCAPath, 118 119 // TODO: remove this with generated certs 120 // only need it for auto TLS 121 InsecureSkipVerify: true, 122 } 123 var tlsConfig *tls.Config 124 tlsConfig, err = tlsInfo.ClientConfig() 125 if err != nil { 126 return nil, err 127 } 128 cfg.TLS = tlsConfig 129 } 130 return cfg, err 131} 132 133// CreateEtcdClient creates a client from member. 134func (m *Member) CreateEtcdClient(opts ...grpc.DialOption) (*clientv3.Client, error) { 135 cfg, err := m.CreateEtcdClientConfig(opts...) 136 if err != nil { 137 return nil, err 138 } 139 return clientv3.New(*cfg) 140} 141 142// CheckCompact ensures that historical data before given revision has been compacted. 143func (m *Member) CheckCompact(rev int64) error { 144 cli, err := m.CreateEtcdClient() 145 if err != nil { 146 return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint) 147 } 148 defer cli.Close() 149 150 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 151 wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1)) 152 wr, ok := <-wch 153 cancel() 154 155 if !ok { 156 return fmt.Errorf("watch channel terminated (endpoint %q)", m.EtcdClientEndpoint) 157 } 158 if wr.CompactRevision != rev { 159 return fmt.Errorf("got compact revision %v, wanted %v (endpoint %q)", wr.CompactRevision, rev, m.EtcdClientEndpoint) 160 } 161 162 return nil 163} 164 165// Defrag runs defragmentation on this member. 166func (m *Member) Defrag() error { 167 cli, err := m.CreateEtcdClient() 168 if err != nil { 169 return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint) 170 } 171 defer cli.Close() 172 173 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 174 _, err = cli.Defragment(ctx, m.EtcdClientEndpoint) 175 cancel() 176 return err 177} 178 179// RevHash fetches current revision and hash on this member. 180func (m *Member) RevHash() (int64, int64, error) { 181 conn, err := m.DialEtcdGRPCServer() 182 if err != nil { 183 return 0, 0, err 184 } 185 defer conn.Close() 186 187 mt := pb.NewMaintenanceClient(conn) 188 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 189 resp, err := mt.Hash(ctx, &pb.HashRequest{}, grpc.FailFast(false)) 190 cancel() 191 192 if err != nil { 193 return 0, 0, err 194 } 195 196 return resp.Header.Revision, int64(resp.Hash), nil 197} 198 199// Rev fetches current revision on this member. 200func (m *Member) Rev(ctx context.Context) (int64, error) { 201 cli, err := m.CreateEtcdClient() 202 if err != nil { 203 return 0, fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint) 204 } 205 defer cli.Close() 206 207 resp, err := cli.Status(ctx, m.EtcdClientEndpoint) 208 if err != nil { 209 return 0, err 210 } 211 return resp.Header.Revision, nil 212} 213 214// Compact compacts member storage with given revision. 215// It blocks until it's physically done. 216func (m *Member) Compact(rev int64, timeout time.Duration) error { 217 cli, err := m.CreateEtcdClient() 218 if err != nil { 219 return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint) 220 } 221 defer cli.Close() 222 223 ctx, cancel := context.WithTimeout(context.Background(), timeout) 224 _, err = cli.Compact(ctx, rev, clientv3.WithCompactPhysical()) 225 cancel() 226 return err 227} 228 229// IsLeader returns true if this member is the current cluster leader. 230func (m *Member) IsLeader() (bool, error) { 231 cli, err := m.CreateEtcdClient() 232 if err != nil { 233 return false, fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint) 234 } 235 defer cli.Close() 236 237 resp, err := cli.Status(context.Background(), m.EtcdClientEndpoint) 238 if err != nil { 239 return false, err 240 } 241 return resp.Header.MemberId == resp.Leader, nil 242} 243 244// WriteHealthKey writes a health key to this member. 245func (m *Member) WriteHealthKey() error { 246 cli, err := m.CreateEtcdClient() 247 if err != nil { 248 return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint) 249 } 250 defer cli.Close() 251 252 // give enough time-out in case expensive requests (range/delete) are pending 253 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 254 _, err = cli.Put(ctx, "health", "good") 255 cancel() 256 if err != nil { 257 return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint) 258 } 259 return nil 260} 261 262// SaveSnapshot downloads a snapshot file from this member, locally. 263// It's meant to requested remotely, so that local member can store 264// snapshot file on local disk. 265func (m *Member) SaveSnapshot(lg *zap.Logger) (err error) { 266 // remove existing snapshot first 267 if err = os.RemoveAll(m.SnapshotPath); err != nil { 268 return err 269 } 270 271 var ccfg *clientv3.Config 272 ccfg, err = m.CreateEtcdClientConfig() 273 if err != nil { 274 return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint) 275 } 276 277 lg.Info( 278 "snapshot save START", 279 zap.String("member-name", m.Etcd.Name), 280 zap.Strings("member-client-urls", m.Etcd.AdvertiseClientURLs), 281 zap.String("snapshot-path", m.SnapshotPath), 282 ) 283 now := time.Now() 284 mgr := snapshot.NewV3(lg) 285 if err = mgr.Save(context.Background(), *ccfg, m.SnapshotPath); err != nil { 286 return err 287 } 288 took := time.Since(now) 289 290 var fi os.FileInfo 291 fi, err = os.Stat(m.SnapshotPath) 292 if err != nil { 293 return err 294 } 295 var st snapshot.Status 296 st, err = mgr.Status(m.SnapshotPath) 297 if err != nil { 298 return err 299 } 300 m.SnapshotInfo = &SnapshotInfo{ 301 MemberName: m.Etcd.Name, 302 MemberClientURLs: m.Etcd.AdvertiseClientURLs, 303 SnapshotPath: m.SnapshotPath, 304 SnapshotFileSize: humanize.Bytes(uint64(fi.Size())), 305 SnapshotTotalSize: humanize.Bytes(uint64(st.TotalSize)), 306 SnapshotTotalKey: int64(st.TotalKey), 307 SnapshotHash: int64(st.Hash), 308 SnapshotRevision: st.Revision, 309 Took: fmt.Sprintf("%v", took), 310 } 311 lg.Info( 312 "snapshot save END", 313 zap.String("member-name", m.SnapshotInfo.MemberName), 314 zap.Strings("member-client-urls", m.SnapshotInfo.MemberClientURLs), 315 zap.String("snapshot-path", m.SnapshotPath), 316 zap.String("snapshot-file-size", m.SnapshotInfo.SnapshotFileSize), 317 zap.String("snapshot-total-size", m.SnapshotInfo.SnapshotTotalSize), 318 zap.Int64("snapshot-total-key", m.SnapshotInfo.SnapshotTotalKey), 319 zap.Int64("snapshot-hash", m.SnapshotInfo.SnapshotHash), 320 zap.Int64("snapshot-revision", m.SnapshotInfo.SnapshotRevision), 321 zap.String("took", m.SnapshotInfo.Took), 322 ) 323 return nil 324} 325 326// RestoreSnapshot restores a cluster from a given snapshot file on disk. 327// It's meant to requested remotely, so that local member can load the 328// snapshot file from local disk. 329func (m *Member) RestoreSnapshot(lg *zap.Logger) (err error) { 330 if err = os.RemoveAll(m.EtcdOnSnapshotRestore.DataDir); err != nil { 331 return err 332 } 333 if err = os.RemoveAll(m.EtcdOnSnapshotRestore.WALDir); err != nil { 334 return err 335 } 336 337 lg.Info( 338 "snapshot restore START", 339 zap.String("member-name", m.Etcd.Name), 340 zap.Strings("member-client-urls", m.Etcd.AdvertiseClientURLs), 341 zap.String("snapshot-path", m.SnapshotPath), 342 ) 343 now := time.Now() 344 mgr := snapshot.NewV3(lg) 345 err = mgr.Restore(snapshot.RestoreConfig{ 346 SnapshotPath: m.SnapshotInfo.SnapshotPath, 347 Name: m.EtcdOnSnapshotRestore.Name, 348 OutputDataDir: m.EtcdOnSnapshotRestore.DataDir, 349 OutputWALDir: m.EtcdOnSnapshotRestore.WALDir, 350 PeerURLs: m.EtcdOnSnapshotRestore.AdvertisePeerURLs, 351 InitialCluster: m.EtcdOnSnapshotRestore.InitialCluster, 352 InitialClusterToken: m.EtcdOnSnapshotRestore.InitialClusterToken, 353 SkipHashCheck: false, 354 // TODO: set SkipHashCheck it true, to recover from existing db file 355 }) 356 took := time.Since(now) 357 lg.Info( 358 "snapshot restore END", 359 zap.String("member-name", m.SnapshotInfo.MemberName), 360 zap.Strings("member-client-urls", m.SnapshotInfo.MemberClientURLs), 361 zap.String("snapshot-path", m.SnapshotPath), 362 zap.String("snapshot-file-size", m.SnapshotInfo.SnapshotFileSize), 363 zap.String("snapshot-total-size", m.SnapshotInfo.SnapshotTotalSize), 364 zap.Int64("snapshot-total-key", m.SnapshotInfo.SnapshotTotalKey), 365 zap.Int64("snapshot-hash", m.SnapshotInfo.SnapshotHash), 366 zap.Int64("snapshot-revision", m.SnapshotInfo.SnapshotRevision), 367 zap.String("took", took.String()), 368 zap.Error(err), 369 ) 370 return err 371} 372