1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18	"context"
19	"net/http"
20	"sync"
21	"time"
22
23	"github.com/coreos/etcd/etcdserver/stats"
24	"github.com/coreos/etcd/pkg/logutil"
25	"github.com/coreos/etcd/pkg/transport"
26	"github.com/coreos/etcd/pkg/types"
27	"github.com/coreos/etcd/raft"
28	"github.com/coreos/etcd/raft/raftpb"
29	"github.com/coreos/etcd/snap"
30
31	"github.com/coreos/pkg/capnslog"
32	"github.com/xiang90/probing"
33	"golang.org/x/time/rate"
34)
35
36var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp"))
37
38type Raft interface {
39	Process(ctx context.Context, m raftpb.Message) error
40	IsIDRemoved(id uint64) bool
41	ReportUnreachable(id uint64)
42	ReportSnapshot(id uint64, status raft.SnapshotStatus)
43}
44
45type Transporter interface {
46	// Start starts the given Transporter.
47	// Start MUST be called before calling other functions in the interface.
48	Start() error
49	// Handler returns the HTTP handler of the transporter.
50	// A transporter HTTP handler handles the HTTP requests
51	// from remote peers.
52	// The handler MUST be used to handle RaftPrefix(/raft)
53	// endpoint.
54	Handler() http.Handler
55	// Send sends out the given messages to the remote peers.
56	// Each message has a To field, which is an id that maps
57	// to an existing peer in the transport.
58	// If the id cannot be found in the transport, the message
59	// will be ignored.
60	Send(m []raftpb.Message)
61	// SendSnapshot sends out the given snapshot message to a remote peer.
62	// The behavior of SendSnapshot is similar to Send.
63	SendSnapshot(m snap.Message)
64	// AddRemote adds a remote with given peer urls into the transport.
65	// A remote helps newly joined member to catch up the progress of cluster,
66	// and will not be used after that.
67	// It is the caller's responsibility to ensure the urls are all valid,
68	// or it panics.
69	AddRemote(id types.ID, urls []string)
70	// AddPeer adds a peer with given peer urls into the transport.
71	// It is the caller's responsibility to ensure the urls are all valid,
72	// or it panics.
73	// Peer urls are used to connect to the remote peer.
74	AddPeer(id types.ID, urls []string)
75	// RemovePeer removes the peer with given id.
76	RemovePeer(id types.ID)
77	// RemoveAllPeers removes all the existing peers in the transport.
78	RemoveAllPeers()
79	// UpdatePeer updates the peer urls of the peer with the given id.
80	// It is the caller's responsibility to ensure the urls are all valid,
81	// or it panics.
82	UpdatePeer(id types.ID, urls []string)
83	// ActiveSince returns the time that the connection with the peer
84	// of the given id becomes active.
85	// If the connection is active since peer was added, it returns the adding time.
86	// If the connection is currently inactive, it returns zero time.
87	ActiveSince(id types.ID) time.Time
88	// ActivePeers returns the number of active peers.
89	ActivePeers() int
90	// Stop closes the connections and stops the transporter.
91	Stop()
92}
93
// Transport implements Transporter interface. It provides the functionality
// to send raft messages to peers, and receive raft messages from peers.
// User should call Handler method to get a handler to serve requests
// received from peerURLs.
// User needs to call Start before calling other functions, and call
// Stop when the Transport is no longer used.
//
// The exported fields are configuration read by Start and the other methods,
// so set them before calling Start. Start builds the round trippers, the
// probers and the peer/remote maps; Stop tears them down and sets both maps
// to nil, after which AddPeer panics and AddRemote silently does nothing.
type Transport struct {
	DialTimeout time.Duration // maximum duration before timing out dial of the request
	// DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
	// a distinct rate limiter is created per every peer (default value: 10 events/sec)
	// Start fills in that default when the field is left at zero.
	DialRetryFrequency rate.Limit

	TLSInfo transport.TLSInfo // TLS information used when creating connection

	ID          types.ID   // local member ID
	URLs        types.URLs // local peer URLs
	ClusterID   types.ID   // raft cluster ID for request validation
	Raft        Raft       // raft state machine, to which the Transport forwards received messages and reports status
	Snapshotter *snap.Snapshotter
	ServerStats *stats.ServerStats // used to record general transportation statistics
	// used to record transportation statistics with followers when
	// performing as leader in raft protocol
	LeaderStats *stats.LeaderStats
	// ErrorC is used to report detected critical errors, e.g.,
	// the member has been permanently removed from the cluster
	// When an error is received from ErrorC, user should stop raft state
	// machine and thus stop the Transport.
	ErrorC chan error

	// Round trippers are created by Start from TLSInfo and DialTimeout; Stop
	// closes their idle connections when they are *http.Transport values.
	streamRt   http.RoundTripper // roundTripper used by streams
	pipelineRt http.RoundTripper // roundTripper used by pipelines

	mu      sync.RWMutex         // protect the remote and peer map
	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
	peers   map[types.ID]Peer    // peers map

	// Probers built on top of the pipeline and stream round trippers. A peer's
	// urls are registered with both in AddPeer/UpdatePeer and removed in
	// removePeer; Stop clears them entirely.
	pipelineProber probing.Prober
	streamProber   probing.Prober
}
133
134func (t *Transport) Start() error {
135	var err error
136	t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
137	if err != nil {
138		return err
139	}
140	t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)
141	if err != nil {
142		return err
143	}
144	t.remotes = make(map[types.ID]*remote)
145	t.peers = make(map[types.ID]Peer)
146	t.pipelineProber = probing.NewProber(t.pipelineRt)
147	t.streamProber = probing.NewProber(t.streamRt)
148
149	// If client didn't provide dial retry frequency, use the default
150	// (100ms backoff between attempts to create a new stream),
151	// so it doesn't bring too much overhead when retry.
152	if t.DialRetryFrequency == 0 {
153		t.DialRetryFrequency = rate.Every(100 * time.Millisecond)
154	}
155	return nil
156}
157
158func (t *Transport) Handler() http.Handler {
159	pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
160	streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
161	snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
162	mux := http.NewServeMux()
163	mux.Handle(RaftPrefix, pipelineHandler)
164	mux.Handle(RaftStreamPrefix+"/", streamHandler)
165	mux.Handle(RaftSnapshotPrefix, snapHandler)
166	mux.Handle(ProbingPrefix, probing.NewHandler())
167	return mux
168}
169
170func (t *Transport) Get(id types.ID) Peer {
171	t.mu.RLock()
172	defer t.mu.RUnlock()
173	return t.peers[id]
174}
175
176func (t *Transport) Send(msgs []raftpb.Message) {
177	for _, m := range msgs {
178		if m.To == 0 {
179			// ignore intentionally dropped message
180			continue
181		}
182		to := types.ID(m.To)
183
184		t.mu.RLock()
185		p, pok := t.peers[to]
186		g, rok := t.remotes[to]
187		t.mu.RUnlock()
188
189		if pok {
190			if m.Type == raftpb.MsgApp {
191				t.ServerStats.SendAppendReq(m.Size())
192			}
193			p.send(m)
194			continue
195		}
196
197		if rok {
198			g.send(m)
199			continue
200		}
201
202		plog.Debugf("ignored message %s (sent to unknown peer %s)", m.Type, to)
203	}
204}
205
206func (t *Transport) Stop() {
207	t.mu.Lock()
208	defer t.mu.Unlock()
209	for _, r := range t.remotes {
210		r.stop()
211	}
212	for _, p := range t.peers {
213		p.stop()
214	}
215	t.pipelineProber.RemoveAll()
216	t.streamProber.RemoveAll()
217	if tr, ok := t.streamRt.(*http.Transport); ok {
218		tr.CloseIdleConnections()
219	}
220	if tr, ok := t.pipelineRt.(*http.Transport); ok {
221		tr.CloseIdleConnections()
222	}
223	t.peers = nil
224	t.remotes = nil
225}
226
227// CutPeer drops messages to the specified peer.
228func (t *Transport) CutPeer(id types.ID) {
229	t.mu.RLock()
230	p, pok := t.peers[id]
231	g, gok := t.remotes[id]
232	t.mu.RUnlock()
233
234	if pok {
235		p.(Pausable).Pause()
236	}
237	if gok {
238		g.Pause()
239	}
240}
241
242// MendPeer recovers the message dropping behavior of the given peer.
243func (t *Transport) MendPeer(id types.ID) {
244	t.mu.RLock()
245	p, pok := t.peers[id]
246	g, gok := t.remotes[id]
247	t.mu.RUnlock()
248
249	if pok {
250		p.(Pausable).Resume()
251	}
252	if gok {
253		g.Resume()
254	}
255}
256
257func (t *Transport) AddRemote(id types.ID, us []string) {
258	t.mu.Lock()
259	defer t.mu.Unlock()
260	if t.remotes == nil {
261		// there's no clean way to shutdown the golang http server
262		// (see: https://github.com/golang/go/issues/4674) before
263		// stopping the transport; ignore any new connections.
264		return
265	}
266	if _, ok := t.peers[id]; ok {
267		return
268	}
269	if _, ok := t.remotes[id]; ok {
270		return
271	}
272	urls, err := types.NewURLs(us)
273	if err != nil {
274		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
275	}
276	t.remotes[id] = startRemote(t, urls, id)
277}
278
279func (t *Transport) AddPeer(id types.ID, us []string) {
280	t.mu.Lock()
281	defer t.mu.Unlock()
282
283	if t.peers == nil {
284		panic("transport stopped")
285	}
286	if _, ok := t.peers[id]; ok {
287		return
288	}
289	urls, err := types.NewURLs(us)
290	if err != nil {
291		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
292	}
293	fs := t.LeaderStats.Follower(id.String())
294	t.peers[id] = startPeer(t, urls, id, fs)
295	addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
296	addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
297	plog.Infof("added peer %s", id)
298}
299
300func (t *Transport) RemovePeer(id types.ID) {
301	t.mu.Lock()
302	defer t.mu.Unlock()
303	t.removePeer(id)
304}
305
306func (t *Transport) RemoveAllPeers() {
307	t.mu.Lock()
308	defer t.mu.Unlock()
309	for id := range t.peers {
310		t.removePeer(id)
311	}
312}
313
314// the caller of this function must have the peers mutex.
315func (t *Transport) removePeer(id types.ID) {
316	if peer, ok := t.peers[id]; ok {
317		peer.stop()
318	} else {
319		plog.Panicf("unexpected removal of unknown peer '%d'", id)
320	}
321	delete(t.peers, id)
322	delete(t.LeaderStats.Followers, id.String())
323	t.pipelineProber.Remove(id.String())
324	t.streamProber.Remove(id.String())
325	plog.Infof("removed peer %s", id)
326}
327
328func (t *Transport) UpdatePeer(id types.ID, us []string) {
329	t.mu.Lock()
330	defer t.mu.Unlock()
331	// TODO: return error or just panic?
332	if _, ok := t.peers[id]; !ok {
333		return
334	}
335	urls, err := types.NewURLs(us)
336	if err != nil {
337		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
338	}
339	t.peers[id].update(urls)
340
341	t.pipelineProber.Remove(id.String())
342	addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
343	t.streamProber.Remove(id.String())
344	addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
345	plog.Infof("updated peer %s", id)
346}
347
348func (t *Transport) ActiveSince(id types.ID) time.Time {
349	t.mu.Lock()
350	defer t.mu.Unlock()
351	if p, ok := t.peers[id]; ok {
352		return p.activeSince()
353	}
354	return time.Time{}
355}
356
357func (t *Transport) SendSnapshot(m snap.Message) {
358	t.mu.Lock()
359	defer t.mu.Unlock()
360	p := t.peers[types.ID(m.To)]
361	if p == nil {
362		m.CloseWithError(errMemberNotFound)
363		return
364	}
365	p.sendSnap(m)
366}
367
// Pausable is implemented by components whose traffic can be suspended and
// later restored; it exists to support tests.
type Pausable interface {
	// Pause suspends traffic.
	Pause()
	// Resume restores traffic suspended by Pause.
	Resume()
}
373
374func (t *Transport) Pause() {
375	t.mu.RLock()
376	defer t.mu.RUnlock()
377	for _, p := range t.peers {
378		p.(Pausable).Pause()
379	}
380}
381
382func (t *Transport) Resume() {
383	t.mu.RLock()
384	defer t.mu.RUnlock()
385	for _, p := range t.peers {
386		p.(Pausable).Resume()
387	}
388}
389
390// ActivePeers returns a channel that closes when an initial
391// peer connection has been established. Use this to wait until the
392// first peer connection becomes active.
393func (t *Transport) ActivePeers() (cnt int) {
394	t.mu.RLock()
395	defer t.mu.RUnlock()
396	for _, p := range t.peers {
397		if !p.activeSince().IsZero() {
398			cnt++
399		}
400	}
401	return cnt
402}
403
404type nopTransporter struct{}
405
406func NewNopTransporter() Transporter {
407	return &nopTransporter{}
408}
409
410func (s *nopTransporter) Start() error                        { return nil }
411func (s *nopTransporter) Handler() http.Handler               { return nil }
412func (s *nopTransporter) Send(m []raftpb.Message)             {}
413func (s *nopTransporter) SendSnapshot(m snap.Message)         {}
414func (s *nopTransporter) AddRemote(id types.ID, us []string)  {}
415func (s *nopTransporter) AddPeer(id types.ID, us []string)    {}
416func (s *nopTransporter) RemovePeer(id types.ID)              {}
417func (s *nopTransporter) RemoveAllPeers()                     {}
418func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
419func (s *nopTransporter) ActiveSince(id types.ID) time.Time   { return time.Time{} }
420func (s *nopTransporter) ActivePeers() int                    { return 0 }
421func (s *nopTransporter) Stop()                               {}
422func (s *nopTransporter) Pause()                              {}
423func (s *nopTransporter) Resume()                             {}
424
425type snapTransporter struct {
426	nopTransporter
427	snapDoneC chan snap.Message
428	snapDir   string
429}
430
431func NewSnapTransporter(snapDir string) (Transporter, <-chan snap.Message) {
432	ch := make(chan snap.Message, 1)
433	tr := &snapTransporter{snapDoneC: ch, snapDir: snapDir}
434	return tr, ch
435}
436
437func (s *snapTransporter) SendSnapshot(m snap.Message) {
438	ss := snap.New(s.snapDir)
439	ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1)
440	m.CloseWithError(nil)
441	s.snapDoneC <- m
442}
443