1// Copyright 2017 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package etcdserver 16 17import ( 18 "bytes" 19 "context" 20 "encoding/json" 21 "fmt" 22 "io/ioutil" 23 "net/http" 24 "strings" 25 "time" 26 27 "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" 28 pb "go.etcd.io/etcd/etcdserver/etcdserverpb" 29 "go.etcd.io/etcd/mvcc" 30 "go.etcd.io/etcd/pkg/traceutil" 31 "go.etcd.io/etcd/pkg/types" 32 33 "go.uber.org/zap" 34) 35 36// CheckInitialHashKV compares initial hash values with its peers 37// before serving any peer/client traffic. Only mismatch when hashes 38// are different at requested revision, with same compact revision. 39func (s *EtcdServer) CheckInitialHashKV() error { 40 if !s.Cfg.InitialCorruptCheck { 41 return nil 42 } 43 44 lg := s.getLogger() 45 46 if lg != nil { 47 lg.Info( 48 "starting initial corruption check", 49 zap.String("local-member-id", s.ID().String()), 50 zap.Duration("timeout", s.Cfg.ReqTimeout()), 51 ) 52 } else { 53 plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout()) 54 } 55 56 h, rev, crev, err := s.kv.HashByRev(0) 57 if err != nil { 58 return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err) 59 } 60 peers := s.getPeerHashKVs(rev) 61 mismatch := 0 62 for _, p := range peers { 63 if p.resp != nil { 64 peerID := types.ID(p.resp.Header.MemberId) 65 fields := []zap.Field{ 66 zap.String("local-member-id", s.ID().String()), 67 zap.Int64("local-member-revision", rev), 68 zap.Int64("local-member-compact-revision", crev), 69 zap.Uint32("local-member-hash", h), 70 zap.String("remote-peer-id", peerID.String()), 71 zap.Strings("remote-peer-endpoints", p.eps), 72 zap.Int64("remote-peer-revision", p.resp.Header.Revision), 73 zap.Int64("remote-peer-compact-revision", p.resp.CompactRevision), 74 zap.Uint32("remote-peer-hash", p.resp.Hash), 75 } 76 77 if h != p.resp.Hash { 78 if crev == p.resp.CompactRevision { 79 if lg != nil { 80 lg.Warn("found different hash values from remote peer", fields...) 81 } else { 82 plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev) 83 } 84 mismatch++ 85 } else { 86 if lg != nil { 87 lg.Warn("found different compact revision values from remote peer", fields...) 88 } else { 89 plog.Warningf("%s cannot check hash of peer(%s): peer has a different compact revision %d (revision:%d)", s.ID(), peerID, p.resp.CompactRevision, rev) 90 } 91 } 92 } 93 94 continue 95 } 96 97 if p.err != nil { 98 switch p.err { 99 case rpctypes.ErrFutureRev: 100 if lg != nil { 101 lg.Warn( 102 "cannot fetch hash from slow remote peer", 103 zap.String("local-member-id", s.ID().String()), 104 zap.Int64("local-member-revision", rev), 105 zap.Int64("local-member-compact-revision", crev), 106 zap.Uint32("local-member-hash", h), 107 zap.String("remote-peer-id", p.id.String()), 108 zap.Strings("remote-peer-endpoints", p.eps), 109 zap.Error(err), 110 ) 111 } else { 112 plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error()) 113 } 114 case rpctypes.ErrCompacted: 115 if lg != nil { 116 lg.Warn( 117 "cannot fetch hash from remote peer; local member is behind", 118 zap.String("local-member-id", s.ID().String()), 119 zap.Int64("local-member-revision", rev), 120 zap.Int64("local-member-compact-revision", crev), 121 zap.Uint32("local-member-hash", h), 122 zap.String("remote-peer-id", p.id.String()), 123 zap.Strings("remote-peer-endpoints", p.eps), 124 zap.Error(err), 125 ) 126 } else { 127 plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error()) 128 } 129 } 130 } 131 } 132 if mismatch > 0 { 133 return fmt.Errorf("%s found data inconsistency with peers", s.ID()) 134 } 135 136 if lg != nil { 137 lg.Info( 138 "initial corruption checking passed; no corruption", 139 zap.String("local-member-id", s.ID().String()), 140 ) 141 } else { 142 plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID()) 143 } 144 return nil 145} 146 147func (s *EtcdServer) monitorKVHash() { 148 t := s.Cfg.CorruptCheckTime 149 if t == 0 { 150 return 151 } 152 153 lg := s.getLogger() 154 if lg != nil { 155 lg.Info( 156 "enabled corruption checking", 157 zap.String("local-member-id", s.ID().String()), 158 zap.Duration("interval", t), 159 ) 160 } else { 161 plog.Infof("enabled corruption checking with %s interval", t) 162 } 163 164 for { 165 select { 166 case <-s.stopping: 167 return 168 case <-time.After(t): 169 } 170 if !s.isLeader() { 171 continue 172 } 173 if err := s.checkHashKV(); err != nil { 174 if lg != nil { 175 lg.Warn("failed to check hash KV", zap.Error(err)) 176 } else { 177 plog.Debugf("check hash kv failed %v", err) 178 } 179 } 180 } 181} 182 183func (s *EtcdServer) checkHashKV() error { 184 lg := s.getLogger() 185 186 h, rev, crev, err := s.kv.HashByRev(0) 187 if err != nil { 188 return err 189 } 190 peers := s.getPeerHashKVs(rev) 191 192 ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) 193 err = s.linearizableReadNotify(ctx) 194 cancel() 195 if err != nil { 196 return err 197 } 198 199 h2, rev2, crev2, err := s.kv.HashByRev(0) 200 if err != nil { 201 return err 202 } 203 204 alarmed := false 205 mismatch := func(id uint64) { 206 if alarmed { 207 return 208 } 209 alarmed = true 210 a := &pb.AlarmRequest{ 211 MemberID: id, 212 Action: pb.AlarmRequest_ACTIVATE, 213 Alarm: pb.AlarmType_CORRUPT, 214 } 215 s.goAttach(func() { 216 s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a}) 217 }) 218 } 219 220 if h2 != h && rev2 == rev && crev == crev2 { 221 if lg != nil { 222 lg.Warn( 223 "found hash mismatch", 224 zap.Int64("revision-1", rev), 225 zap.Int64("compact-revision-1", crev), 226 zap.Uint32("hash-1", h), 227 zap.Int64("revision-2", rev2), 228 zap.Int64("compact-revision-2", crev2), 229 zap.Uint32("hash-2", h2), 230 ) 231 } else { 232 plog.Warningf("mismatched hashes %d and %d for revision %d", h, h2, rev) 233 } 234 mismatch(uint64(s.ID())) 235 } 236 237 checkedCount := 0 238 for _, p := range peers { 239 if p.resp == nil { 240 continue 241 } 242 checkedCount++ 243 id := p.resp.Header.MemberId 244 245 // leader expects follower's latest revision less than or equal to leader's 246 if p.resp.Header.Revision > rev2 { 247 if lg != nil { 248 lg.Warn( 249 "revision from follower must be less than or equal to leader's", 250 zap.Int64("leader-revision", rev2), 251 zap.Int64("follower-revision", p.resp.Header.Revision), 252 zap.String("follower-peer-id", types.ID(id).String()), 253 ) 254 } else { 255 plog.Warningf( 256 "revision %d from member %v, expected at most %d", 257 p.resp.Header.Revision, 258 types.ID(id), 259 rev2) 260 } 261 mismatch(id) 262 } 263 264 // leader expects follower's latest compact revision less than or equal to leader's 265 if p.resp.CompactRevision > crev2 { 266 if lg != nil { 267 lg.Warn( 268 "compact revision from follower must be less than or equal to leader's", 269 zap.Int64("leader-compact-revision", crev2), 270 zap.Int64("follower-compact-revision", p.resp.CompactRevision), 271 zap.String("follower-peer-id", types.ID(id).String()), 272 ) 273 } else { 274 plog.Warningf( 275 "compact revision %d from member %v, expected at most %d", 276 p.resp.CompactRevision, 277 types.ID(id), 278 crev2, 279 ) 280 } 281 mismatch(id) 282 } 283 284 // follower's compact revision is leader's old one, then hashes must match 285 if p.resp.CompactRevision == crev && p.resp.Hash != h { 286 if lg != nil { 287 lg.Warn( 288 "same compact revision then hashes must match", 289 zap.Int64("leader-compact-revision", crev2), 290 zap.Uint32("leader-hash", h), 291 zap.Int64("follower-compact-revision", p.resp.CompactRevision), 292 zap.Uint32("follower-hash", p.resp.Hash), 293 zap.String("follower-peer-id", types.ID(id).String()), 294 ) 295 } else { 296 plog.Warningf( 297 "hash %d at revision %d from member %v, expected hash %d", 298 p.resp.Hash, 299 rev, 300 types.ID(id), 301 h, 302 ) 303 } 304 mismatch(id) 305 } 306 } 307 if lg != nil { 308 lg.Info("finished peer corruption check", zap.Int("number-of-peers-checked", checkedCount)) 309 } else { 310 plog.Infof("finished peer corruption check") 311 } 312 313 return nil 314} 315 316type peerInfo struct { 317 id types.ID 318 eps []string 319} 320 321type peerHashKVResp struct { 322 peerInfo 323 resp *pb.HashKVResponse 324 err error 325} 326 327func (s *EtcdServer) getPeerHashKVs(rev int64) []*peerHashKVResp { 328 // TODO: handle the case when "s.cluster.Members" have not 329 // been populated (e.g. no snapshot to load from disk) 330 members := s.cluster.Members() 331 peers := make([]peerInfo, 0, len(members)) 332 for _, m := range members { 333 if m.ID == s.ID() { 334 continue 335 } 336 peers = append(peers, peerInfo{id: m.ID, eps: m.PeerURLs}) 337 } 338 339 lg := s.getLogger() 340 341 var resps []*peerHashKVResp 342 for _, p := range peers { 343 if len(p.eps) == 0 { 344 continue 345 } 346 347 respsLen := len(resps) 348 var lastErr error 349 for _, ep := range p.eps { 350 ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) 351 352 var resp *pb.HashKVResponse 353 resp, lastErr = s.getPeerHashKVHTTP(ctx, ep, rev) 354 cancel() 355 if lastErr == nil { 356 resps = append(resps, &peerHashKVResp{peerInfo: p, resp: resp, err: nil}) 357 break 358 } 359 if lg != nil { 360 lg.Warn( 361 "failed hash kv request", 362 zap.String("local-member-id", s.ID().String()), 363 zap.Int64("requested-revision", rev), 364 zap.String("remote-peer-endpoint", ep), 365 zap.Error(lastErr), 366 ) 367 } else { 368 plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), lastErr.Error(), ep, rev) 369 } 370 } 371 372 // failed to get hashKV from all endpoints of this peer 373 if respsLen == len(resps) { 374 resps = append(resps, &peerHashKVResp{peerInfo: p, resp: nil, err: lastErr}) 375 } 376 } 377 return resps 378} 379 380type applierV3Corrupt struct { 381 applierV3 382} 383 384func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} } 385 386func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { 387 return nil, nil, ErrCorrupt 388} 389 390func (a *applierV3Corrupt) Range(ctx context.Context, txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) { 391 return nil, ErrCorrupt 392} 393 394func (a *applierV3Corrupt) DeleteRange(txn mvcc.TxnWrite, p *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { 395 return nil, ErrCorrupt 396} 397 398func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { 399 return nil, ErrCorrupt 400} 401 402func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) { 403 return nil, nil, nil, ErrCorrupt 404} 405 406func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { 407 return nil, ErrCorrupt 408} 409 410func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { 411 return nil, ErrCorrupt 412} 413 414type ServerPeerV2 interface { 415 ServerPeer 416 HashKVHandler() http.Handler 417} 418 419const PeerHashKVPath = "/members/hashkv" 420 421type hashKVHandler struct { 422 lg *zap.Logger 423 server *EtcdServer 424} 425 426func (s *EtcdServer) HashKVHandler() http.Handler { 427 return &hashKVHandler{lg: s.getLogger(), server: s} 428} 429 430func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 431 if r.Method != http.MethodGet { 432 w.Header().Set("Allow", http.MethodGet) 433 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) 434 return 435 } 436 if r.URL.Path != PeerHashKVPath { 437 http.Error(w, "bad path", http.StatusBadRequest) 438 return 439 } 440 441 defer r.Body.Close() 442 b, err := ioutil.ReadAll(r.Body) 443 if err != nil { 444 http.Error(w, "error reading body", http.StatusBadRequest) 445 return 446 } 447 448 req := &pb.HashKVRequest{} 449 if err = json.Unmarshal(b, req); err != nil { 450 h.lg.Warn("failed to unmarshal request", zap.Error(err)) 451 http.Error(w, "error unmarshalling request", http.StatusBadRequest) 452 return 453 } 454 hash, rev, compactRev, err := h.server.KV().HashByRev(req.Revision) 455 if err != nil { 456 h.lg.Warn( 457 "failed to get hashKV", 458 zap.Int64("requested-revision", req.Revision), 459 zap.Error(err), 460 ) 461 http.Error(w, err.Error(), http.StatusBadRequest) 462 return 463 } 464 resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: hash, CompactRevision: compactRev} 465 respBytes, err := json.Marshal(resp) 466 if err != nil { 467 h.lg.Warn("failed to marshal hashKV response", zap.Error(err)) 468 http.Error(w, err.Error(), http.StatusInternalServerError) 469 return 470 } 471 472 w.Header().Set("X-Etcd-Cluster-ID", h.server.Cluster().ID().String()) 473 w.Header().Set("Content-Type", "application/json") 474 w.Write(respBytes) 475} 476 477// getPeerHashKVHTTP fetch hash of kv store at the given rev via http call to the given url 478func (s *EtcdServer) getPeerHashKVHTTP(ctx context.Context, url string, rev int64) (*pb.HashKVResponse, error) { 479 cc := &http.Client{Transport: s.peerRt} 480 hashReq := &pb.HashKVRequest{Revision: rev} 481 hashReqBytes, err := json.Marshal(hashReq) 482 if err != nil { 483 return nil, err 484 } 485 requestUrl := url + PeerHashKVPath 486 req, err := http.NewRequest(http.MethodGet, requestUrl, bytes.NewReader(hashReqBytes)) 487 if err != nil { 488 return nil, err 489 } 490 req = req.WithContext(ctx) 491 req.Header.Set("Content-Type", "application/json") 492 req.Cancel = ctx.Done() 493 494 resp, err := cc.Do(req) 495 if err != nil { 496 return nil, err 497 } 498 defer resp.Body.Close() 499 b, err := ioutil.ReadAll(resp.Body) 500 if err != nil { 501 return nil, err 502 } 503 504 if resp.StatusCode == http.StatusBadRequest { 505 if strings.Contains(string(b), mvcc.ErrCompacted.Error()) { 506 return nil, rpctypes.ErrCompacted 507 } 508 if strings.Contains(string(b), mvcc.ErrFutureRev.Error()) { 509 return nil, rpctypes.ErrFutureRev 510 } 511 } 512 if resp.StatusCode != http.StatusOK { 513 return nil, fmt.Errorf("unknown error: %s", string(b)) 514 } 515 516 hashResp := &pb.HashKVResponse{} 517 if err := json.Unmarshal(b, hashResp); err != nil { 518 return nil, err 519 } 520 return hashResp, nil 521} 522