1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package rafthttp 16 17import ( 18 "context" 19 "net/http" 20 "sync" 21 "time" 22 23 "go.etcd.io/etcd/etcdserver/api/snap" 24 stats "go.etcd.io/etcd/etcdserver/api/v2stats" 25 "go.etcd.io/etcd/pkg/logutil" 26 "go.etcd.io/etcd/pkg/transport" 27 "go.etcd.io/etcd/pkg/types" 28 "go.etcd.io/etcd/raft" 29 "go.etcd.io/etcd/raft/raftpb" 30 31 "github.com/coreos/pkg/capnslog" 32 "github.com/xiang90/probing" 33 "go.uber.org/zap" 34 "golang.org/x/time/rate" 35) 36 37var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("go.etcd.io/etcd", "rafthttp")) 38 39type Raft interface { 40 Process(ctx context.Context, m raftpb.Message) error 41 IsIDRemoved(id uint64) bool 42 ReportUnreachable(id uint64) 43 ReportSnapshot(id uint64, status raft.SnapshotStatus) 44} 45 46type Transporter interface { 47 // Start starts the given Transporter. 48 // Start MUST be called before calling other functions in the interface. 49 Start() error 50 // Handler returns the HTTP handler of the transporter. 51 // A transporter HTTP handler handles the HTTP requests 52 // from remote peers. 53 // The handler MUST be used to handle RaftPrefix(/raft) 54 // endpoint. 55 Handler() http.Handler 56 // Send sends out the given messages to the remote peers. 57 // Each message has a To field, which is an id that maps 58 // to an existing peer in the transport. 59 // If the id cannot be found in the transport, the message 60 // will be ignored. 61 Send(m []raftpb.Message) 62 // SendSnapshot sends out the given snapshot message to a remote peer. 63 // The behavior of SendSnapshot is similar to Send. 64 SendSnapshot(m snap.Message) 65 // AddRemote adds a remote with given peer urls into the transport. 66 // A remote helps newly joined member to catch up the progress of cluster, 67 // and will not be used after that. 68 // It is the caller's responsibility to ensure the urls are all valid, 69 // or it panics. 70 AddRemote(id types.ID, urls []string) 71 // AddPeer adds a peer with given peer urls into the transport. 72 // It is the caller's responsibility to ensure the urls are all valid, 73 // or it panics. 74 // Peer urls are used to connect to the remote peer. 75 AddPeer(id types.ID, urls []string) 76 // RemovePeer removes the peer with given id. 77 RemovePeer(id types.ID) 78 // RemoveAllPeers removes all the existing peers in the transport. 79 RemoveAllPeers() 80 // UpdatePeer updates the peer urls of the peer with the given id. 81 // It is the caller's responsibility to ensure the urls are all valid, 82 // or it panics. 83 UpdatePeer(id types.ID, urls []string) 84 // ActiveSince returns the time that the connection with the peer 85 // of the given id becomes active. 86 // If the connection is active since peer was added, it returns the adding time. 87 // If the connection is currently inactive, it returns zero time. 88 ActiveSince(id types.ID) time.Time 89 // ActivePeers returns the number of active peers. 90 ActivePeers() int 91 // Stop closes the connections and stops the transporter. 92 Stop() 93} 94 95// Transport implements Transporter interface. It provides the functionality 96// to send raft messages to peers, and receive raft messages from peers. 97// User should call Handler method to get a handler to serve requests 98// received from peerURLs. 99// User needs to call Start before calling other functions, and call 100// Stop when the Transport is no longer used. 101type Transport struct { 102 Logger *zap.Logger 103 104 DialTimeout time.Duration // maximum duration before timing out dial of the request 105 // DialRetryFrequency defines the frequency of streamReader dial retrial attempts; 106 // a distinct rate limiter is created per every peer (default value: 10 events/sec) 107 DialRetryFrequency rate.Limit 108 109 TLSInfo transport.TLSInfo // TLS information used when creating connection 110 111 ID types.ID // local member ID 112 URLs types.URLs // local peer URLs 113 ClusterID types.ID // raft cluster ID for request validation 114 Raft Raft // raft state machine, to which the Transport forwards received messages and reports status 115 Snapshotter *snap.Snapshotter 116 ServerStats *stats.ServerStats // used to record general transportation statistics 117 // used to record transportation statistics with followers when 118 // performing as leader in raft protocol 119 LeaderStats *stats.LeaderStats 120 // ErrorC is used to report detected critical errors, e.g., 121 // the member has been permanently removed from the cluster 122 // When an error is received from ErrorC, user should stop raft state 123 // machine and thus stop the Transport. 124 ErrorC chan error 125 126 streamRt http.RoundTripper // roundTripper used by streams 127 pipelineRt http.RoundTripper // roundTripper used by pipelines 128 129 mu sync.RWMutex // protect the remote and peer map 130 remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up 131 peers map[types.ID]Peer // peers map 132 133 pipelineProber probing.Prober 134 streamProber probing.Prober 135} 136 137func (t *Transport) Start() error { 138 var err error 139 t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout) 140 if err != nil { 141 return err 142 } 143 t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout) 144 if err != nil { 145 return err 146 } 147 t.remotes = make(map[types.ID]*remote) 148 t.peers = make(map[types.ID]Peer) 149 t.pipelineProber = probing.NewProber(t.pipelineRt) 150 t.streamProber = probing.NewProber(t.streamRt) 151 152 // If client didn't provide dial retry frequency, use the default 153 // (100ms backoff between attempts to create a new stream), 154 // so it doesn't bring too much overhead when retry. 155 if t.DialRetryFrequency == 0 { 156 t.DialRetryFrequency = rate.Every(100 * time.Millisecond) 157 } 158 return nil 159} 160 161func (t *Transport) Handler() http.Handler { 162 pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID) 163 streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID) 164 snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID) 165 mux := http.NewServeMux() 166 mux.Handle(RaftPrefix, pipelineHandler) 167 mux.Handle(RaftStreamPrefix+"/", streamHandler) 168 mux.Handle(RaftSnapshotPrefix, snapHandler) 169 mux.Handle(ProbingPrefix, probing.NewHandler()) 170 return mux 171} 172 173func (t *Transport) Get(id types.ID) Peer { 174 t.mu.RLock() 175 defer t.mu.RUnlock() 176 return t.peers[id] 177} 178 179func (t *Transport) Send(msgs []raftpb.Message) { 180 for _, m := range msgs { 181 if m.To == 0 { 182 // ignore intentionally dropped message 183 continue 184 } 185 to := types.ID(m.To) 186 187 t.mu.RLock() 188 p, pok := t.peers[to] 189 g, rok := t.remotes[to] 190 t.mu.RUnlock() 191 192 if pok { 193 if m.Type == raftpb.MsgApp { 194 t.ServerStats.SendAppendReq(m.Size()) 195 } 196 p.send(m) 197 continue 198 } 199 200 if rok { 201 g.send(m) 202 continue 203 } 204 205 if t.Logger != nil { 206 t.Logger.Debug( 207 "ignored message send request; unknown remote peer target", 208 zap.String("type", m.Type.String()), 209 zap.String("unknown-target-peer-id", to.String()), 210 ) 211 } else { 212 plog.Debugf("ignored message %s (sent to unknown peer %s)", m.Type, to) 213 } 214 } 215} 216 217func (t *Transport) Stop() { 218 t.mu.Lock() 219 defer t.mu.Unlock() 220 for _, r := range t.remotes { 221 r.stop() 222 } 223 for _, p := range t.peers { 224 p.stop() 225 } 226 t.pipelineProber.RemoveAll() 227 t.streamProber.RemoveAll() 228 if tr, ok := t.streamRt.(*http.Transport); ok { 229 tr.CloseIdleConnections() 230 } 231 if tr, ok := t.pipelineRt.(*http.Transport); ok { 232 tr.CloseIdleConnections() 233 } 234 t.peers = nil 235 t.remotes = nil 236} 237 238// CutPeer drops messages to the specified peer. 239func (t *Transport) CutPeer(id types.ID) { 240 t.mu.RLock() 241 p, pok := t.peers[id] 242 g, gok := t.remotes[id] 243 t.mu.RUnlock() 244 245 if pok { 246 p.(Pausable).Pause() 247 } 248 if gok { 249 g.Pause() 250 } 251} 252 253// MendPeer recovers the message dropping behavior of the given peer. 254func (t *Transport) MendPeer(id types.ID) { 255 t.mu.RLock() 256 p, pok := t.peers[id] 257 g, gok := t.remotes[id] 258 t.mu.RUnlock() 259 260 if pok { 261 p.(Pausable).Resume() 262 } 263 if gok { 264 g.Resume() 265 } 266} 267 268func (t *Transport) AddRemote(id types.ID, us []string) { 269 t.mu.Lock() 270 defer t.mu.Unlock() 271 if t.remotes == nil { 272 // there's no clean way to shutdown the golang http server 273 // (see: https://github.com/golang/go/issues/4674) before 274 // stopping the transport; ignore any new connections. 275 return 276 } 277 if _, ok := t.peers[id]; ok { 278 return 279 } 280 if _, ok := t.remotes[id]; ok { 281 return 282 } 283 urls, err := types.NewURLs(us) 284 if err != nil { 285 if t.Logger != nil { 286 t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err)) 287 } else { 288 plog.Panicf("newURLs %+v should never fail: %+v", us, err) 289 } 290 } 291 t.remotes[id] = startRemote(t, urls, id) 292 293 if t.Logger != nil { 294 t.Logger.Info( 295 "added new remote peer", 296 zap.String("local-member-id", t.ID.String()), 297 zap.String("remote-peer-id", id.String()), 298 zap.Strings("remote-peer-urls", us), 299 ) 300 } 301} 302 303func (t *Transport) AddPeer(id types.ID, us []string) { 304 t.mu.Lock() 305 defer t.mu.Unlock() 306 307 if t.peers == nil { 308 panic("transport stopped") 309 } 310 if _, ok := t.peers[id]; ok { 311 return 312 } 313 urls, err := types.NewURLs(us) 314 if err != nil { 315 if t.Logger != nil { 316 t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err)) 317 } else { 318 plog.Panicf("newURLs %+v should never fail: %+v", us, err) 319 } 320 } 321 fs := t.LeaderStats.Follower(id.String()) 322 t.peers[id] = startPeer(t, urls, id, fs) 323 addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) 324 addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec) 325 326 if t.Logger != nil { 327 t.Logger.Info( 328 "added remote peer", 329 zap.String("local-member-id", t.ID.String()), 330 zap.String("remote-peer-id", id.String()), 331 zap.Strings("remote-peer-urls", us), 332 ) 333 } else { 334 plog.Infof("added peer %s", id) 335 } 336} 337 338func (t *Transport) RemovePeer(id types.ID) { 339 t.mu.Lock() 340 defer t.mu.Unlock() 341 t.removePeer(id) 342} 343 344func (t *Transport) RemoveAllPeers() { 345 t.mu.Lock() 346 defer t.mu.Unlock() 347 for id := range t.peers { 348 t.removePeer(id) 349 } 350} 351 352// the caller of this function must have the peers mutex. 353func (t *Transport) removePeer(id types.ID) { 354 if peer, ok := t.peers[id]; ok { 355 peer.stop() 356 } else { 357 if t.Logger != nil { 358 t.Logger.Panic("unexpected removal of unknown remote peer", zap.String("remote-peer-id", id.String())) 359 } else { 360 plog.Panicf("unexpected removal of unknown peer '%d'", id) 361 } 362 } 363 delete(t.peers, id) 364 delete(t.LeaderStats.Followers, id.String()) 365 t.pipelineProber.Remove(id.String()) 366 t.streamProber.Remove(id.String()) 367 368 if t.Logger != nil { 369 t.Logger.Info( 370 "removed remote peer", 371 zap.String("local-member-id", t.ID.String()), 372 zap.String("removed-remote-peer-id", id.String()), 373 ) 374 } else { 375 plog.Infof("removed peer %s", id) 376 } 377} 378 379func (t *Transport) UpdatePeer(id types.ID, us []string) { 380 t.mu.Lock() 381 defer t.mu.Unlock() 382 // TODO: return error or just panic? 383 if _, ok := t.peers[id]; !ok { 384 return 385 } 386 urls, err := types.NewURLs(us) 387 if err != nil { 388 if t.Logger != nil { 389 t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err)) 390 } else { 391 plog.Panicf("newURLs %+v should never fail: %+v", us, err) 392 } 393 } 394 t.peers[id].update(urls) 395 396 t.pipelineProber.Remove(id.String()) 397 addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) 398 t.streamProber.Remove(id.String()) 399 addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec) 400 401 if t.Logger != nil { 402 t.Logger.Info( 403 "updated remote peer", 404 zap.String("local-member-id", t.ID.String()), 405 zap.String("updated-remote-peer-id", id.String()), 406 zap.Strings("updated-remote-peer-urls", us), 407 ) 408 } else { 409 plog.Infof("updated peer %s", id) 410 } 411} 412 413func (t *Transport) ActiveSince(id types.ID) time.Time { 414 t.mu.RLock() 415 defer t.mu.RUnlock() 416 if p, ok := t.peers[id]; ok { 417 return p.activeSince() 418 } 419 return time.Time{} 420} 421 422func (t *Transport) SendSnapshot(m snap.Message) { 423 t.mu.Lock() 424 defer t.mu.Unlock() 425 p := t.peers[types.ID(m.To)] 426 if p == nil { 427 m.CloseWithError(errMemberNotFound) 428 return 429 } 430 p.sendSnap(m) 431} 432 433// Pausable is a testing interface for pausing transport traffic. 434type Pausable interface { 435 Pause() 436 Resume() 437} 438 439func (t *Transport) Pause() { 440 t.mu.RLock() 441 defer t.mu.RUnlock() 442 for _, p := range t.peers { 443 p.(Pausable).Pause() 444 } 445} 446 447func (t *Transport) Resume() { 448 t.mu.RLock() 449 defer t.mu.RUnlock() 450 for _, p := range t.peers { 451 p.(Pausable).Resume() 452 } 453} 454 455// ActivePeers returns a channel that closes when an initial 456// peer connection has been established. Use this to wait until the 457// first peer connection becomes active. 458func (t *Transport) ActivePeers() (cnt int) { 459 t.mu.RLock() 460 defer t.mu.RUnlock() 461 for _, p := range t.peers { 462 if !p.activeSince().IsZero() { 463 cnt++ 464 } 465 } 466 return cnt 467} 468