1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package rafthttp 16 17import ( 18 "context" 19 "fmt" 20 "io" 21 "io/ioutil" 22 "net/http" 23 "path" 24 "strings" 25 "sync" 26 "time" 27 28 "go.etcd.io/etcd/api/v3/version" 29 "go.etcd.io/etcd/client/pkg/v3/transport" 30 "go.etcd.io/etcd/client/pkg/v3/types" 31 "go.etcd.io/etcd/pkg/v3/httputil" 32 "go.etcd.io/etcd/raft/v3/raftpb" 33 stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats" 34 35 "github.com/coreos/go-semver/semver" 36 "go.uber.org/zap" 37 "golang.org/x/time/rate" 38) 39 40const ( 41 streamTypeMessage streamType = "message" 42 streamTypeMsgAppV2 streamType = "msgappv2" 43 44 streamBufSize = 4096 45) 46 47var ( 48 errUnsupportedStreamType = fmt.Errorf("unsupported stream type") 49 50 // the key is in string format "major.minor.patch" 51 supportedStream = map[string][]streamType{ 52 "2.0.0": {}, 53 "2.1.0": {streamTypeMsgAppV2, streamTypeMessage}, 54 "2.2.0": {streamTypeMsgAppV2, streamTypeMessage}, 55 "2.3.0": {streamTypeMsgAppV2, streamTypeMessage}, 56 "3.0.0": {streamTypeMsgAppV2, streamTypeMessage}, 57 "3.1.0": {streamTypeMsgAppV2, streamTypeMessage}, 58 "3.2.0": {streamTypeMsgAppV2, streamTypeMessage}, 59 "3.3.0": {streamTypeMsgAppV2, streamTypeMessage}, 60 "3.4.0": {streamTypeMsgAppV2, streamTypeMessage}, 61 "3.5.0": {streamTypeMsgAppV2, streamTypeMessage}, 62 } 63) 64 65type streamType string 66 67func (t streamType) endpoint(lg *zap.Logger) string { 68 switch t { 69 case streamTypeMsgAppV2: 70 return path.Join(RaftStreamPrefix, "msgapp") 71 case streamTypeMessage: 72 return path.Join(RaftStreamPrefix, "message") 73 default: 74 if lg != nil { 75 lg.Panic("unhandled stream type", zap.String("stream-type", t.String())) 76 } 77 return "" 78 } 79} 80 81func (t streamType) String() string { 82 switch t { 83 case streamTypeMsgAppV2: 84 return "stream MsgApp v2" 85 case streamTypeMessage: 86 return "stream Message" 87 default: 88 return "unknown stream" 89 } 90} 91 92var ( 93 // linkHeartbeatMessage is a special message used as heartbeat message in 94 // link layer. It never conflicts with messages from raft because raft 95 // doesn't send out messages without From and To fields. 96 linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat} 97) 98 99func isLinkHeartbeatMessage(m *raftpb.Message) bool { 100 return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0 101} 102 103type outgoingConn struct { 104 t streamType 105 io.Writer 106 http.Flusher 107 io.Closer 108 109 localID types.ID 110 peerID types.ID 111} 112 113// streamWriter writes messages to the attached outgoingConn. 114type streamWriter struct { 115 lg *zap.Logger 116 117 localID types.ID 118 peerID types.ID 119 120 status *peerStatus 121 fs *stats.FollowerStats 122 r Raft 123 124 mu sync.Mutex // guard field working and closer 125 closer io.Closer 126 working bool 127 128 msgc chan raftpb.Message 129 connc chan *outgoingConn 130 stopc chan struct{} 131 done chan struct{} 132} 133 134// startStreamWriter creates a streamWrite and starts a long running go-routine that accepts 135// messages and writes to the attached outgoing connection. 136func startStreamWriter(lg *zap.Logger, local, id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter { 137 w := &streamWriter{ 138 lg: lg, 139 140 localID: local, 141 peerID: id, 142 143 status: status, 144 fs: fs, 145 r: r, 146 msgc: make(chan raftpb.Message, streamBufSize), 147 connc: make(chan *outgoingConn), 148 stopc: make(chan struct{}), 149 done: make(chan struct{}), 150 } 151 go w.run() 152 return w 153} 154 155func (cw *streamWriter) run() { 156 var ( 157 msgc chan raftpb.Message 158 heartbeatc <-chan time.Time 159 t streamType 160 enc encoder 161 flusher http.Flusher 162 batched int 163 ) 164 tickc := time.NewTicker(ConnReadTimeout / 3) 165 defer tickc.Stop() 166 unflushed := 0 167 168 if cw.lg != nil { 169 cw.lg.Info( 170 "started stream writer with remote peer", 171 zap.String("local-member-id", cw.localID.String()), 172 zap.String("remote-peer-id", cw.peerID.String()), 173 ) 174 } 175 176 for { 177 select { 178 case <-heartbeatc: 179 err := enc.encode(&linkHeartbeatMessage) 180 unflushed += linkHeartbeatMessage.Size() 181 if err == nil { 182 flusher.Flush() 183 batched = 0 184 sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed)) 185 unflushed = 0 186 continue 187 } 188 189 cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error()) 190 191 sentFailures.WithLabelValues(cw.peerID.String()).Inc() 192 cw.close() 193 if cw.lg != nil { 194 cw.lg.Warn( 195 "lost TCP streaming connection with remote peer", 196 zap.String("stream-writer-type", t.String()), 197 zap.String("local-member-id", cw.localID.String()), 198 zap.String("remote-peer-id", cw.peerID.String()), 199 ) 200 } 201 heartbeatc, msgc = nil, nil 202 203 case m := <-msgc: 204 err := enc.encode(&m) 205 if err == nil { 206 unflushed += m.Size() 207 208 if len(msgc) == 0 || batched > streamBufSize/2 { 209 flusher.Flush() 210 sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed)) 211 unflushed = 0 212 batched = 0 213 } else { 214 batched++ 215 } 216 217 continue 218 } 219 220 cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error()) 221 cw.close() 222 if cw.lg != nil { 223 cw.lg.Warn( 224 "lost TCP streaming connection with remote peer", 225 zap.String("stream-writer-type", t.String()), 226 zap.String("local-member-id", cw.localID.String()), 227 zap.String("remote-peer-id", cw.peerID.String()), 228 ) 229 } 230 heartbeatc, msgc = nil, nil 231 cw.r.ReportUnreachable(m.To) 232 sentFailures.WithLabelValues(cw.peerID.String()).Inc() 233 234 case conn := <-cw.connc: 235 cw.mu.Lock() 236 closed := cw.closeUnlocked() 237 t = conn.t 238 switch conn.t { 239 case streamTypeMsgAppV2: 240 enc = newMsgAppV2Encoder(conn.Writer, cw.fs) 241 case streamTypeMessage: 242 enc = &messageEncoder{w: conn.Writer} 243 default: 244 if cw.lg != nil { 245 cw.lg.Panic("unhandled stream type", zap.String("stream-type", t.String())) 246 } 247 } 248 if cw.lg != nil { 249 cw.lg.Info( 250 "set message encoder", 251 zap.String("from", conn.localID.String()), 252 zap.String("to", conn.peerID.String()), 253 zap.String("stream-type", t.String()), 254 ) 255 } 256 flusher = conn.Flusher 257 unflushed = 0 258 cw.status.activate() 259 cw.closer = conn.Closer 260 cw.working = true 261 cw.mu.Unlock() 262 263 if closed { 264 if cw.lg != nil { 265 cw.lg.Warn( 266 "closed TCP streaming connection with remote peer", 267 zap.String("stream-writer-type", t.String()), 268 zap.String("local-member-id", cw.localID.String()), 269 zap.String("remote-peer-id", cw.peerID.String()), 270 ) 271 } 272 } 273 if cw.lg != nil { 274 cw.lg.Info( 275 "established TCP streaming connection with remote peer", 276 zap.String("stream-writer-type", t.String()), 277 zap.String("local-member-id", cw.localID.String()), 278 zap.String("remote-peer-id", cw.peerID.String()), 279 ) 280 } 281 heartbeatc, msgc = tickc.C, cw.msgc 282 283 case <-cw.stopc: 284 if cw.close() { 285 if cw.lg != nil { 286 cw.lg.Warn( 287 "closed TCP streaming connection with remote peer", 288 zap.String("stream-writer-type", t.String()), 289 zap.String("remote-peer-id", cw.peerID.String()), 290 ) 291 } 292 } 293 if cw.lg != nil { 294 cw.lg.Info( 295 "stopped TCP streaming connection with remote peer", 296 zap.String("stream-writer-type", t.String()), 297 zap.String("remote-peer-id", cw.peerID.String()), 298 ) 299 } 300 close(cw.done) 301 return 302 } 303 } 304} 305 306func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) { 307 cw.mu.Lock() 308 defer cw.mu.Unlock() 309 return cw.msgc, cw.working 310} 311 312func (cw *streamWriter) close() bool { 313 cw.mu.Lock() 314 defer cw.mu.Unlock() 315 return cw.closeUnlocked() 316} 317 318func (cw *streamWriter) closeUnlocked() bool { 319 if !cw.working { 320 return false 321 } 322 if err := cw.closer.Close(); err != nil { 323 if cw.lg != nil { 324 cw.lg.Warn( 325 "failed to close connection with remote peer", 326 zap.String("remote-peer-id", cw.peerID.String()), 327 zap.Error(err), 328 ) 329 } 330 } 331 if len(cw.msgc) > 0 { 332 cw.r.ReportUnreachable(uint64(cw.peerID)) 333 } 334 cw.msgc = make(chan raftpb.Message, streamBufSize) 335 cw.working = false 336 return true 337} 338 339func (cw *streamWriter) attach(conn *outgoingConn) bool { 340 select { 341 case cw.connc <- conn: 342 return true 343 case <-cw.done: 344 return false 345 } 346} 347 348func (cw *streamWriter) stop() { 349 close(cw.stopc) 350 <-cw.done 351} 352 353// streamReader is a long-running go-routine that dials to the remote stream 354// endpoint and reads messages from the response body returned. 355type streamReader struct { 356 lg *zap.Logger 357 358 peerID types.ID 359 typ streamType 360 361 tr *Transport 362 picker *urlPicker 363 status *peerStatus 364 recvc chan<- raftpb.Message 365 propc chan<- raftpb.Message 366 367 rl *rate.Limiter // alters the frequency of dial retrial attempts 368 369 errorc chan<- error 370 371 mu sync.Mutex 372 paused bool 373 closer io.Closer 374 375 ctx context.Context 376 cancel context.CancelFunc 377 done chan struct{} 378} 379 380func (cr *streamReader) start() { 381 cr.done = make(chan struct{}) 382 if cr.errorc == nil { 383 cr.errorc = cr.tr.ErrorC 384 } 385 if cr.ctx == nil { 386 cr.ctx, cr.cancel = context.WithCancel(context.Background()) 387 } 388 go cr.run() 389} 390 391func (cr *streamReader) run() { 392 t := cr.typ 393 394 if cr.lg != nil { 395 cr.lg.Info( 396 "started stream reader with remote peer", 397 zap.String("stream-reader-type", t.String()), 398 zap.String("local-member-id", cr.tr.ID.String()), 399 zap.String("remote-peer-id", cr.peerID.String()), 400 ) 401 } 402 403 for { 404 rc, err := cr.dial(t) 405 if err != nil { 406 if err != errUnsupportedStreamType { 407 cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error()) 408 } 409 } else { 410 cr.status.activate() 411 if cr.lg != nil { 412 cr.lg.Info( 413 "established TCP streaming connection with remote peer", 414 zap.String("stream-reader-type", cr.typ.String()), 415 zap.String("local-member-id", cr.tr.ID.String()), 416 zap.String("remote-peer-id", cr.peerID.String()), 417 ) 418 } 419 err = cr.decodeLoop(rc, t) 420 if cr.lg != nil { 421 cr.lg.Warn( 422 "lost TCP streaming connection with remote peer", 423 zap.String("stream-reader-type", cr.typ.String()), 424 zap.String("local-member-id", cr.tr.ID.String()), 425 zap.String("remote-peer-id", cr.peerID.String()), 426 zap.Error(err), 427 ) 428 } 429 switch { 430 // all data is read out 431 case err == io.EOF: 432 // connection is closed by the remote 433 case transport.IsClosedConnError(err): 434 default: 435 cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error()) 436 } 437 } 438 // Wait for a while before new dial attempt 439 err = cr.rl.Wait(cr.ctx) 440 if cr.ctx.Err() != nil { 441 if cr.lg != nil { 442 cr.lg.Info( 443 "stopped stream reader with remote peer", 444 zap.String("stream-reader-type", t.String()), 445 zap.String("local-member-id", cr.tr.ID.String()), 446 zap.String("remote-peer-id", cr.peerID.String()), 447 ) 448 } 449 close(cr.done) 450 return 451 } 452 if err != nil { 453 if cr.lg != nil { 454 cr.lg.Warn( 455 "rate limit on stream reader with remote peer", 456 zap.String("stream-reader-type", t.String()), 457 zap.String("local-member-id", cr.tr.ID.String()), 458 zap.String("remote-peer-id", cr.peerID.String()), 459 zap.Error(err), 460 ) 461 } 462 } 463 } 464} 465 466func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { 467 var dec decoder 468 cr.mu.Lock() 469 switch t { 470 case streamTypeMsgAppV2: 471 dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID) 472 case streamTypeMessage: 473 dec = &messageDecoder{r: rc} 474 default: 475 if cr.lg != nil { 476 cr.lg.Panic("unknown stream type", zap.String("type", t.String())) 477 } 478 } 479 select { 480 case <-cr.ctx.Done(): 481 cr.mu.Unlock() 482 if err := rc.Close(); err != nil { 483 return err 484 } 485 return io.EOF 486 default: 487 cr.closer = rc 488 } 489 cr.mu.Unlock() 490 491 // gofail: labelRaftDropHeartbeat: 492 for { 493 m, err := dec.decode() 494 if err != nil { 495 cr.mu.Lock() 496 cr.close() 497 cr.mu.Unlock() 498 return err 499 } 500 501 // gofail-go: var raftDropHeartbeat struct{} 502 // continue labelRaftDropHeartbeat 503 receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size())) 504 505 cr.mu.Lock() 506 paused := cr.paused 507 cr.mu.Unlock() 508 509 if paused { 510 continue 511 } 512 513 if isLinkHeartbeatMessage(&m) { 514 // raft is not interested in link layer 515 // heartbeat message, so we should ignore 516 // it. 517 continue 518 } 519 520 recvc := cr.recvc 521 if m.Type == raftpb.MsgProp { 522 recvc = cr.propc 523 } 524 525 select { 526 case recvc <- m: 527 default: 528 if cr.status.isActive() { 529 if cr.lg != nil { 530 cr.lg.Warn( 531 "dropped internal Raft message since receiving buffer is full (overloaded network)", 532 zap.String("message-type", m.Type.String()), 533 zap.String("local-member-id", cr.tr.ID.String()), 534 zap.String("from", types.ID(m.From).String()), 535 zap.String("remote-peer-id", types.ID(m.To).String()), 536 zap.Bool("remote-peer-active", cr.status.isActive()), 537 ) 538 } 539 } else { 540 if cr.lg != nil { 541 cr.lg.Warn( 542 "dropped Raft message since receiving buffer is full (overloaded network)", 543 zap.String("message-type", m.Type.String()), 544 zap.String("local-member-id", cr.tr.ID.String()), 545 zap.String("from", types.ID(m.From).String()), 546 zap.String("remote-peer-id", types.ID(m.To).String()), 547 zap.Bool("remote-peer-active", cr.status.isActive()), 548 ) 549 } 550 } 551 recvFailures.WithLabelValues(types.ID(m.From).String()).Inc() 552 } 553 } 554} 555 556func (cr *streamReader) stop() { 557 cr.mu.Lock() 558 cr.cancel() 559 cr.close() 560 cr.mu.Unlock() 561 <-cr.done 562} 563 564func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { 565 u := cr.picker.pick() 566 uu := u 567 uu.Path = path.Join(t.endpoint(cr.lg), cr.tr.ID.String()) 568 569 if cr.lg != nil { 570 cr.lg.Debug( 571 "dial stream reader", 572 zap.String("from", cr.tr.ID.String()), 573 zap.String("to", cr.peerID.String()), 574 zap.String("address", uu.String()), 575 ) 576 } 577 req, err := http.NewRequest("GET", uu.String(), nil) 578 if err != nil { 579 cr.picker.unreachable(u) 580 return nil, fmt.Errorf("failed to make http request to %v (%v)", u, err) 581 } 582 req.Header.Set("X-Server-From", cr.tr.ID.String()) 583 req.Header.Set("X-Server-Version", version.Version) 584 req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion) 585 req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String()) 586 req.Header.Set("X-Raft-To", cr.peerID.String()) 587 588 setPeerURLsHeader(req, cr.tr.URLs) 589 590 req = req.WithContext(cr.ctx) 591 592 cr.mu.Lock() 593 select { 594 case <-cr.ctx.Done(): 595 cr.mu.Unlock() 596 return nil, fmt.Errorf("stream reader is stopped") 597 default: 598 } 599 cr.mu.Unlock() 600 601 resp, err := cr.tr.streamRt.RoundTrip(req) 602 if err != nil { 603 cr.picker.unreachable(u) 604 return nil, err 605 } 606 607 rv := serverVersion(resp.Header) 608 lv := semver.Must(semver.NewVersion(version.Version)) 609 if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) { 610 httputil.GracefulClose(resp) 611 cr.picker.unreachable(u) 612 return nil, errUnsupportedStreamType 613 } 614 615 switch resp.StatusCode { 616 case http.StatusGone: 617 httputil.GracefulClose(resp) 618 cr.picker.unreachable(u) 619 reportCriticalError(errMemberRemoved, cr.errorc) 620 return nil, errMemberRemoved 621 622 case http.StatusOK: 623 return resp.Body, nil 624 625 case http.StatusNotFound: 626 httputil.GracefulClose(resp) 627 cr.picker.unreachable(u) 628 return nil, fmt.Errorf("peer %s failed to find local node %s", cr.peerID, cr.tr.ID) 629 630 case http.StatusPreconditionFailed: 631 b, err := ioutil.ReadAll(resp.Body) 632 if err != nil { 633 cr.picker.unreachable(u) 634 return nil, err 635 } 636 httputil.GracefulClose(resp) 637 cr.picker.unreachable(u) 638 639 switch strings.TrimSuffix(string(b), "\n") { 640 case errIncompatibleVersion.Error(): 641 if cr.lg != nil { 642 cr.lg.Warn( 643 "request sent was ignored by remote peer due to server version incompatibility", 644 zap.String("local-member-id", cr.tr.ID.String()), 645 zap.String("remote-peer-id", cr.peerID.String()), 646 zap.Error(errIncompatibleVersion), 647 ) 648 } 649 return nil, errIncompatibleVersion 650 651 case errClusterIDMismatch.Error(): 652 if cr.lg != nil { 653 cr.lg.Warn( 654 "request sent was ignored by remote peer due to cluster ID mismatch", 655 zap.String("remote-peer-id", cr.peerID.String()), 656 zap.String("remote-peer-cluster-id", resp.Header.Get("X-Etcd-Cluster-ID")), 657 zap.String("local-member-id", cr.tr.ID.String()), 658 zap.String("local-member-cluster-id", cr.tr.ClusterID.String()), 659 zap.Error(errClusterIDMismatch), 660 ) 661 } 662 return nil, errClusterIDMismatch 663 664 default: 665 return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b)) 666 } 667 668 default: 669 httputil.GracefulClose(resp) 670 cr.picker.unreachable(u) 671 return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode) 672 } 673} 674 675func (cr *streamReader) close() { 676 if cr.closer != nil { 677 if err := cr.closer.Close(); err != nil { 678 if cr.lg != nil { 679 cr.lg.Warn( 680 "failed to close remote peer connection", 681 zap.String("local-member-id", cr.tr.ID.String()), 682 zap.String("remote-peer-id", cr.peerID.String()), 683 zap.Error(err), 684 ) 685 } 686 } 687 } 688 cr.closer = nil 689} 690 691func (cr *streamReader) pause() { 692 cr.mu.Lock() 693 defer cr.mu.Unlock() 694 cr.paused = true 695} 696 697func (cr *streamReader) resume() { 698 cr.mu.Lock() 699 defer cr.mu.Unlock() 700 cr.paused = false 701} 702 703// checkStreamSupport checks whether the stream type is supported in the 704// given version. 705func checkStreamSupport(v *semver.Version, t streamType) bool { 706 nv := &semver.Version{Major: v.Major, Minor: v.Minor} 707 for _, s := range supportedStream[nv.String()] { 708 if s == t { 709 return true 710 } 711 } 712 return false 713} 714