1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package rafthttp 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "io/ioutil" 22 "net/http" 23 "path" 24 "strings" 25 "time" 26 27 "go.etcd.io/etcd/etcdserver/api/snap" 28 pioutil "go.etcd.io/etcd/pkg/ioutil" 29 "go.etcd.io/etcd/pkg/types" 30 "go.etcd.io/etcd/raft/raftpb" 31 "go.etcd.io/etcd/version" 32 33 humanize "github.com/dustin/go-humanize" 34 "go.uber.org/zap" 35) 36 37const ( 38 // connReadLimitByte limits the number of bytes 39 // a single read can read out. 40 // 41 // 64KB should be large enough for not causing 42 // throughput bottleneck as well as small enough 43 // for not causing a read timeout. 44 connReadLimitByte = 64 * 1024 45) 46 47var ( 48 RaftPrefix = "/raft" 49 ProbingPrefix = path.Join(RaftPrefix, "probing") 50 RaftStreamPrefix = path.Join(RaftPrefix, "stream") 51 RaftSnapshotPrefix = path.Join(RaftPrefix, "snapshot") 52 53 errIncompatibleVersion = errors.New("incompatible version") 54 errClusterIDMismatch = errors.New("cluster ID mismatch") 55) 56 57type peerGetter interface { 58 Get(id types.ID) Peer 59} 60 61type writerToResponse interface { 62 WriteTo(w http.ResponseWriter) 63} 64 65type pipelineHandler struct { 66 lg *zap.Logger 67 localID types.ID 68 tr Transporter 69 r Raft 70 cid types.ID 71} 72 73// newPipelineHandler returns a handler for handling raft messages 74// from pipeline for RaftPrefix. 75// 76// The handler reads out the raft message from request body, 77// and forwards it to the given raft state machine for processing. 78func newPipelineHandler(t *Transport, r Raft, cid types.ID) http.Handler { 79 return &pipelineHandler{ 80 lg: t.Logger, 81 localID: t.ID, 82 tr: t, 83 r: r, 84 cid: cid, 85 } 86} 87 88func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 89 if r.Method != "POST" { 90 w.Header().Set("Allow", "POST") 91 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) 92 return 93 } 94 95 w.Header().Set("X-Etcd-Cluster-ID", h.cid.String()) 96 97 if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil { 98 http.Error(w, err.Error(), http.StatusPreconditionFailed) 99 return 100 } 101 102 addRemoteFromRequest(h.tr, r) 103 104 // Limit the data size that could be read from the request body, which ensures that read from 105 // connection will not time out accidentally due to possible blocking in underlying implementation. 106 limitedr := pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte) 107 b, err := ioutil.ReadAll(limitedr) 108 if err != nil { 109 if h.lg != nil { 110 h.lg.Warn( 111 "failed to read Raft message", 112 zap.String("local-member-id", h.localID.String()), 113 zap.Error(err), 114 ) 115 } else { 116 plog.Errorf("failed to read raft message (%v)", err) 117 } 118 http.Error(w, "error reading raft message", http.StatusBadRequest) 119 recvFailures.WithLabelValues(r.RemoteAddr).Inc() 120 return 121 } 122 123 var m raftpb.Message 124 if err := m.Unmarshal(b); err != nil { 125 if h.lg != nil { 126 h.lg.Warn( 127 "failed to unmarshal Raft message", 128 zap.String("local-member-id", h.localID.String()), 129 zap.Error(err), 130 ) 131 } else { 132 plog.Errorf("failed to unmarshal raft message (%v)", err) 133 } 134 http.Error(w, "error unmarshalling raft message", http.StatusBadRequest) 135 recvFailures.WithLabelValues(r.RemoteAddr).Inc() 136 return 137 } 138 139 receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b))) 140 141 if err := h.r.Process(context.TODO(), m); err != nil { 142 switch v := err.(type) { 143 case writerToResponse: 144 v.WriteTo(w) 145 default: 146 if h.lg != nil { 147 h.lg.Warn( 148 "failed to process Raft message", 149 zap.String("local-member-id", h.localID.String()), 150 zap.Error(err), 151 ) 152 } else { 153 plog.Warningf("failed to process raft message (%v)", err) 154 } 155 http.Error(w, "error processing raft message", http.StatusInternalServerError) 156 w.(http.Flusher).Flush() 157 // disconnect the http stream 158 panic(err) 159 } 160 return 161 } 162 163 // Write StatusNoContent header after the message has been processed by 164 // raft, which facilitates the client to report MsgSnap status. 165 w.WriteHeader(http.StatusNoContent) 166} 167 168type snapshotHandler struct { 169 lg *zap.Logger 170 tr Transporter 171 r Raft 172 snapshotter *snap.Snapshotter 173 174 localID types.ID 175 cid types.ID 176} 177 178func newSnapshotHandler(t *Transport, r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler { 179 return &snapshotHandler{ 180 lg: t.Logger, 181 tr: t, 182 r: r, 183 snapshotter: snapshotter, 184 localID: t.ID, 185 cid: cid, 186 } 187} 188 189const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER" 190 191// ServeHTTP serves HTTP request to receive and process snapshot message. 192// 193// If request sender dies without closing underlying TCP connection, 194// the handler will keep waiting for the request body until TCP keepalive 195// finds out that the connection is broken after several minutes. 196// This is acceptable because 197// 1. snapshot messages sent through other TCP connections could still be 198// received and processed. 199// 2. this case should happen rarely, so no further optimization is done. 200func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 201 start := time.Now() 202 203 if r.Method != "POST" { 204 w.Header().Set("Allow", "POST") 205 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) 206 snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc() 207 return 208 } 209 210 w.Header().Set("X-Etcd-Cluster-ID", h.cid.String()) 211 212 if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil { 213 http.Error(w, err.Error(), http.StatusPreconditionFailed) 214 snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc() 215 return 216 } 217 218 addRemoteFromRequest(h.tr, r) 219 220 dec := &messageDecoder{r: r.Body} 221 // let snapshots be very large since they can exceed 512MB for large installations 222 m, err := dec.decodeLimit(uint64(1 << 63)) 223 from := types.ID(m.From).String() 224 if err != nil { 225 msg := fmt.Sprintf("failed to decode raft message (%v)", err) 226 if h.lg != nil { 227 h.lg.Warn( 228 "failed to decode Raft message", 229 zap.String("local-member-id", h.localID.String()), 230 zap.String("remote-snapshot-sender-id", from), 231 zap.Error(err), 232 ) 233 } else { 234 plog.Error(msg) 235 } 236 http.Error(w, msg, http.StatusBadRequest) 237 recvFailures.WithLabelValues(r.RemoteAddr).Inc() 238 snapshotReceiveFailures.WithLabelValues(from).Inc() 239 return 240 } 241 242 msgSizeVal := m.Size() 243 msgSize := humanize.Bytes(uint64(msgSizeVal)) 244 receivedBytes.WithLabelValues(from).Add(float64(msgSizeVal)) 245 246 if m.Type != raftpb.MsgSnap { 247 if h.lg != nil { 248 h.lg.Warn( 249 "unexpected Raft message type", 250 zap.String("local-member-id", h.localID.String()), 251 zap.String("remote-snapshot-sender-id", from), 252 zap.String("message-type", m.Type.String()), 253 ) 254 } else { 255 plog.Errorf("unexpected raft message type %s on snapshot path", m.Type) 256 } 257 http.Error(w, "wrong raft message type", http.StatusBadRequest) 258 snapshotReceiveFailures.WithLabelValues(from).Inc() 259 return 260 } 261 262 snapshotReceiveInflights.WithLabelValues(from).Inc() 263 defer func() { 264 snapshotReceiveInflights.WithLabelValues(from).Dec() 265 }() 266 267 if h.lg != nil { 268 h.lg.Info( 269 "receiving database snapshot", 270 zap.String("local-member-id", h.localID.String()), 271 zap.String("remote-snapshot-sender-id", from), 272 zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index), 273 zap.Int("incoming-snapshot-message-size-bytes", msgSizeVal), 274 zap.String("incoming-snapshot-message-size", msgSize), 275 ) 276 } else { 277 plog.Infof("receiving database snapshot [index: %d, from: %s, raft message size: %s]", m.Snapshot.Metadata.Index, types.ID(m.From), msgSize) 278 } 279 280 // save incoming database snapshot. 281 n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index) 282 if err != nil { 283 msg := fmt.Sprintf("failed to save KV snapshot (%v)", err) 284 if h.lg != nil { 285 h.lg.Warn( 286 "failed to save incoming database snapshot", 287 zap.String("local-member-id", h.localID.String()), 288 zap.String("remote-snapshot-sender-id", from), 289 zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index), 290 zap.Error(err), 291 ) 292 } else { 293 plog.Error(msg) 294 } 295 http.Error(w, msg, http.StatusInternalServerError) 296 snapshotReceiveFailures.WithLabelValues(from).Inc() 297 return 298 } 299 300 dbSize := humanize.Bytes(uint64(n)) 301 receivedBytes.WithLabelValues(from).Add(float64(n)) 302 303 downloadTook := time.Since(start) 304 if h.lg != nil { 305 h.lg.Info( 306 "received and saved database snapshot", 307 zap.String("local-member-id", h.localID.String()), 308 zap.String("remote-snapshot-sender-id", from), 309 zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index), 310 zap.Int64("incoming-snapshot-size-bytes", n), 311 zap.String("incoming-snapshot-size", dbSize), 312 zap.String("download-took", downloadTook.String()), 313 ) 314 } else { 315 plog.Infof("successfully received and saved database snapshot [index: %d, from: %s, raft message size: %s, db size: %s, took: %s]", m.Snapshot.Metadata.Index, types.ID(m.From), msgSize, dbSize, downloadTook.String()) 316 } 317 318 if err := h.r.Process(context.TODO(), m); err != nil { 319 switch v := err.(type) { 320 // Process may return writerToResponse error when doing some 321 // additional checks before calling raft.Node.Step. 322 case writerToResponse: 323 v.WriteTo(w) 324 default: 325 msg := fmt.Sprintf("failed to process raft message (%v)", err) 326 if h.lg != nil { 327 h.lg.Warn( 328 "failed to process Raft message", 329 zap.String("local-member-id", h.localID.String()), 330 zap.String("remote-snapshot-sender-id", from), 331 zap.Error(err), 332 ) 333 } else { 334 plog.Error(msg) 335 } 336 http.Error(w, msg, http.StatusInternalServerError) 337 snapshotReceiveFailures.WithLabelValues(from).Inc() 338 } 339 return 340 } 341 342 // Write StatusNoContent header after the message has been processed by 343 // raft, which facilitates the client to report MsgSnap status. 344 w.WriteHeader(http.StatusNoContent) 345 346 snapshotReceive.WithLabelValues(from).Inc() 347 snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds()) 348} 349 350type streamHandler struct { 351 lg *zap.Logger 352 tr *Transport 353 peerGetter peerGetter 354 r Raft 355 id types.ID 356 cid types.ID 357} 358 359func newStreamHandler(t *Transport, pg peerGetter, r Raft, id, cid types.ID) http.Handler { 360 return &streamHandler{ 361 lg: t.Logger, 362 tr: t, 363 peerGetter: pg, 364 r: r, 365 id: id, 366 cid: cid, 367 } 368} 369 370func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 371 if r.Method != "GET" { 372 w.Header().Set("Allow", "GET") 373 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) 374 return 375 } 376 377 w.Header().Set("X-Server-Version", version.Version) 378 w.Header().Set("X-Etcd-Cluster-ID", h.cid.String()) 379 380 if err := checkClusterCompatibilityFromHeader(h.lg, h.tr.ID, r.Header, h.cid); err != nil { 381 http.Error(w, err.Error(), http.StatusPreconditionFailed) 382 return 383 } 384 385 var t streamType 386 switch path.Dir(r.URL.Path) { 387 case streamTypeMsgAppV2.endpoint(): 388 t = streamTypeMsgAppV2 389 case streamTypeMessage.endpoint(): 390 t = streamTypeMessage 391 default: 392 if h.lg != nil { 393 h.lg.Debug( 394 "ignored unexpected streaming request path", 395 zap.String("local-member-id", h.tr.ID.String()), 396 zap.String("remote-peer-id-stream-handler", h.id.String()), 397 zap.String("path", r.URL.Path), 398 ) 399 } else { 400 plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path) 401 } 402 http.Error(w, "invalid path", http.StatusNotFound) 403 return 404 } 405 406 fromStr := path.Base(r.URL.Path) 407 from, err := types.IDFromString(fromStr) 408 if err != nil { 409 if h.lg != nil { 410 h.lg.Warn( 411 "failed to parse path into ID", 412 zap.String("local-member-id", h.tr.ID.String()), 413 zap.String("remote-peer-id-stream-handler", h.id.String()), 414 zap.String("path", fromStr), 415 zap.Error(err), 416 ) 417 } else { 418 plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err) 419 } 420 http.Error(w, "invalid from", http.StatusNotFound) 421 return 422 } 423 if h.r.IsIDRemoved(uint64(from)) { 424 if h.lg != nil { 425 h.lg.Warn( 426 "rejected stream from remote peer because it was removed", 427 zap.String("local-member-id", h.tr.ID.String()), 428 zap.String("remote-peer-id-stream-handler", h.id.String()), 429 zap.String("remote-peer-id-from", from.String()), 430 ) 431 } else { 432 plog.Warningf("rejected the stream from peer %s since it was removed", from) 433 } 434 http.Error(w, "removed member", http.StatusGone) 435 return 436 } 437 p := h.peerGetter.Get(from) 438 if p == nil { 439 // This may happen in following cases: 440 // 1. user starts a remote peer that belongs to a different cluster 441 // with the same cluster ID. 442 // 2. local etcd falls behind of the cluster, and cannot recognize 443 // the members that joined after its current progress. 444 if urls := r.Header.Get("X-PeerURLs"); urls != "" { 445 h.tr.AddRemote(from, strings.Split(urls, ",")) 446 } 447 if h.lg != nil { 448 h.lg.Warn( 449 "failed to find remote peer in cluster", 450 zap.String("local-member-id", h.tr.ID.String()), 451 zap.String("remote-peer-id-stream-handler", h.id.String()), 452 zap.String("remote-peer-id-from", from.String()), 453 zap.String("cluster-id", h.cid.String()), 454 ) 455 } else { 456 plog.Errorf("failed to find member %s in cluster %s", from, h.cid) 457 } 458 http.Error(w, "error sender not found", http.StatusNotFound) 459 return 460 } 461 462 wto := h.id.String() 463 if gto := r.Header.Get("X-Raft-To"); gto != wto { 464 if h.lg != nil { 465 h.lg.Warn( 466 "ignored streaming request; ID mismatch", 467 zap.String("local-member-id", h.tr.ID.String()), 468 zap.String("remote-peer-id-stream-handler", h.id.String()), 469 zap.String("remote-peer-id-header", gto), 470 zap.String("remote-peer-id-from", from.String()), 471 zap.String("cluster-id", h.cid.String()), 472 ) 473 } else { 474 plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto) 475 } 476 http.Error(w, "to field mismatch", http.StatusPreconditionFailed) 477 return 478 } 479 480 w.WriteHeader(http.StatusOK) 481 w.(http.Flusher).Flush() 482 483 c := newCloseNotifier() 484 conn := &outgoingConn{ 485 t: t, 486 Writer: w, 487 Flusher: w.(http.Flusher), 488 Closer: c, 489 localID: h.tr.ID, 490 peerID: h.id, 491 } 492 p.attachOutgoingConn(conn) 493 <-c.closeNotify() 494} 495 496// checkClusterCompatibilityFromHeader checks the cluster compatibility of 497// the local member from the given header. 498// It checks whether the version of local member is compatible with 499// the versions in the header, and whether the cluster ID of local member 500// matches the one in the header. 501func checkClusterCompatibilityFromHeader(lg *zap.Logger, localID types.ID, header http.Header, cid types.ID) error { 502 remoteName := header.Get("X-Server-From") 503 504 remoteServer := serverVersion(header) 505 remoteVs := "" 506 if remoteServer != nil { 507 remoteVs = remoteServer.String() 508 } 509 510 remoteMinClusterVer := minClusterVersion(header) 511 remoteMinClusterVs := "" 512 if remoteMinClusterVer != nil { 513 remoteMinClusterVs = remoteMinClusterVer.String() 514 } 515 516 localServer, localMinCluster, err := checkVersionCompatibility(remoteName, remoteServer, remoteMinClusterVer) 517 518 localVs := "" 519 if localServer != nil { 520 localVs = localServer.String() 521 } 522 localMinClusterVs := "" 523 if localMinCluster != nil { 524 localMinClusterVs = localMinCluster.String() 525 } 526 527 if err != nil { 528 if lg != nil { 529 lg.Warn( 530 "failed to check version compatibility", 531 zap.String("local-member-id", localID.String()), 532 zap.String("local-member-cluster-id", cid.String()), 533 zap.String("local-member-server-version", localVs), 534 zap.String("local-member-server-minimum-cluster-version", localMinClusterVs), 535 zap.String("remote-peer-server-name", remoteName), 536 zap.String("remote-peer-server-version", remoteVs), 537 zap.String("remote-peer-server-minimum-cluster-version", remoteMinClusterVs), 538 zap.Error(err), 539 ) 540 } else { 541 plog.Errorf("request version incompatibility (%v)", err) 542 } 543 return errIncompatibleVersion 544 } 545 if gcid := header.Get("X-Etcd-Cluster-ID"); gcid != cid.String() { 546 if lg != nil { 547 lg.Warn( 548 "request cluster ID mismatch", 549 zap.String("local-member-id", localID.String()), 550 zap.String("local-member-cluster-id", cid.String()), 551 zap.String("local-member-server-version", localVs), 552 zap.String("local-member-server-minimum-cluster-version", localMinClusterVs), 553 zap.String("remote-peer-server-name", remoteName), 554 zap.String("remote-peer-server-version", remoteVs), 555 zap.String("remote-peer-server-minimum-cluster-version", remoteMinClusterVs), 556 zap.String("remote-peer-cluster-id", gcid), 557 ) 558 } else { 559 plog.Errorf("request cluster ID mismatch (got %s want %s)", gcid, cid) 560 } 561 return errClusterIDMismatch 562 } 563 return nil 564} 565 566type closeNotifier struct { 567 done chan struct{} 568} 569 570func newCloseNotifier() *closeNotifier { 571 return &closeNotifier{ 572 done: make(chan struct{}), 573 } 574} 575 576func (n *closeNotifier) Close() error { 577 close(n.done) 578 return nil 579} 580 581func (n *closeNotifier) closeNotify() <-chan struct{} { return n.done } 582