1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package rafthttp 16 17import ( 18 "fmt" 19 "io" 20 "io/ioutil" 21 "net" 22 "net/http" 23 "path" 24 "strings" 25 "sync" 26 "time" 27 28 "github.com/coreos/etcd/etcdserver/stats" 29 "github.com/coreos/etcd/pkg/httputil" 30 "github.com/coreos/etcd/pkg/types" 31 "github.com/coreos/etcd/raft/raftpb" 32 "github.com/coreos/etcd/version" 33 "github.com/coreos/go-semver/semver" 34) 35 36const ( 37 streamTypeMessage streamType = "message" 38 streamTypeMsgAppV2 streamType = "msgappv2" 39 40 streamBufSize = 4096 41) 42 43var ( 44 errUnsupportedStreamType = fmt.Errorf("unsupported stream type") 45 46 // the key is in string format "major.minor.patch" 47 supportedStream = map[string][]streamType{ 48 "2.0.0": {}, 49 "2.1.0": {streamTypeMsgAppV2, streamTypeMessage}, 50 "2.2.0": {streamTypeMsgAppV2, streamTypeMessage}, 51 "2.3.0": {streamTypeMsgAppV2, streamTypeMessage}, 52 "3.0.0": {streamTypeMsgAppV2, streamTypeMessage}, 53 "3.1.0": {streamTypeMsgAppV2, streamTypeMessage}, 54 } 55) 56 57type streamType string 58 59func (t streamType) endpoint() string { 60 switch t { 61 case streamTypeMsgAppV2: 62 return path.Join(RaftStreamPrefix, "msgapp") 63 case streamTypeMessage: 64 return path.Join(RaftStreamPrefix, "message") 65 default: 66 plog.Panicf("unhandled stream type %v", t) 67 return "" 68 } 69} 70 71func (t streamType) String() string { 72 switch t { 73 case streamTypeMsgAppV2: 74 return "stream MsgApp v2" 75 case streamTypeMessage: 76 return "stream Message" 77 default: 78 return "unknown stream" 79 } 80} 81 82var ( 83 // linkHeartbeatMessage is a special message used as heartbeat message in 84 // link layer. It never conflicts with messages from raft because raft 85 // doesn't send out messages without From and To fields. 86 linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat} 87) 88 89func isLinkHeartbeatMessage(m *raftpb.Message) bool { 90 return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0 91} 92 93type outgoingConn struct { 94 t streamType 95 io.Writer 96 http.Flusher 97 io.Closer 98} 99 100// streamWriter writes messages to the attached outgoingConn. 101type streamWriter struct { 102 peerID types.ID 103 status *peerStatus 104 fs *stats.FollowerStats 105 r Raft 106 107 mu sync.Mutex // guard field working and closer 108 closer io.Closer 109 working bool 110 111 msgc chan raftpb.Message 112 connc chan *outgoingConn 113 stopc chan struct{} 114 done chan struct{} 115} 116 117// startStreamWriter creates a streamWrite and starts a long running go-routine that accepts 118// messages and writes to the attached outgoing connection. 119func startStreamWriter(id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter { 120 w := &streamWriter{ 121 peerID: id, 122 status: status, 123 fs: fs, 124 r: r, 125 msgc: make(chan raftpb.Message, streamBufSize), 126 connc: make(chan *outgoingConn), 127 stopc: make(chan struct{}), 128 done: make(chan struct{}), 129 } 130 go w.run() 131 return w 132} 133 134func (cw *streamWriter) run() { 135 var ( 136 msgc chan raftpb.Message 137 heartbeatc <-chan time.Time 138 t streamType 139 enc encoder 140 flusher http.Flusher 141 batched int 142 ) 143 tickc := time.Tick(ConnReadTimeout / 3) 144 unflushed := 0 145 146 plog.Infof("started streaming with peer %s (writer)", cw.peerID) 147 148 for { 149 select { 150 case <-heartbeatc: 151 err := enc.encode(&linkHeartbeatMessage) 152 unflushed += linkHeartbeatMessage.Size() 153 if err == nil { 154 flusher.Flush() 155 batched = 0 156 sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed)) 157 unflushed = 0 158 continue 159 } 160 161 cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error()) 162 163 sentFailures.WithLabelValues(cw.peerID.String()).Inc() 164 cw.close() 165 plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) 166 heartbeatc, msgc = nil, nil 167 168 case m := <-msgc: 169 err := enc.encode(&m) 170 if err == nil { 171 unflushed += m.Size() 172 173 if len(msgc) == 0 || batched > streamBufSize/2 { 174 flusher.Flush() 175 sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed)) 176 unflushed = 0 177 batched = 0 178 } else { 179 batched++ 180 } 181 182 continue 183 } 184 185 cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error()) 186 cw.close() 187 plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) 188 heartbeatc, msgc = nil, nil 189 cw.r.ReportUnreachable(m.To) 190 sentFailures.WithLabelValues(cw.peerID.String()).Inc() 191 192 case conn := <-cw.connc: 193 cw.mu.Lock() 194 closed := cw.closeUnlocked() 195 t = conn.t 196 switch conn.t { 197 case streamTypeMsgAppV2: 198 enc = newMsgAppV2Encoder(conn.Writer, cw.fs) 199 case streamTypeMessage: 200 enc = &messageEncoder{w: conn.Writer} 201 default: 202 plog.Panicf("unhandled stream type %s", conn.t) 203 } 204 flusher = conn.Flusher 205 unflushed = 0 206 cw.status.activate() 207 cw.closer = conn.Closer 208 cw.working = true 209 cw.mu.Unlock() 210 211 if closed { 212 plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t) 213 } 214 plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t) 215 heartbeatc, msgc = tickc, cw.msgc 216 case <-cw.stopc: 217 if cw.close() { 218 plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) 219 } 220 plog.Infof("stopped streaming with peer %s (writer)", cw.peerID) 221 close(cw.done) 222 return 223 } 224 } 225} 226 227func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) { 228 cw.mu.Lock() 229 defer cw.mu.Unlock() 230 return cw.msgc, cw.working 231} 232 233func (cw *streamWriter) close() bool { 234 cw.mu.Lock() 235 defer cw.mu.Unlock() 236 return cw.closeUnlocked() 237} 238 239func (cw *streamWriter) closeUnlocked() bool { 240 if !cw.working { 241 return false 242 } 243 cw.closer.Close() 244 if len(cw.msgc) > 0 { 245 cw.r.ReportUnreachable(uint64(cw.peerID)) 246 } 247 cw.msgc = make(chan raftpb.Message, streamBufSize) 248 cw.working = false 249 return true 250} 251 252func (cw *streamWriter) attach(conn *outgoingConn) bool { 253 select { 254 case cw.connc <- conn: 255 return true 256 case <-cw.done: 257 return false 258 } 259} 260 261func (cw *streamWriter) stop() { 262 close(cw.stopc) 263 <-cw.done 264} 265 266// streamReader is a long-running go-routine that dials to the remote stream 267// endpoint and reads messages from the response body returned. 268type streamReader struct { 269 peerID types.ID 270 typ streamType 271 272 tr *Transport 273 picker *urlPicker 274 status *peerStatus 275 recvc chan<- raftpb.Message 276 propc chan<- raftpb.Message 277 278 errorc chan<- error 279 280 mu sync.Mutex 281 paused bool 282 cancel func() 283 closer io.Closer 284 285 stopc chan struct{} 286 done chan struct{} 287} 288 289func (r *streamReader) start() { 290 r.stopc = make(chan struct{}) 291 r.done = make(chan struct{}) 292 if r.errorc == nil { 293 r.errorc = r.tr.ErrorC 294 } 295 296 go r.run() 297} 298 299func (cr *streamReader) run() { 300 t := cr.typ 301 plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t) 302 for { 303 rc, err := cr.dial(t) 304 if err != nil { 305 if err != errUnsupportedStreamType { 306 cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error()) 307 } 308 } else { 309 cr.status.activate() 310 plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ) 311 err := cr.decodeLoop(rc, t) 312 plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ) 313 switch { 314 // all data is read out 315 case err == io.EOF: 316 // connection is closed by the remote 317 case isClosedConnectionError(err): 318 default: 319 cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error()) 320 } 321 } 322 select { 323 // Wait 100ms to create a new stream, so it doesn't bring too much 324 // overhead when retry. 325 case <-time.After(100 * time.Millisecond): 326 case <-cr.stopc: 327 plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t) 328 close(cr.done) 329 return 330 } 331 } 332} 333 334func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { 335 var dec decoder 336 cr.mu.Lock() 337 switch t { 338 case streamTypeMsgAppV2: 339 dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID) 340 case streamTypeMessage: 341 dec = &messageDecoder{r: rc} 342 default: 343 plog.Panicf("unhandled stream type %s", t) 344 } 345 select { 346 case <-cr.stopc: 347 cr.mu.Unlock() 348 if err := rc.Close(); err != nil { 349 return err 350 } 351 return io.EOF 352 default: 353 cr.closer = rc 354 } 355 cr.mu.Unlock() 356 357 for { 358 m, err := dec.decode() 359 if err != nil { 360 cr.mu.Lock() 361 cr.close() 362 cr.mu.Unlock() 363 return err 364 } 365 366 receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size())) 367 368 cr.mu.Lock() 369 paused := cr.paused 370 cr.mu.Unlock() 371 372 if paused { 373 continue 374 } 375 376 if isLinkHeartbeatMessage(&m) { 377 // raft is not interested in link layer 378 // heartbeat message, so we should ignore 379 // it. 380 continue 381 } 382 383 recvc := cr.recvc 384 if m.Type == raftpb.MsgProp { 385 recvc = cr.propc 386 } 387 388 select { 389 case recvc <- m: 390 default: 391 if cr.status.isActive() { 392 plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From)) 393 } 394 plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From)) 395 recvFailures.WithLabelValues(types.ID(m.From).String()).Inc() 396 } 397 } 398} 399 400func (cr *streamReader) stop() { 401 close(cr.stopc) 402 cr.mu.Lock() 403 if cr.cancel != nil { 404 cr.cancel() 405 } 406 cr.close() 407 cr.mu.Unlock() 408 <-cr.done 409} 410 411func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { 412 u := cr.picker.pick() 413 uu := u 414 uu.Path = path.Join(t.endpoint(), cr.tr.ID.String()) 415 416 req, err := http.NewRequest("GET", uu.String(), nil) 417 if err != nil { 418 cr.picker.unreachable(u) 419 return nil, fmt.Errorf("failed to make http request to %v (%v)", u, err) 420 } 421 req.Header.Set("X-Server-From", cr.tr.ID.String()) 422 req.Header.Set("X-Server-Version", version.Version) 423 req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion) 424 req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String()) 425 req.Header.Set("X-Raft-To", cr.peerID.String()) 426 427 setPeerURLsHeader(req, cr.tr.URLs) 428 429 cr.mu.Lock() 430 select { 431 case <-cr.stopc: 432 cr.mu.Unlock() 433 return nil, fmt.Errorf("stream reader is stopped") 434 default: 435 } 436 cr.cancel = httputil.RequestCanceler(req) 437 cr.mu.Unlock() 438 439 resp, err := cr.tr.streamRt.RoundTrip(req) 440 if err != nil { 441 cr.picker.unreachable(u) 442 return nil, err 443 } 444 445 rv := serverVersion(resp.Header) 446 lv := semver.Must(semver.NewVersion(version.Version)) 447 if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) { 448 httputil.GracefulClose(resp) 449 cr.picker.unreachable(u) 450 return nil, errUnsupportedStreamType 451 } 452 453 switch resp.StatusCode { 454 case http.StatusGone: 455 httputil.GracefulClose(resp) 456 cr.picker.unreachable(u) 457 reportCriticalError(errMemberRemoved, cr.errorc) 458 return nil, errMemberRemoved 459 case http.StatusOK: 460 return resp.Body, nil 461 case http.StatusNotFound: 462 httputil.GracefulClose(resp) 463 cr.picker.unreachable(u) 464 return nil, fmt.Errorf("peer %s failed to find local node %s", cr.peerID, cr.tr.ID) 465 case http.StatusPreconditionFailed: 466 b, err := ioutil.ReadAll(resp.Body) 467 if err != nil { 468 cr.picker.unreachable(u) 469 return nil, err 470 } 471 httputil.GracefulClose(resp) 472 cr.picker.unreachable(u) 473 474 switch strings.TrimSuffix(string(b), "\n") { 475 case errIncompatibleVersion.Error(): 476 plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.peerID) 477 return nil, errIncompatibleVersion 478 case errClusterIDMismatch.Error(): 479 plog.Errorf("request sent was ignored (cluster ID mismatch: peer[%s]=%s, local=%s)", 480 cr.peerID, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID) 481 return nil, errClusterIDMismatch 482 default: 483 return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b)) 484 } 485 default: 486 httputil.GracefulClose(resp) 487 cr.picker.unreachable(u) 488 return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode) 489 } 490} 491 492func (cr *streamReader) close() { 493 if cr.closer != nil { 494 cr.closer.Close() 495 } 496 cr.closer = nil 497} 498 499func (cr *streamReader) pause() { 500 cr.mu.Lock() 501 defer cr.mu.Unlock() 502 cr.paused = true 503} 504 505func (cr *streamReader) resume() { 506 cr.mu.Lock() 507 defer cr.mu.Unlock() 508 cr.paused = false 509} 510 511func isClosedConnectionError(err error) bool { 512 operr, ok := err.(*net.OpError) 513 return ok && operr.Err.Error() == "use of closed network connection" 514} 515 516// checkStreamSupport checks whether the stream type is supported in the 517// given version. 518func checkStreamSupport(v *semver.Version, t streamType) bool { 519 nv := &semver.Version{Major: v.Major, Minor: v.Minor} 520 for _, s := range supportedStream[nv.String()] { 521 if s == t { 522 return true 523 } 524 } 525 return false 526} 527