1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18	"bytes"
19	"context"
20	"io"
21	"io/ioutil"
22	"net/http"
23	"time"
24
25	"go.etcd.io/etcd/etcdserver/api/snap"
26	"go.etcd.io/etcd/pkg/httputil"
27	pioutil "go.etcd.io/etcd/pkg/ioutil"
28	"go.etcd.io/etcd/pkg/types"
29	"go.etcd.io/etcd/raft"
30
31	"github.com/dustin/go-humanize"
32	"go.uber.org/zap"
33)
34
// snapResponseReadTimeout bounds how long post keeps reading the peer's
// response body after the round trip has returned. Declared as a var rather
// than a const so it can be reassigned.
var snapResponseReadTimeout = 5 * time.Second
39
40type snapshotSender struct {
41	from, to types.ID
42	cid      types.ID
43
44	tr     *Transport
45	picker *urlPicker
46	status *peerStatus
47	r      Raft
48	errorc chan error
49
50	stopc chan struct{}
51}
52
53func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *peerStatus) *snapshotSender {
54	return &snapshotSender{
55		from:   tr.ID,
56		to:     to,
57		cid:    tr.ClusterID,
58		tr:     tr,
59		picker: picker,
60		status: status,
61		r:      tr.Raft,
62		errorc: tr.ErrorC,
63		stopc:  make(chan struct{}),
64	}
65}
66
67func (s *snapshotSender) stop() { close(s.stopc) }
68
69func (s *snapshotSender) send(merged snap.Message) {
70	start := time.Now()
71
72	m := merged.Message
73	to := types.ID(m.To).String()
74
75	body := createSnapBody(s.tr.Logger, merged)
76	defer body.Close()
77
78	u := s.picker.pick()
79	req := createPostRequest(s.tr.Logger, u, RaftSnapshotPrefix, body, "application/octet-stream", s.tr.URLs, s.from, s.cid)
80
81	if s.tr.Logger != nil {
82		s.tr.Logger.Info(
83			"sending database snapshot",
84			zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
85			zap.String("remote-peer-id", to),
86			zap.Int64("bytes", merged.TotalSize),
87			zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
88		)
89	}
90
91	snapshotSendInflights.WithLabelValues(to).Inc()
92	defer func() {
93		snapshotSendInflights.WithLabelValues(to).Dec()
94	}()
95
96	err := s.post(req)
97	defer merged.CloseWithError(err)
98	if err != nil {
99		if s.tr.Logger != nil {
100			s.tr.Logger.Warn(
101				"failed to send database snapshot",
102				zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
103				zap.String("remote-peer-id", to),
104				zap.Int64("bytes", merged.TotalSize),
105				zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
106				zap.Error(err),
107			)
108		}
109
110		// errMemberRemoved is a critical error since a removed member should
111		// always be stopped. So we use reportCriticalError to report it to errorc.
112		if err == errMemberRemoved {
113			reportCriticalError(err, s.errorc)
114		}
115
116		s.picker.unreachable(u)
117		s.status.deactivate(failureType{source: sendSnap, action: "post"}, err.Error())
118		s.r.ReportUnreachable(m.To)
119		// report SnapshotFailure to raft state machine. After raft state
120		// machine knows about it, it would pause a while and retry sending
121		// new snapshot message.
122		s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
123		sentFailures.WithLabelValues(to).Inc()
124		snapshotSendFailures.WithLabelValues(to).Inc()
125		return
126	}
127	s.status.activate()
128	s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
129
130	if s.tr.Logger != nil {
131		s.tr.Logger.Info(
132			"sent database snapshot",
133			zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
134			zap.String("remote-peer-id", to),
135			zap.Int64("bytes", merged.TotalSize),
136			zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
137		)
138	}
139
140	sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))
141	snapshotSend.WithLabelValues(to).Inc()
142	snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
143}
144
145// post posts the given request.
146// It returns nil when request is sent out and processed successfully.
147func (s *snapshotSender) post(req *http.Request) (err error) {
148	ctx, cancel := context.WithCancel(context.Background())
149	req = req.WithContext(ctx)
150	defer cancel()
151
152	type responseAndError struct {
153		resp *http.Response
154		body []byte
155		err  error
156	}
157	result := make(chan responseAndError, 1)
158
159	go func() {
160		resp, err := s.tr.pipelineRt.RoundTrip(req)
161		if err != nil {
162			result <- responseAndError{resp, nil, err}
163			return
164		}
165
166		// close the response body when timeouts.
167		// prevents from reading the body forever when the other side dies right after
168		// successfully receives the request body.
169		time.AfterFunc(snapResponseReadTimeout, func() { httputil.GracefulClose(resp) })
170		body, err := ioutil.ReadAll(resp.Body)
171		result <- responseAndError{resp, body, err}
172	}()
173
174	select {
175	case <-s.stopc:
176		return errStopped
177	case r := <-result:
178		if r.err != nil {
179			return r.err
180		}
181		return checkPostResponse(s.tr.Logger, r.resp, r.body, req, s.to)
182	}
183}
184
185func createSnapBody(lg *zap.Logger, merged snap.Message) io.ReadCloser {
186	buf := new(bytes.Buffer)
187	enc := &messageEncoder{w: buf}
188	// encode raft message
189	if err := enc.encode(&merged.Message); err != nil {
190		if lg != nil {
191			lg.Panic("failed to encode message", zap.Error(err))
192		}
193	}
194
195	return &pioutil.ReaderAndCloser{
196		Reader: io.MultiReader(buf, merged.ReadCloser),
197		Closer: merged.ReadCloser,
198	}
199}
200