// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rafthttp

import (
	"context"
	"net/http"
	"sync"
	"time"

	"go.etcd.io/etcd/client/pkg/v3/transport"
	"go.etcd.io/etcd/client/pkg/v3/types"
	"go.etcd.io/etcd/raft/v3"
	"go.etcd.io/etcd/raft/v3/raftpb"
	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
	stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"

	"github.com/xiang90/probing"
	"go.uber.org/zap"
	"golang.org/x/time/rate"
)

// Raft is the view of the raft state machine that the transport depends on.
// Transport holds one in its Raft field and passes it to the HTTP handlers
// it builds, so that received messages can be forwarded and peer status
// can be reported.
//
// NOTE(review): the per-method notes below are inferred from the method
// names and signatures only; no implementation is visible in this file.
type Raft interface {
	// Process presumably hands the message m to the state machine.
	Process(ctx context.Context, m raftpb.Message) error
	// IsIDRemoved presumably reports whether the member with the given id
	// has been removed from the cluster.
	IsIDRemoved(id uint64) bool
	// ReportUnreachable presumably notifies the state machine that the
	// member with the given id could not be reached.
	ReportUnreachable(id uint64)
	// ReportSnapshot presumably notifies the state machine of the outcome
	// of sending a snapshot to the member with the given id.
	ReportSnapshot(id uint64, status raft.SnapshotStatus)
}

// Transporter is the interface for exchanging raft messages with remote
// peers and for managing the set of peers and remotes that messages can be
// sent to.
type Transporter interface {
	// Start starts the given Transporter.
	// Start MUST be called before calling other functions in the interface.
	Start() error
	// Handler returns the HTTP handler of the transporter.
	// A transporter HTTP handler handles the HTTP requests
	// from remote peers.
	// The handler MUST be used to handle RaftPrefix(/raft)
	// endpoint.
	Handler() http.Handler
	// Send sends out the given messages to the remote peers.
	// Each message has a To field, which is an id that maps
	// to an existing peer in the transport.
	// If the id cannot be found in the transport, the message
	// will be ignored.
	Send(m []raftpb.Message)
	// SendSnapshot sends out the given snapshot message to a remote peer.
	// The behavior of SendSnapshot is similar to Send.
	SendSnapshot(m snap.Message)
	// AddRemote adds a remote with given peer urls into the transport.
	// A remote helps a newly joined member catch up with the progress of
	// the cluster, and will not be used after that.
	// It is the caller's responsibility to ensure the urls are all valid,
	// or it panics.
	AddRemote(id types.ID, urls []string)
	// AddPeer adds a peer with given peer urls into the transport.
	// It is the caller's responsibility to ensure the urls are all valid,
	// or it panics.
	// Peer urls are used to connect to the remote peer.
	AddPeer(id types.ID, urls []string)
	// RemovePeer removes the peer with given id.
	RemovePeer(id types.ID)
	// RemoveAllPeers removes all the existing peers in the transport.
	RemoveAllPeers()
	// UpdatePeer updates the peer urls of the peer with the given id.
	// It is the caller's responsibility to ensure the urls are all valid,
	// or it panics.
	UpdatePeer(id types.ID, urls []string)
	// ActiveSince returns the time that the connection with the peer
	// of the given id becomes active.
	// If the connection is active since peer was added, it returns the adding time.
	// If the connection is currently inactive, it returns zero time.
	ActiveSince(id types.ID) time.Time
	// ActivePeers returns the number of active peers.
	ActivePeers() int
	// Stop closes the connections and stops the transporter.
	Stop()
}

// Transport implements Transporter interface. It provides the functionality
// to send raft messages to peers, and receive raft messages from peers.
// User should call Handler method to get a handler to serve requests
// received from peerURLs.
// User needs to call Start before calling other functions, and call
// Stop when the Transport is no longer used.
97type Transport struct { 98 Logger *zap.Logger 99 100 DialTimeout time.Duration // maximum duration before timing out dial of the request 101 // DialRetryFrequency defines the frequency of streamReader dial retrial attempts; 102 // a distinct rate limiter is created per every peer (default value: 10 events/sec) 103 DialRetryFrequency rate.Limit 104 105 TLSInfo transport.TLSInfo // TLS information used when creating connection 106 107 ID types.ID // local member ID 108 URLs types.URLs // local peer URLs 109 ClusterID types.ID // raft cluster ID for request validation 110 Raft Raft // raft state machine, to which the Transport forwards received messages and reports status 111 Snapshotter *snap.Snapshotter 112 ServerStats *stats.ServerStats // used to record general transportation statistics 113 // used to record transportation statistics with followers when 114 // performing as leader in raft protocol 115 LeaderStats *stats.LeaderStats 116 // ErrorC is used to report detected critical errors, e.g., 117 // the member has been permanently removed from the cluster 118 // When an error is received from ErrorC, user should stop raft state 119 // machine and thus stop the Transport. 
120 ErrorC chan error 121 122 streamRt http.RoundTripper // roundTripper used by streams 123 pipelineRt http.RoundTripper // roundTripper used by pipelines 124 125 mu sync.RWMutex // protect the remote and peer map 126 remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up 127 peers map[types.ID]Peer // peers map 128 129 pipelineProber probing.Prober 130 streamProber probing.Prober 131} 132 133func (t *Transport) Start() error { 134 var err error 135 t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout) 136 if err != nil { 137 return err 138 } 139 t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout) 140 if err != nil { 141 return err 142 } 143 t.remotes = make(map[types.ID]*remote) 144 t.peers = make(map[types.ID]Peer) 145 t.pipelineProber = probing.NewProber(t.pipelineRt) 146 t.streamProber = probing.NewProber(t.streamRt) 147 148 // If client didn't provide dial retry frequency, use the default 149 // (100ms backoff between attempts to create a new stream), 150 // so it doesn't bring too much overhead when retry. 
151 if t.DialRetryFrequency == 0 { 152 t.DialRetryFrequency = rate.Every(100 * time.Millisecond) 153 } 154 return nil 155} 156 157func (t *Transport) Handler() http.Handler { 158 pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID) 159 streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID) 160 snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID) 161 mux := http.NewServeMux() 162 mux.Handle(RaftPrefix, pipelineHandler) 163 mux.Handle(RaftStreamPrefix+"/", streamHandler) 164 mux.Handle(RaftSnapshotPrefix, snapHandler) 165 mux.Handle(ProbingPrefix, probing.NewHandler()) 166 return mux 167} 168 169func (t *Transport) Get(id types.ID) Peer { 170 t.mu.RLock() 171 defer t.mu.RUnlock() 172 return t.peers[id] 173} 174 175func (t *Transport) Send(msgs []raftpb.Message) { 176 for _, m := range msgs { 177 if m.To == 0 { 178 // ignore intentionally dropped message 179 continue 180 } 181 to := types.ID(m.To) 182 183 t.mu.RLock() 184 p, pok := t.peers[to] 185 g, rok := t.remotes[to] 186 t.mu.RUnlock() 187 188 if pok { 189 if m.Type == raftpb.MsgApp { 190 t.ServerStats.SendAppendReq(m.Size()) 191 } 192 p.send(m) 193 continue 194 } 195 196 if rok { 197 g.send(m) 198 continue 199 } 200 201 if t.Logger != nil { 202 t.Logger.Debug( 203 "ignored message send request; unknown remote peer target", 204 zap.String("type", m.Type.String()), 205 zap.String("unknown-target-peer-id", to.String()), 206 ) 207 } 208 } 209} 210 211func (t *Transport) Stop() { 212 t.mu.Lock() 213 defer t.mu.Unlock() 214 for _, r := range t.remotes { 215 r.stop() 216 } 217 for _, p := range t.peers { 218 p.stop() 219 } 220 t.pipelineProber.RemoveAll() 221 t.streamProber.RemoveAll() 222 if tr, ok := t.streamRt.(*http.Transport); ok { 223 tr.CloseIdleConnections() 224 } 225 if tr, ok := t.pipelineRt.(*http.Transport); ok { 226 tr.CloseIdleConnections() 227 } 228 t.peers = nil 229 t.remotes = nil 230} 231 232// CutPeer drops messages to the specified peer. 
233func (t *Transport) CutPeer(id types.ID) { 234 t.mu.RLock() 235 p, pok := t.peers[id] 236 g, gok := t.remotes[id] 237 t.mu.RUnlock() 238 239 if pok { 240 p.(Pausable).Pause() 241 } 242 if gok { 243 g.Pause() 244 } 245} 246 247// MendPeer recovers the message dropping behavior of the given peer. 248func (t *Transport) MendPeer(id types.ID) { 249 t.mu.RLock() 250 p, pok := t.peers[id] 251 g, gok := t.remotes[id] 252 t.mu.RUnlock() 253 254 if pok { 255 p.(Pausable).Resume() 256 } 257 if gok { 258 g.Resume() 259 } 260} 261 262func (t *Transport) AddRemote(id types.ID, us []string) { 263 t.mu.Lock() 264 defer t.mu.Unlock() 265 if t.remotes == nil { 266 // there's no clean way to shutdown the golang http server 267 // (see: https://github.com/golang/go/issues/4674) before 268 // stopping the transport; ignore any new connections. 269 return 270 } 271 if _, ok := t.peers[id]; ok { 272 return 273 } 274 if _, ok := t.remotes[id]; ok { 275 return 276 } 277 urls, err := types.NewURLs(us) 278 if err != nil { 279 if t.Logger != nil { 280 t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err)) 281 } 282 } 283 t.remotes[id] = startRemote(t, urls, id) 284 285 if t.Logger != nil { 286 t.Logger.Info( 287 "added new remote peer", 288 zap.String("local-member-id", t.ID.String()), 289 zap.String("remote-peer-id", id.String()), 290 zap.Strings("remote-peer-urls", us), 291 ) 292 } 293} 294 295func (t *Transport) AddPeer(id types.ID, us []string) { 296 t.mu.Lock() 297 defer t.mu.Unlock() 298 299 if t.peers == nil { 300 panic("transport stopped") 301 } 302 if _, ok := t.peers[id]; ok { 303 return 304 } 305 urls, err := types.NewURLs(us) 306 if err != nil { 307 if t.Logger != nil { 308 t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err)) 309 } 310 } 311 fs := t.LeaderStats.Follower(id.String()) 312 t.peers[id] = startPeer(t, urls, id, fs) 313 addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) 314 
addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec) 315 316 if t.Logger != nil { 317 t.Logger.Info( 318 "added remote peer", 319 zap.String("local-member-id", t.ID.String()), 320 zap.String("remote-peer-id", id.String()), 321 zap.Strings("remote-peer-urls", us), 322 ) 323 } 324} 325 326func (t *Transport) RemovePeer(id types.ID) { 327 t.mu.Lock() 328 defer t.mu.Unlock() 329 t.removePeer(id) 330} 331 332func (t *Transport) RemoveAllPeers() { 333 t.mu.Lock() 334 defer t.mu.Unlock() 335 for id := range t.peers { 336 t.removePeer(id) 337 } 338} 339 340// the caller of this function must have the peers mutex. 341func (t *Transport) removePeer(id types.ID) { 342 if peer, ok := t.peers[id]; ok { 343 peer.stop() 344 } else { 345 if t.Logger != nil { 346 t.Logger.Panic("unexpected removal of unknown remote peer", zap.String("remote-peer-id", id.String())) 347 } 348 } 349 delete(t.peers, id) 350 delete(t.LeaderStats.Followers, id.String()) 351 t.pipelineProber.Remove(id.String()) 352 t.streamProber.Remove(id.String()) 353 354 if t.Logger != nil { 355 t.Logger.Info( 356 "removed remote peer", 357 zap.String("local-member-id", t.ID.String()), 358 zap.String("removed-remote-peer-id", id.String()), 359 ) 360 } 361} 362 363func (t *Transport) UpdatePeer(id types.ID, us []string) { 364 t.mu.Lock() 365 defer t.mu.Unlock() 366 // TODO: return error or just panic? 
367 if _, ok := t.peers[id]; !ok { 368 return 369 } 370 urls, err := types.NewURLs(us) 371 if err != nil { 372 if t.Logger != nil { 373 t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err)) 374 } 375 } 376 t.peers[id].update(urls) 377 378 t.pipelineProber.Remove(id.String()) 379 addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) 380 t.streamProber.Remove(id.String()) 381 addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec) 382 383 if t.Logger != nil { 384 t.Logger.Info( 385 "updated remote peer", 386 zap.String("local-member-id", t.ID.String()), 387 zap.String("updated-remote-peer-id", id.String()), 388 zap.Strings("updated-remote-peer-urls", us), 389 ) 390 } 391} 392 393func (t *Transport) ActiveSince(id types.ID) time.Time { 394 t.mu.RLock() 395 defer t.mu.RUnlock() 396 if p, ok := t.peers[id]; ok { 397 return p.activeSince() 398 } 399 return time.Time{} 400} 401 402func (t *Transport) SendSnapshot(m snap.Message) { 403 t.mu.Lock() 404 defer t.mu.Unlock() 405 p := t.peers[types.ID(m.To)] 406 if p == nil { 407 m.CloseWithError(errMemberNotFound) 408 return 409 } 410 p.sendSnap(m) 411} 412 413// Pausable is a testing interface for pausing transport traffic. 414type Pausable interface { 415 Pause() 416 Resume() 417} 418 419func (t *Transport) Pause() { 420 t.mu.RLock() 421 defer t.mu.RUnlock() 422 for _, p := range t.peers { 423 p.(Pausable).Pause() 424 } 425} 426 427func (t *Transport) Resume() { 428 t.mu.RLock() 429 defer t.mu.RUnlock() 430 for _, p := range t.peers { 431 p.(Pausable).Resume() 432 } 433} 434 435// ActivePeers returns a channel that closes when an initial 436// peer connection has been established. Use this to wait until the 437// first peer connection becomes active. 
438func (t *Transport) ActivePeers() (cnt int) { 439 t.mu.RLock() 440 defer t.mu.RUnlock() 441 for _, p := range t.peers { 442 if !p.activeSince().IsZero() { 443 cnt++ 444 } 445 } 446 return cnt 447} 448