1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18	"context"
19	"net/http"
20	"sync"
21	"time"
22
23	"go.etcd.io/etcd/client/pkg/v3/transport"
24	"go.etcd.io/etcd/client/pkg/v3/types"
25	"go.etcd.io/etcd/raft/v3"
26	"go.etcd.io/etcd/raft/v3/raftpb"
27	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
28	stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
29
30	"github.com/xiang90/probing"
31	"go.uber.org/zap"
32	"golang.org/x/time/rate"
33)
34
35type Raft interface {
36	Process(ctx context.Context, m raftpb.Message) error
37	IsIDRemoved(id uint64) bool
38	ReportUnreachable(id uint64)
39	ReportSnapshot(id uint64, status raft.SnapshotStatus)
40}
41
42type Transporter interface {
43	// Start starts the given Transporter.
44	// Start MUST be called before calling other functions in the interface.
45	Start() error
46	// Handler returns the HTTP handler of the transporter.
47	// A transporter HTTP handler handles the HTTP requests
48	// from remote peers.
49	// The handler MUST be used to handle RaftPrefix(/raft)
50	// endpoint.
51	Handler() http.Handler
52	// Send sends out the given messages to the remote peers.
53	// Each message has a To field, which is an id that maps
54	// to an existing peer in the transport.
55	// If the id cannot be found in the transport, the message
56	// will be ignored.
57	Send(m []raftpb.Message)
58	// SendSnapshot sends out the given snapshot message to a remote peer.
59	// The behavior of SendSnapshot is similar to Send.
60	SendSnapshot(m snap.Message)
61	// AddRemote adds a remote with given peer urls into the transport.
62	// A remote helps newly joined member to catch up the progress of cluster,
63	// and will not be used after that.
64	// It is the caller's responsibility to ensure the urls are all valid,
65	// or it panics.
66	AddRemote(id types.ID, urls []string)
67	// AddPeer adds a peer with given peer urls into the transport.
68	// It is the caller's responsibility to ensure the urls are all valid,
69	// or it panics.
70	// Peer urls are used to connect to the remote peer.
71	AddPeer(id types.ID, urls []string)
72	// RemovePeer removes the peer with given id.
73	RemovePeer(id types.ID)
74	// RemoveAllPeers removes all the existing peers in the transport.
75	RemoveAllPeers()
76	// UpdatePeer updates the peer urls of the peer with the given id.
77	// It is the caller's responsibility to ensure the urls are all valid,
78	// or it panics.
79	UpdatePeer(id types.ID, urls []string)
80	// ActiveSince returns the time that the connection with the peer
81	// of the given id becomes active.
82	// If the connection is active since peer was added, it returns the adding time.
83	// If the connection is currently inactive, it returns zero time.
84	ActiveSince(id types.ID) time.Time
85	// ActivePeers returns the number of active peers.
86	ActivePeers() int
87	// Stop closes the connections and stops the transporter.
88	Stop()
89}
90
91// Transport implements Transporter interface. It provides the functionality
92// to send raft messages to peers, and receive raft messages from peers.
93// User should call Handler method to get a handler to serve requests
94// received from peerURLs.
95// User needs to call Start before calling other functions, and call
96// Stop when the Transport is no longer used.
97type Transport struct {
98	Logger *zap.Logger
99
100	DialTimeout time.Duration // maximum duration before timing out dial of the request
101	// DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
102	// a distinct rate limiter is created per every peer (default value: 10 events/sec)
103	DialRetryFrequency rate.Limit
104
105	TLSInfo transport.TLSInfo // TLS information used when creating connection
106
107	ID          types.ID   // local member ID
108	URLs        types.URLs // local peer URLs
109	ClusterID   types.ID   // raft cluster ID for request validation
110	Raft        Raft       // raft state machine, to which the Transport forwards received messages and reports status
111	Snapshotter *snap.Snapshotter
112	ServerStats *stats.ServerStats // used to record general transportation statistics
113	// used to record transportation statistics with followers when
114	// performing as leader in raft protocol
115	LeaderStats *stats.LeaderStats
116	// ErrorC is used to report detected critical errors, e.g.,
117	// the member has been permanently removed from the cluster
118	// When an error is received from ErrorC, user should stop raft state
119	// machine and thus stop the Transport.
120	ErrorC chan error
121
122	streamRt   http.RoundTripper // roundTripper used by streams
123	pipelineRt http.RoundTripper // roundTripper used by pipelines
124
125	mu      sync.RWMutex         // protect the remote and peer map
126	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
127	peers   map[types.ID]Peer    // peers map
128
129	pipelineProber probing.Prober
130	streamProber   probing.Prober
131}
132
133func (t *Transport) Start() error {
134	var err error
135	t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
136	if err != nil {
137		return err
138	}
139	t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)
140	if err != nil {
141		return err
142	}
143	t.remotes = make(map[types.ID]*remote)
144	t.peers = make(map[types.ID]Peer)
145	t.pipelineProber = probing.NewProber(t.pipelineRt)
146	t.streamProber = probing.NewProber(t.streamRt)
147
148	// If client didn't provide dial retry frequency, use the default
149	// (100ms backoff between attempts to create a new stream),
150	// so it doesn't bring too much overhead when retry.
151	if t.DialRetryFrequency == 0 {
152		t.DialRetryFrequency = rate.Every(100 * time.Millisecond)
153	}
154	return nil
155}
156
157func (t *Transport) Handler() http.Handler {
158	pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
159	streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
160	snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
161	mux := http.NewServeMux()
162	mux.Handle(RaftPrefix, pipelineHandler)
163	mux.Handle(RaftStreamPrefix+"/", streamHandler)
164	mux.Handle(RaftSnapshotPrefix, snapHandler)
165	mux.Handle(ProbingPrefix, probing.NewHandler())
166	return mux
167}
168
169func (t *Transport) Get(id types.ID) Peer {
170	t.mu.RLock()
171	defer t.mu.RUnlock()
172	return t.peers[id]
173}
174
175func (t *Transport) Send(msgs []raftpb.Message) {
176	for _, m := range msgs {
177		if m.To == 0 {
178			// ignore intentionally dropped message
179			continue
180		}
181		to := types.ID(m.To)
182
183		t.mu.RLock()
184		p, pok := t.peers[to]
185		g, rok := t.remotes[to]
186		t.mu.RUnlock()
187
188		if pok {
189			if m.Type == raftpb.MsgApp {
190				t.ServerStats.SendAppendReq(m.Size())
191			}
192			p.send(m)
193			continue
194		}
195
196		if rok {
197			g.send(m)
198			continue
199		}
200
201		if t.Logger != nil {
202			t.Logger.Debug(
203				"ignored message send request; unknown remote peer target",
204				zap.String("type", m.Type.String()),
205				zap.String("unknown-target-peer-id", to.String()),
206			)
207		}
208	}
209}
210
211func (t *Transport) Stop() {
212	t.mu.Lock()
213	defer t.mu.Unlock()
214	for _, r := range t.remotes {
215		r.stop()
216	}
217	for _, p := range t.peers {
218		p.stop()
219	}
220	t.pipelineProber.RemoveAll()
221	t.streamProber.RemoveAll()
222	if tr, ok := t.streamRt.(*http.Transport); ok {
223		tr.CloseIdleConnections()
224	}
225	if tr, ok := t.pipelineRt.(*http.Transport); ok {
226		tr.CloseIdleConnections()
227	}
228	t.peers = nil
229	t.remotes = nil
230}
231
232// CutPeer drops messages to the specified peer.
233func (t *Transport) CutPeer(id types.ID) {
234	t.mu.RLock()
235	p, pok := t.peers[id]
236	g, gok := t.remotes[id]
237	t.mu.RUnlock()
238
239	if pok {
240		p.(Pausable).Pause()
241	}
242	if gok {
243		g.Pause()
244	}
245}
246
247// MendPeer recovers the message dropping behavior of the given peer.
248func (t *Transport) MendPeer(id types.ID) {
249	t.mu.RLock()
250	p, pok := t.peers[id]
251	g, gok := t.remotes[id]
252	t.mu.RUnlock()
253
254	if pok {
255		p.(Pausable).Resume()
256	}
257	if gok {
258		g.Resume()
259	}
260}
261
262func (t *Transport) AddRemote(id types.ID, us []string) {
263	t.mu.Lock()
264	defer t.mu.Unlock()
265	if t.remotes == nil {
266		// there's no clean way to shutdown the golang http server
267		// (see: https://github.com/golang/go/issues/4674) before
268		// stopping the transport; ignore any new connections.
269		return
270	}
271	if _, ok := t.peers[id]; ok {
272		return
273	}
274	if _, ok := t.remotes[id]; ok {
275		return
276	}
277	urls, err := types.NewURLs(us)
278	if err != nil {
279		if t.Logger != nil {
280			t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
281		}
282	}
283	t.remotes[id] = startRemote(t, urls, id)
284
285	if t.Logger != nil {
286		t.Logger.Info(
287			"added new remote peer",
288			zap.String("local-member-id", t.ID.String()),
289			zap.String("remote-peer-id", id.String()),
290			zap.Strings("remote-peer-urls", us),
291		)
292	}
293}
294
295func (t *Transport) AddPeer(id types.ID, us []string) {
296	t.mu.Lock()
297	defer t.mu.Unlock()
298
299	if t.peers == nil {
300		panic("transport stopped")
301	}
302	if _, ok := t.peers[id]; ok {
303		return
304	}
305	urls, err := types.NewURLs(us)
306	if err != nil {
307		if t.Logger != nil {
308			t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
309		}
310	}
311	fs := t.LeaderStats.Follower(id.String())
312	t.peers[id] = startPeer(t, urls, id, fs)
313	addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec)
314	addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)
315
316	if t.Logger != nil {
317		t.Logger.Info(
318			"added remote peer",
319			zap.String("local-member-id", t.ID.String()),
320			zap.String("remote-peer-id", id.String()),
321			zap.Strings("remote-peer-urls", us),
322		)
323	}
324}
325
326func (t *Transport) RemovePeer(id types.ID) {
327	t.mu.Lock()
328	defer t.mu.Unlock()
329	t.removePeer(id)
330}
331
332func (t *Transport) RemoveAllPeers() {
333	t.mu.Lock()
334	defer t.mu.Unlock()
335	for id := range t.peers {
336		t.removePeer(id)
337	}
338}
339
340// the caller of this function must have the peers mutex.
341func (t *Transport) removePeer(id types.ID) {
342	if peer, ok := t.peers[id]; ok {
343		peer.stop()
344	} else {
345		if t.Logger != nil {
346			t.Logger.Panic("unexpected removal of unknown remote peer", zap.String("remote-peer-id", id.String()))
347		}
348	}
349	delete(t.peers, id)
350	delete(t.LeaderStats.Followers, id.String())
351	t.pipelineProber.Remove(id.String())
352	t.streamProber.Remove(id.String())
353
354	if t.Logger != nil {
355		t.Logger.Info(
356			"removed remote peer",
357			zap.String("local-member-id", t.ID.String()),
358			zap.String("removed-remote-peer-id", id.String()),
359		)
360	}
361}
362
363func (t *Transport) UpdatePeer(id types.ID, us []string) {
364	t.mu.Lock()
365	defer t.mu.Unlock()
366	// TODO: return error or just panic?
367	if _, ok := t.peers[id]; !ok {
368		return
369	}
370	urls, err := types.NewURLs(us)
371	if err != nil {
372		if t.Logger != nil {
373			t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
374		}
375	}
376	t.peers[id].update(urls)
377
378	t.pipelineProber.Remove(id.String())
379	addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec)
380	t.streamProber.Remove(id.String())
381	addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)
382
383	if t.Logger != nil {
384		t.Logger.Info(
385			"updated remote peer",
386			zap.String("local-member-id", t.ID.String()),
387			zap.String("updated-remote-peer-id", id.String()),
388			zap.Strings("updated-remote-peer-urls", us),
389		)
390	}
391}
392
393func (t *Transport) ActiveSince(id types.ID) time.Time {
394	t.mu.RLock()
395	defer t.mu.RUnlock()
396	if p, ok := t.peers[id]; ok {
397		return p.activeSince()
398	}
399	return time.Time{}
400}
401
402func (t *Transport) SendSnapshot(m snap.Message) {
403	t.mu.Lock()
404	defer t.mu.Unlock()
405	p := t.peers[types.ID(m.To)]
406	if p == nil {
407		m.CloseWithError(errMemberNotFound)
408		return
409	}
410	p.sendSnap(m)
411}
412
413// Pausable is a testing interface for pausing transport traffic.
414type Pausable interface {
415	Pause()
416	Resume()
417}
418
419func (t *Transport) Pause() {
420	t.mu.RLock()
421	defer t.mu.RUnlock()
422	for _, p := range t.peers {
423		p.(Pausable).Pause()
424	}
425}
426
427func (t *Transport) Resume() {
428	t.mu.RLock()
429	defer t.mu.RUnlock()
430	for _, p := range t.peers {
431		p.(Pausable).Resume()
432	}
433}
434
435// ActivePeers returns a channel that closes when an initial
436// peer connection has been established. Use this to wait until the
437// first peer connection becomes active.
438func (t *Transport) ActivePeers() (cnt int) {
439	t.mu.RLock()
440	defer t.mu.RUnlock()
441	for _, p := range t.peers {
442		if !p.activeSince().IsZero() {
443			cnt++
444		}
445	}
446	return cnt
447}
448