1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package rafthttp 16 17import ( 18 "context" 19 "fmt" 20 "io" 21 "io/ioutil" 22 "net/http" 23 "path" 24 "strings" 25 "sync" 26 "time" 27 28 stats "go.etcd.io/etcd/etcdserver/api/v2stats" 29 "go.etcd.io/etcd/pkg/httputil" 30 "go.etcd.io/etcd/pkg/transport" 31 "go.etcd.io/etcd/pkg/types" 32 "go.etcd.io/etcd/raft/raftpb" 33 "go.etcd.io/etcd/version" 34 35 "github.com/coreos/go-semver/semver" 36 "go.uber.org/zap" 37 "golang.org/x/time/rate" 38) 39 40const ( 41 streamTypeMessage streamType = "message" 42 streamTypeMsgAppV2 streamType = "msgappv2" 43 44 streamBufSize = 4096 45) 46 47var ( 48 errUnsupportedStreamType = fmt.Errorf("unsupported stream type") 49 50 // the key is in string format "major.minor.patch" 51 supportedStream = map[string][]streamType{ 52 "2.0.0": {}, 53 "2.1.0": {streamTypeMsgAppV2, streamTypeMessage}, 54 "2.2.0": {streamTypeMsgAppV2, streamTypeMessage}, 55 "2.3.0": {streamTypeMsgAppV2, streamTypeMessage}, 56 "3.0.0": {streamTypeMsgAppV2, streamTypeMessage}, 57 "3.1.0": {streamTypeMsgAppV2, streamTypeMessage}, 58 "3.2.0": {streamTypeMsgAppV2, streamTypeMessage}, 59 "3.3.0": {streamTypeMsgAppV2, streamTypeMessage}, 60 "3.4.0": {streamTypeMsgAppV2, streamTypeMessage}, 61 } 62) 63 64type streamType string 65 66func (t streamType) endpoint() string { 67 switch t { 68 case streamTypeMsgAppV2: 69 return path.Join(RaftStreamPrefix, "msgapp") 70 case streamTypeMessage: 71 return path.Join(RaftStreamPrefix, "message") 72 default: 73 plog.Panicf("unhandled stream type %v", t) 74 return "" 75 } 76} 77 78func (t streamType) String() string { 79 switch t { 80 case streamTypeMsgAppV2: 81 return "stream MsgApp v2" 82 case streamTypeMessage: 83 return "stream Message" 84 default: 85 return "unknown stream" 86 } 87} 88 89var ( 90 // linkHeartbeatMessage is a special message used as heartbeat message in 91 // link layer. It never conflicts with messages from raft because raft 92 // doesn't send out messages without From and To fields. 93 linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat} 94) 95 96func isLinkHeartbeatMessage(m *raftpb.Message) bool { 97 return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0 98} 99 100type outgoingConn struct { 101 t streamType 102 io.Writer 103 http.Flusher 104 io.Closer 105 106 localID types.ID 107 peerID types.ID 108} 109 110// streamWriter writes messages to the attached outgoingConn. 111type streamWriter struct { 112 lg *zap.Logger 113 114 localID types.ID 115 peerID types.ID 116 117 status *peerStatus 118 fs *stats.FollowerStats 119 r Raft 120 121 mu sync.Mutex // guard field working and closer 122 closer io.Closer 123 working bool 124 125 msgc chan raftpb.Message 126 connc chan *outgoingConn 127 stopc chan struct{} 128 done chan struct{} 129} 130 131// startStreamWriter creates a streamWrite and starts a long running go-routine that accepts 132// messages and writes to the attached outgoing connection. 133func startStreamWriter(lg *zap.Logger, local, id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter { 134 w := &streamWriter{ 135 lg: lg, 136 137 localID: local, 138 peerID: id, 139 140 status: status, 141 fs: fs, 142 r: r, 143 msgc: make(chan raftpb.Message, streamBufSize), 144 connc: make(chan *outgoingConn), 145 stopc: make(chan struct{}), 146 done: make(chan struct{}), 147 } 148 go w.run() 149 return w 150} 151 152func (cw *streamWriter) run() { 153 var ( 154 msgc chan raftpb.Message 155 heartbeatc <-chan time.Time 156 t streamType 157 enc encoder 158 flusher http.Flusher 159 batched int 160 ) 161 tickc := time.NewTicker(ConnReadTimeout / 3) 162 defer tickc.Stop() 163 unflushed := 0 164 165 if cw.lg != nil { 166 cw.lg.Info( 167 "started stream writer with remote peer", 168 zap.String("local-member-id", cw.localID.String()), 169 zap.String("remote-peer-id", cw.peerID.String()), 170 ) 171 } else { 172 plog.Infof("started streaming with peer %s (writer)", cw.peerID) 173 } 174 175 for { 176 select { 177 case <-heartbeatc: 178 err := enc.encode(&linkHeartbeatMessage) 179 unflushed += linkHeartbeatMessage.Size() 180 if err == nil { 181 flusher.Flush() 182 batched = 0 183 sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed)) 184 unflushed = 0 185 continue 186 } 187 188 cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error()) 189 190 sentFailures.WithLabelValues(cw.peerID.String()).Inc() 191 cw.close() 192 if cw.lg != nil { 193 cw.lg.Warn( 194 "lost TCP streaming connection with remote peer", 195 zap.String("stream-writer-type", t.String()), 196 zap.String("local-member-id", cw.localID.String()), 197 zap.String("remote-peer-id", cw.peerID.String()), 198 ) 199 } else { 200 plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) 201 } 202 heartbeatc, msgc = nil, nil 203 204 case m := <-msgc: 205 err := enc.encode(&m) 206 if err == nil { 207 unflushed += m.Size() 208 209 if len(msgc) == 0 || batched > streamBufSize/2 { 210 flusher.Flush() 211 sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed)) 212 unflushed = 0 213 batched = 0 214 } else { 215 batched++ 216 } 217 218 continue 219 } 220 221 cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error()) 222 cw.close() 223 if cw.lg != nil { 224 cw.lg.Warn( 225 "lost TCP streaming connection with remote peer", 226 zap.String("stream-writer-type", t.String()), 227 zap.String("local-member-id", cw.localID.String()), 228 zap.String("remote-peer-id", cw.peerID.String()), 229 ) 230 } else { 231 plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) 232 } 233 heartbeatc, msgc = nil, nil 234 cw.r.ReportUnreachable(m.To) 235 sentFailures.WithLabelValues(cw.peerID.String()).Inc() 236 237 case conn := <-cw.connc: 238 cw.mu.Lock() 239 closed := cw.closeUnlocked() 240 t = conn.t 241 switch conn.t { 242 case streamTypeMsgAppV2: 243 enc = newMsgAppV2Encoder(conn.Writer, cw.fs) 244 case streamTypeMessage: 245 enc = &messageEncoder{w: conn.Writer} 246 default: 247 plog.Panicf("unhandled stream type %s", conn.t) 248 } 249 if cw.lg != nil { 250 cw.lg.Info( 251 "set message encoder", 252 zap.String("from", conn.localID.String()), 253 zap.String("to", conn.peerID.String()), 254 zap.String("stream-type", t.String()), 255 ) 256 } 257 flusher = conn.Flusher 258 unflushed = 0 259 cw.status.activate() 260 cw.closer = conn.Closer 261 cw.working = true 262 cw.mu.Unlock() 263 264 if closed { 265 if cw.lg != nil { 266 cw.lg.Warn( 267 "closed TCP streaming connection with remote peer", 268 zap.String("stream-writer-type", t.String()), 269 zap.String("local-member-id", cw.localID.String()), 270 zap.String("remote-peer-id", cw.peerID.String()), 271 ) 272 } else { 273 plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t) 274 } 275 } 276 if cw.lg != nil { 277 cw.lg.Warn( 278 "established TCP streaming connection with remote peer", 279 zap.String("stream-writer-type", t.String()), 280 zap.String("local-member-id", cw.localID.String()), 281 zap.String("remote-peer-id", cw.peerID.String()), 282 ) 283 } else { 284 plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t) 285 } 286 heartbeatc, msgc = tickc.C, cw.msgc 287 288 case <-cw.stopc: 289 if cw.close() { 290 if cw.lg != nil { 291 cw.lg.Warn( 292 "closed TCP streaming connection with remote peer", 293 zap.String("stream-writer-type", t.String()), 294 zap.String("remote-peer-id", cw.peerID.String()), 295 ) 296 } else { 297 plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) 298 } 299 } 300 if cw.lg != nil { 301 cw.lg.Warn( 302 "stopped TCP streaming connection with remote peer", 303 zap.String("stream-writer-type", t.String()), 304 zap.String("remote-peer-id", cw.peerID.String()), 305 ) 306 } else { 307 plog.Infof("stopped streaming with peer %s (writer)", cw.peerID) 308 } 309 close(cw.done) 310 return 311 } 312 } 313} 314 315func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) { 316 cw.mu.Lock() 317 defer cw.mu.Unlock() 318 return cw.msgc, cw.working 319} 320 321func (cw *streamWriter) close() bool { 322 cw.mu.Lock() 323 defer cw.mu.Unlock() 324 return cw.closeUnlocked() 325} 326 327func (cw *streamWriter) closeUnlocked() bool { 328 if !cw.working { 329 return false 330 } 331 if err := cw.closer.Close(); err != nil { 332 if cw.lg != nil { 333 cw.lg.Warn( 334 "failed to close connection with remote peer", 335 zap.String("remote-peer-id", cw.peerID.String()), 336 zap.Error(err), 337 ) 338 } else { 339 plog.Errorf("peer %s (writer) connection close error: %v", cw.peerID, err) 340 } 341 } 342 if len(cw.msgc) > 0 { 343 cw.r.ReportUnreachable(uint64(cw.peerID)) 344 } 345 cw.msgc = make(chan raftpb.Message, streamBufSize) 346 cw.working = false 347 return true 348} 349 350func (cw *streamWriter) attach(conn *outgoingConn) bool { 351 select { 352 case cw.connc <- conn: 353 return true 354 case <-cw.done: 355 return false 356 } 357} 358 359func (cw *streamWriter) stop() { 360 close(cw.stopc) 361 <-cw.done 362} 363 364// streamReader is a long-running go-routine that dials to the remote stream 365// endpoint and reads messages from the response body returned. 366type streamReader struct { 367 lg *zap.Logger 368 369 peerID types.ID 370 typ streamType 371 372 tr *Transport 373 picker *urlPicker 374 status *peerStatus 375 recvc chan<- raftpb.Message 376 propc chan<- raftpb.Message 377 378 rl *rate.Limiter // alters the frequency of dial retrial attempts 379 380 errorc chan<- error 381 382 mu sync.Mutex 383 paused bool 384 closer io.Closer 385 386 ctx context.Context 387 cancel context.CancelFunc 388 done chan struct{} 389} 390 391func (cr *streamReader) start() { 392 cr.done = make(chan struct{}) 393 if cr.errorc == nil { 394 cr.errorc = cr.tr.ErrorC 395 } 396 if cr.ctx == nil { 397 cr.ctx, cr.cancel = context.WithCancel(context.Background()) 398 } 399 go cr.run() 400} 401 402func (cr *streamReader) run() { 403 t := cr.typ 404 405 if cr.lg != nil { 406 cr.lg.Info( 407 "started stream reader with remote peer", 408 zap.String("stream-reader-type", t.String()), 409 zap.String("local-member-id", cr.tr.ID.String()), 410 zap.String("remote-peer-id", cr.peerID.String()), 411 ) 412 } else { 413 plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t) 414 } 415 416 for { 417 rc, err := cr.dial(t) 418 if err != nil { 419 if err != errUnsupportedStreamType { 420 cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error()) 421 } 422 } else { 423 cr.status.activate() 424 if cr.lg != nil { 425 cr.lg.Info( 426 "established TCP streaming connection with remote peer", 427 zap.String("stream-reader-type", cr.typ.String()), 428 zap.String("local-member-id", cr.tr.ID.String()), 429 zap.String("remote-peer-id", cr.peerID.String()), 430 ) 431 } else { 432 plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ) 433 } 434 err = cr.decodeLoop(rc, t) 435 if cr.lg != nil { 436 cr.lg.Warn( 437 "lost TCP streaming connection with remote peer", 438 zap.String("stream-reader-type", cr.typ.String()), 439 zap.String("local-member-id", cr.tr.ID.String()), 440 zap.String("remote-peer-id", cr.peerID.String()), 441 zap.Error(err), 442 ) 443 } else { 444 plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ) 445 } 446 switch { 447 // all data is read out 448 case err == io.EOF: 449 // connection is closed by the remote 450 case transport.IsClosedConnError(err): 451 default: 452 cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error()) 453 } 454 } 455 // Wait for a while before new dial attempt 456 err = cr.rl.Wait(cr.ctx) 457 if cr.ctx.Err() != nil { 458 if cr.lg != nil { 459 cr.lg.Info( 460 "stopped stream reader with remote peer", 461 zap.String("stream-reader-type", t.String()), 462 zap.String("local-member-id", cr.tr.ID.String()), 463 zap.String("remote-peer-id", cr.peerID.String()), 464 ) 465 } else { 466 plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t) 467 } 468 close(cr.done) 469 return 470 } 471 if err != nil { 472 if cr.lg != nil { 473 cr.lg.Warn( 474 "rate limit on stream reader with remote peer", 475 zap.String("stream-reader-type", t.String()), 476 zap.String("local-member-id", cr.tr.ID.String()), 477 zap.String("remote-peer-id", cr.peerID.String()), 478 zap.Error(err), 479 ) 480 } else { 481 plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err) 482 } 483 } 484 } 485} 486 487func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { 488 var dec decoder 489 cr.mu.Lock() 490 switch t { 491 case streamTypeMsgAppV2: 492 dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID) 493 case streamTypeMessage: 494 dec = &messageDecoder{r: rc} 495 default: 496 if cr.lg != nil { 497 cr.lg.Panic("unknown stream type", zap.String("type", t.String())) 498 } else { 499 plog.Panicf("unhandled stream type %s", t) 500 } 501 } 502 select { 503 case <-cr.ctx.Done(): 504 cr.mu.Unlock() 505 if err := rc.Close(); err != nil { 506 return err 507 } 508 return io.EOF 509 default: 510 cr.closer = rc 511 } 512 cr.mu.Unlock() 513 514 // gofail: labelRaftDropHeartbeat: 515 for { 516 m, err := dec.decode() 517 if err != nil { 518 cr.mu.Lock() 519 cr.close() 520 cr.mu.Unlock() 521 return err 522 } 523 524 // gofail-go: var raftDropHeartbeat struct{} 525 // continue labelRaftDropHeartbeat 526 receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size())) 527 528 cr.mu.Lock() 529 paused := cr.paused 530 cr.mu.Unlock() 531 532 if paused { 533 continue 534 } 535 536 if isLinkHeartbeatMessage(&m) { 537 // raft is not interested in link layer 538 // heartbeat message, so we should ignore 539 // it. 540 continue 541 } 542 543 recvc := cr.recvc 544 if m.Type == raftpb.MsgProp { 545 recvc = cr.propc 546 } 547 548 select { 549 case recvc <- m: 550 default: 551 if cr.status.isActive() { 552 if cr.lg != nil { 553 cr.lg.Warn( 554 "dropped internal Raft message since receiving buffer is full (overloaded network)", 555 zap.String("message-type", m.Type.String()), 556 zap.String("local-member-id", cr.tr.ID.String()), 557 zap.String("from", types.ID(m.From).String()), 558 zap.String("remote-peer-id", types.ID(m.To).String()), 559 zap.Bool("remote-peer-active", cr.status.isActive()), 560 ) 561 } else { 562 plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From)) 563 } 564 } else { 565 if cr.lg != nil { 566 cr.lg.Warn( 567 "dropped Raft message since receiving buffer is full (overloaded network)", 568 zap.String("message-type", m.Type.String()), 569 zap.String("local-member-id", cr.tr.ID.String()), 570 zap.String("from", types.ID(m.From).String()), 571 zap.String("remote-peer-id", types.ID(m.To).String()), 572 zap.Bool("remote-peer-active", cr.status.isActive()), 573 ) 574 } else { 575 plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From)) 576 } 577 } 578 recvFailures.WithLabelValues(types.ID(m.From).String()).Inc() 579 } 580 } 581} 582 583func (cr *streamReader) stop() { 584 cr.mu.Lock() 585 cr.cancel() 586 cr.close() 587 cr.mu.Unlock() 588 <-cr.done 589} 590 591func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { 592 u := cr.picker.pick() 593 uu := u 594 uu.Path = path.Join(t.endpoint(), cr.tr.ID.String()) 595 596 if cr.lg != nil { 597 cr.lg.Debug( 598 "dial stream reader", 599 zap.String("from", cr.tr.ID.String()), 600 zap.String("to", cr.peerID.String()), 601 zap.String("address", uu.String()), 602 ) 603 } 604 req, err := http.NewRequest("GET", uu.String(), nil) 605 if err != nil { 606 cr.picker.unreachable(u) 607 return nil, fmt.Errorf("failed to make http request to %v (%v)", u, err) 608 } 609 req.Header.Set("X-Server-From", cr.tr.ID.String()) 610 req.Header.Set("X-Server-Version", version.Version) 611 req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion) 612 req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String()) 613 req.Header.Set("X-Raft-To", cr.peerID.String()) 614 615 setPeerURLsHeader(req, cr.tr.URLs) 616 617 req = req.WithContext(cr.ctx) 618 619 cr.mu.Lock() 620 select { 621 case <-cr.ctx.Done(): 622 cr.mu.Unlock() 623 return nil, fmt.Errorf("stream reader is stopped") 624 default: 625 } 626 cr.mu.Unlock() 627 628 resp, err := cr.tr.streamRt.RoundTrip(req) 629 if err != nil { 630 cr.picker.unreachable(u) 631 return nil, err 632 } 633 634 rv := serverVersion(resp.Header) 635 lv := semver.Must(semver.NewVersion(version.Version)) 636 if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) { 637 httputil.GracefulClose(resp) 638 cr.picker.unreachable(u) 639 return nil, errUnsupportedStreamType 640 } 641 642 switch resp.StatusCode { 643 case http.StatusGone: 644 httputil.GracefulClose(resp) 645 cr.picker.unreachable(u) 646 reportCriticalError(errMemberRemoved, cr.errorc) 647 return nil, errMemberRemoved 648 649 case http.StatusOK: 650 return resp.Body, nil 651 652 case http.StatusNotFound: 653 httputil.GracefulClose(resp) 654 cr.picker.unreachable(u) 655 return nil, fmt.Errorf("peer %s failed to find local node %s", cr.peerID, cr.tr.ID) 656 657 case http.StatusPreconditionFailed: 658 b, err := ioutil.ReadAll(resp.Body) 659 if err != nil { 660 cr.picker.unreachable(u) 661 return nil, err 662 } 663 httputil.GracefulClose(resp) 664 cr.picker.unreachable(u) 665 666 switch strings.TrimSuffix(string(b), "\n") { 667 case errIncompatibleVersion.Error(): 668 if cr.lg != nil { 669 cr.lg.Warn( 670 "request sent was ignored by remote peer due to server version incompatibility", 671 zap.String("local-member-id", cr.tr.ID.String()), 672 zap.String("remote-peer-id", cr.peerID.String()), 673 zap.Error(errIncompatibleVersion), 674 ) 675 } else { 676 plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.peerID) 677 } 678 return nil, errIncompatibleVersion 679 680 case errClusterIDMismatch.Error(): 681 if cr.lg != nil { 682 cr.lg.Warn( 683 "request sent was ignored by remote peer due to cluster ID mismatch", 684 zap.String("remote-peer-id", cr.peerID.String()), 685 zap.String("remote-peer-cluster-id", resp.Header.Get("X-Etcd-Cluster-ID")), 686 zap.String("local-member-id", cr.tr.ID.String()), 687 zap.String("local-member-cluster-id", cr.tr.ClusterID.String()), 688 zap.Error(errClusterIDMismatch), 689 ) 690 } else { 691 plog.Errorf("request sent was ignored (cluster ID mismatch: peer[%s]=%s, local=%s)", 692 cr.peerID, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID) 693 } 694 return nil, errClusterIDMismatch 695 696 default: 697 return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b)) 698 } 699 700 default: 701 httputil.GracefulClose(resp) 702 cr.picker.unreachable(u) 703 return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode) 704 } 705} 706 707func (cr *streamReader) close() { 708 if cr.closer != nil { 709 if err := cr.closer.Close(); err != nil { 710 if cr.lg != nil { 711 cr.lg.Warn( 712 "failed to close remote peer connection", 713 zap.String("local-member-id", cr.tr.ID.String()), 714 zap.String("remote-peer-id", cr.peerID.String()), 715 zap.Error(err), 716 ) 717 } else { 718 plog.Errorf("peer %s (reader) connection close error: %v", cr.peerID, err) 719 } 720 } 721 } 722 cr.closer = nil 723} 724 725func (cr *streamReader) pause() { 726 cr.mu.Lock() 727 defer cr.mu.Unlock() 728 cr.paused = true 729} 730 731func (cr *streamReader) resume() { 732 cr.mu.Lock() 733 defer cr.mu.Unlock() 734 cr.paused = false 735} 736 737// checkStreamSupport checks whether the stream type is supported in the 738// given version. 739func checkStreamSupport(v *semver.Version, t streamType) bool { 740 nv := &semver.Version{Major: v.Major, Minor: v.Minor} 741 for _, s := range supportedStream[nv.String()] { 742 if s == t { 743 return true 744 } 745 } 746 return false 747} 748