1// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18	"bytes"
19	"context"
20	"encoding/json"
21	"fmt"
22	"io/ioutil"
23	"net/http"
24	"strings"
25	"time"
26
27	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
28	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
29	"go.etcd.io/etcd/mvcc"
30	"go.etcd.io/etcd/pkg/traceutil"
31	"go.etcd.io/etcd/pkg/types"
32
33	"go.uber.org/zap"
34)
35
36// CheckInitialHashKV compares initial hash values with its peers
37// before serving any peer/client traffic. Only mismatch when hashes
38// are different at requested revision, with same compact revision.
39func (s *EtcdServer) CheckInitialHashKV() error {
40	if !s.Cfg.InitialCorruptCheck {
41		return nil
42	}
43
44	lg := s.getLogger()
45
46	if lg != nil {
47		lg.Info(
48			"starting initial corruption check",
49			zap.String("local-member-id", s.ID().String()),
50			zap.Duration("timeout", s.Cfg.ReqTimeout()),
51		)
52	} else {
53		plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout())
54	}
55
56	h, rev, crev, err := s.kv.HashByRev(0)
57	if err != nil {
58		return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err)
59	}
60	peers := s.getPeerHashKVs(rev)
61	mismatch := 0
62	for _, p := range peers {
63		if p.resp != nil {
64			peerID := types.ID(p.resp.Header.MemberId)
65			fields := []zap.Field{
66				zap.String("local-member-id", s.ID().String()),
67				zap.Int64("local-member-revision", rev),
68				zap.Int64("local-member-compact-revision", crev),
69				zap.Uint32("local-member-hash", h),
70				zap.String("remote-peer-id", peerID.String()),
71				zap.Strings("remote-peer-endpoints", p.eps),
72				zap.Int64("remote-peer-revision", p.resp.Header.Revision),
73				zap.Int64("remote-peer-compact-revision", p.resp.CompactRevision),
74				zap.Uint32("remote-peer-hash", p.resp.Hash),
75			}
76
77			if h != p.resp.Hash {
78				if crev == p.resp.CompactRevision {
79					if lg != nil {
80						lg.Warn("found different hash values from remote peer", fields...)
81					} else {
82						plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev)
83					}
84					mismatch++
85				} else {
86					if lg != nil {
87						lg.Warn("found different compact revision values from remote peer", fields...)
88					} else {
89						plog.Warningf("%s cannot check hash of peer(%s): peer has a different compact revision %d (revision:%d)", s.ID(), peerID, p.resp.CompactRevision, rev)
90					}
91				}
92			}
93
94			continue
95		}
96
97		if p.err != nil {
98			switch p.err {
99			case rpctypes.ErrFutureRev:
100				if lg != nil {
101					lg.Warn(
102						"cannot fetch hash from slow remote peer",
103						zap.String("local-member-id", s.ID().String()),
104						zap.Int64("local-member-revision", rev),
105						zap.Int64("local-member-compact-revision", crev),
106						zap.Uint32("local-member-hash", h),
107						zap.String("remote-peer-id", p.id.String()),
108						zap.Strings("remote-peer-endpoints", p.eps),
109						zap.Error(err),
110					)
111				} else {
112					plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
113				}
114			case rpctypes.ErrCompacted:
115				if lg != nil {
116					lg.Warn(
117						"cannot fetch hash from remote peer; local member is behind",
118						zap.String("local-member-id", s.ID().String()),
119						zap.Int64("local-member-revision", rev),
120						zap.Int64("local-member-compact-revision", crev),
121						zap.Uint32("local-member-hash", h),
122						zap.String("remote-peer-id", p.id.String()),
123						zap.Strings("remote-peer-endpoints", p.eps),
124						zap.Error(err),
125					)
126				} else {
127					plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
128				}
129			}
130		}
131	}
132	if mismatch > 0 {
133		return fmt.Errorf("%s found data inconsistency with peers", s.ID())
134	}
135
136	if lg != nil {
137		lg.Info(
138			"initial corruption checking passed; no corruption",
139			zap.String("local-member-id", s.ID().String()),
140		)
141	} else {
142		plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID())
143	}
144	return nil
145}
146
147func (s *EtcdServer) monitorKVHash() {
148	t := s.Cfg.CorruptCheckTime
149	if t == 0 {
150		return
151	}
152
153	lg := s.getLogger()
154	if lg != nil {
155		lg.Info(
156			"enabled corruption checking",
157			zap.String("local-member-id", s.ID().String()),
158			zap.Duration("interval", t),
159		)
160	} else {
161		plog.Infof("enabled corruption checking with %s interval", t)
162	}
163
164	for {
165		select {
166		case <-s.stopping:
167			return
168		case <-time.After(t):
169		}
170		if !s.isLeader() {
171			continue
172		}
173		if err := s.checkHashKV(); err != nil {
174			if lg != nil {
175				lg.Warn("failed to check hash KV", zap.Error(err))
176			} else {
177				plog.Debugf("check hash kv failed %v", err)
178			}
179		}
180	}
181}
182
183func (s *EtcdServer) checkHashKV() error {
184	lg := s.getLogger()
185
186	h, rev, crev, err := s.kv.HashByRev(0)
187	if err != nil {
188		return err
189	}
190	peers := s.getPeerHashKVs(rev)
191
192	ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
193	err = s.linearizableReadNotify(ctx)
194	cancel()
195	if err != nil {
196		return err
197	}
198
199	h2, rev2, crev2, err := s.kv.HashByRev(0)
200	if err != nil {
201		return err
202	}
203
204	alarmed := false
205	mismatch := func(id uint64) {
206		if alarmed {
207			return
208		}
209		alarmed = true
210		a := &pb.AlarmRequest{
211			MemberID: id,
212			Action:   pb.AlarmRequest_ACTIVATE,
213			Alarm:    pb.AlarmType_CORRUPT,
214		}
215		s.goAttach(func() {
216			s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
217		})
218	}
219
220	if h2 != h && rev2 == rev && crev == crev2 {
221		if lg != nil {
222			lg.Warn(
223				"found hash mismatch",
224				zap.Int64("revision-1", rev),
225				zap.Int64("compact-revision-1", crev),
226				zap.Uint32("hash-1", h),
227				zap.Int64("revision-2", rev2),
228				zap.Int64("compact-revision-2", crev2),
229				zap.Uint32("hash-2", h2),
230			)
231		} else {
232			plog.Warningf("mismatched hashes %d and %d for revision %d", h, h2, rev)
233		}
234		mismatch(uint64(s.ID()))
235	}
236
237	checkedCount := 0
238	for _, p := range peers {
239		if p.resp == nil {
240			continue
241		}
242		checkedCount++
243		id := p.resp.Header.MemberId
244
245		// leader expects follower's latest revision less than or equal to leader's
246		if p.resp.Header.Revision > rev2 {
247			if lg != nil {
248				lg.Warn(
249					"revision from follower must be less than or equal to leader's",
250					zap.Int64("leader-revision", rev2),
251					zap.Int64("follower-revision", p.resp.Header.Revision),
252					zap.String("follower-peer-id", types.ID(id).String()),
253				)
254			} else {
255				plog.Warningf(
256					"revision %d from member %v, expected at most %d",
257					p.resp.Header.Revision,
258					types.ID(id),
259					rev2)
260			}
261			mismatch(id)
262		}
263
264		// leader expects follower's latest compact revision less than or equal to leader's
265		if p.resp.CompactRevision > crev2 {
266			if lg != nil {
267				lg.Warn(
268					"compact revision from follower must be less than or equal to leader's",
269					zap.Int64("leader-compact-revision", crev2),
270					zap.Int64("follower-compact-revision", p.resp.CompactRevision),
271					zap.String("follower-peer-id", types.ID(id).String()),
272				)
273			} else {
274				plog.Warningf(
275					"compact revision %d from member %v, expected at most %d",
276					p.resp.CompactRevision,
277					types.ID(id),
278					crev2,
279				)
280			}
281			mismatch(id)
282		}
283
284		// follower's compact revision is leader's old one, then hashes must match
285		if p.resp.CompactRevision == crev && p.resp.Hash != h {
286			if lg != nil {
287				lg.Warn(
288					"same compact revision then hashes must match",
289					zap.Int64("leader-compact-revision", crev2),
290					zap.Uint32("leader-hash", h),
291					zap.Int64("follower-compact-revision", p.resp.CompactRevision),
292					zap.Uint32("follower-hash", p.resp.Hash),
293					zap.String("follower-peer-id", types.ID(id).String()),
294				)
295			} else {
296				plog.Warningf(
297					"hash %d at revision %d from member %v, expected hash %d",
298					p.resp.Hash,
299					rev,
300					types.ID(id),
301					h,
302				)
303			}
304			mismatch(id)
305		}
306	}
307	if lg != nil {
308		lg.Info("finished peer corruption check", zap.Int("number-of-peers-checked", checkedCount))
309	} else {
310		plog.Infof("finished peer corruption check")
311	}
312
313	return nil
314}
315
// peerInfo identifies one remote cluster member to query for its KV hash.
type peerInfo struct {
	// id is the member's ID.
	id types.ID
	// eps holds the member's peer URLs; getPeerHashKVs tries them in order.
	eps []string
}
320
// peerHashKVResp is the outcome of a HashKV request to a single peer, as
// built by getPeerHashKVs: resp is set when one of the peer's endpoints
// answered; otherwise resp is nil and err holds the error from the last
// endpoint tried.
type peerHashKVResp struct {
	peerInfo
	resp *pb.HashKVResponse
	err  error
}
326
327func (s *EtcdServer) getPeerHashKVs(rev int64) []*peerHashKVResp {
328	// TODO: handle the case when "s.cluster.Members" have not
329	// been populated (e.g. no snapshot to load from disk)
330	members := s.cluster.Members()
331	peers := make([]peerInfo, 0, len(members))
332	for _, m := range members {
333		if m.ID == s.ID() {
334			continue
335		}
336		peers = append(peers, peerInfo{id: m.ID, eps: m.PeerURLs})
337	}
338
339	lg := s.getLogger()
340
341	var resps []*peerHashKVResp
342	for _, p := range peers {
343		if len(p.eps) == 0 {
344			continue
345		}
346
347		respsLen := len(resps)
348		var lastErr error
349		for _, ep := range p.eps {
350			ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
351
352			var resp *pb.HashKVResponse
353			resp, lastErr = s.getPeerHashKVHTTP(ctx, ep, rev)
354			cancel()
355			if lastErr == nil {
356				resps = append(resps, &peerHashKVResp{peerInfo: p, resp: resp, err: nil})
357				break
358			}
359			if lg != nil {
360				lg.Warn(
361					"failed hash kv request",
362					zap.String("local-member-id", s.ID().String()),
363					zap.Int64("requested-revision", rev),
364					zap.String("remote-peer-endpoint", ep),
365					zap.Error(lastErr),
366				)
367			} else {
368				plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), lastErr.Error(), ep, rev)
369			}
370		}
371
372		// failed to get hashKV from all endpoints of this peer
373		if respsLen == len(resps) {
374			resps = append(resps, &peerHashKVResp{peerInfo: p, resp: nil, err: lastErr})
375		}
376	}
377	return resps
378}
379
// applierV3Corrupt wraps an applierV3 and fails the operations overridden
// below with ErrCorrupt. Methods that are not overridden here fall through
// to the embedded applierV3.
type applierV3Corrupt struct {
	applierV3
}

// newApplierV3Corrupt wraps a in an applierV3Corrupt.
func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} }

// Put always fails with ErrCorrupt.
func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
	return nil, nil, ErrCorrupt
}

// Range always fails with ErrCorrupt.
func (a *applierV3Corrupt) Range(ctx context.Context, txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {
	return nil, ErrCorrupt
}

// DeleteRange always fails with ErrCorrupt.
func (a *applierV3Corrupt) DeleteRange(txn mvcc.TxnWrite, p *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
	return nil, ErrCorrupt
}

// Txn always fails with ErrCorrupt.
func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
	return nil, ErrCorrupt
}

// Compaction always fails with ErrCorrupt.
func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
	return nil, nil, nil, ErrCorrupt
}

// LeaseGrant always fails with ErrCorrupt.
func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
	return nil, ErrCorrupt
}

// LeaseRevoke always fails with ErrCorrupt.
func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
	return nil, ErrCorrupt
}
413
// ServerPeerV2 extends ServerPeer with the HTTP handler that serves HashKV
// requests.
type ServerPeerV2 interface {
	ServerPeer
	// HashKVHandler returns the handler for HashKV requests.
	HashKVHandler() http.Handler
}
418
// PeerHashKVPath is the URL path for HashKV requests between peers:
// getPeerHashKVHTTP appends it to a peer URL, and hashKVHandler rejects
// requests for any other path.
const PeerHashKVPath = "/members/hashkv"
420
// hashKVHandler is the http.Handler that answers HashKV requests using the
// KV store of the server it wraps.
type hashKVHandler struct {
	// lg is the logger obtained from the server's getLogger.
	lg *zap.Logger
	// server is the EtcdServer whose KV store is hashed.
	server *EtcdServer
}
425
426func (s *EtcdServer) HashKVHandler() http.Handler {
427	return &hashKVHandler{lg: s.getLogger(), server: s}
428}
429
430func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
431	if r.Method != http.MethodGet {
432		w.Header().Set("Allow", http.MethodGet)
433		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
434		return
435	}
436	if r.URL.Path != PeerHashKVPath {
437		http.Error(w, "bad path", http.StatusBadRequest)
438		return
439	}
440
441	defer r.Body.Close()
442	b, err := ioutil.ReadAll(r.Body)
443	if err != nil {
444		http.Error(w, "error reading body", http.StatusBadRequest)
445		return
446	}
447
448	req := &pb.HashKVRequest{}
449	if err = json.Unmarshal(b, req); err != nil {
450		h.lg.Warn("failed to unmarshal request", zap.Error(err))
451		http.Error(w, "error unmarshalling request", http.StatusBadRequest)
452		return
453	}
454	hash, rev, compactRev, err := h.server.KV().HashByRev(req.Revision)
455	if err != nil {
456		h.lg.Warn(
457			"failed to get hashKV",
458			zap.Int64("requested-revision", req.Revision),
459			zap.Error(err),
460		)
461		http.Error(w, err.Error(), http.StatusBadRequest)
462		return
463	}
464	resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: hash, CompactRevision: compactRev}
465	respBytes, err := json.Marshal(resp)
466	if err != nil {
467		h.lg.Warn("failed to marshal hashKV response", zap.Error(err))
468		http.Error(w, err.Error(), http.StatusInternalServerError)
469		return
470	}
471
472	w.Header().Set("X-Etcd-Cluster-ID", h.server.Cluster().ID().String())
473	w.Header().Set("Content-Type", "application/json")
474	w.Write(respBytes)
475}
476
477// getPeerHashKVHTTP fetch hash of kv store at the given rev via http call to the given url
478func (s *EtcdServer) getPeerHashKVHTTP(ctx context.Context, url string, rev int64) (*pb.HashKVResponse, error) {
479	cc := &http.Client{Transport: s.peerRt}
480	hashReq := &pb.HashKVRequest{Revision: rev}
481	hashReqBytes, err := json.Marshal(hashReq)
482	if err != nil {
483		return nil, err
484	}
485	requestUrl := url + PeerHashKVPath
486	req, err := http.NewRequest(http.MethodGet, requestUrl, bytes.NewReader(hashReqBytes))
487	if err != nil {
488		return nil, err
489	}
490	req = req.WithContext(ctx)
491	req.Header.Set("Content-Type", "application/json")
492	req.Cancel = ctx.Done()
493
494	resp, err := cc.Do(req)
495	if err != nil {
496		return nil, err
497	}
498	defer resp.Body.Close()
499	b, err := ioutil.ReadAll(resp.Body)
500	if err != nil {
501		return nil, err
502	}
503
504	if resp.StatusCode == http.StatusBadRequest {
505		if strings.Contains(string(b), mvcc.ErrCompacted.Error()) {
506			return nil, rpctypes.ErrCompacted
507		}
508		if strings.Contains(string(b), mvcc.ErrFutureRev.Error()) {
509			return nil, rpctypes.ErrFutureRev
510		}
511	}
512	if resp.StatusCode != http.StatusOK {
513		return nil, fmt.Errorf("unknown error: %s", string(b))
514	}
515
516	hashResp := &pb.HashKVResponse{}
517	if err := json.Unmarshal(b, hashResp); err != nil {
518		return nil, err
519	}
520	return hashResp, nil
521}
522