1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18	"bytes"
19	"context"
20	"errors"
21	"io/ioutil"
22	"sync"
23	"time"
24
25	stats "go.etcd.io/etcd/etcdserver/api/v2stats"
26	"go.etcd.io/etcd/pkg/pbutil"
27	"go.etcd.io/etcd/pkg/types"
28	"go.etcd.io/etcd/raft"
29	"go.etcd.io/etcd/raft/raftpb"
30
31	"go.uber.org/zap"
32)
33
const (
	// pipelineBufSize is the capacity of each pipeline's outgoing message
	// buffer. It absorbs temporary network latency: the size is chosen so
	// that, on the good path, the pipeline does not drop messages when the
	// network stalls for less than about one second.
	pipelineBufSize = 64

	// connPerPipeline is the number of handle goroutines each pipeline runs,
	// i.e. the maximum number of HTTP POSTs it has in flight at once.
	connPerPipeline = 4
)
42
var (
	// errStopped is a sentinel error whose message is "stopped". It is not
	// referenced in this section; presumably other code in the package
	// returns it after shutdown — confirm against its callers.
	errStopped = errors.New("stopped")
)
44
45type pipeline struct {
46	peerID types.ID
47
48	tr     *Transport
49	picker *urlPicker
50	status *peerStatus
51	raft   Raft
52	errorc chan error
53	// deprecate when we depercate v2 API
54	followerStats *stats.FollowerStats
55
56	msgc chan raftpb.Message
57	// wait for the handling routines
58	wg    sync.WaitGroup
59	stopc chan struct{}
60}
61
62func (p *pipeline) start() {
63	p.stopc = make(chan struct{})
64	p.msgc = make(chan raftpb.Message, pipelineBufSize)
65	p.wg.Add(connPerPipeline)
66	for i := 0; i < connPerPipeline; i++ {
67		go p.handle()
68	}
69
70	if p.tr != nil && p.tr.Logger != nil {
71		p.tr.Logger.Info(
72			"started HTTP pipelining with remote peer",
73			zap.String("local-member-id", p.tr.ID.String()),
74			zap.String("remote-peer-id", p.peerID.String()),
75		)
76	} else {
77		plog.Infof("started HTTP pipelining with peer %s", p.peerID)
78	}
79}
80
81func (p *pipeline) stop() {
82	close(p.stopc)
83	p.wg.Wait()
84
85	if p.tr != nil && p.tr.Logger != nil {
86		p.tr.Logger.Info(
87			"stopped HTTP pipelining with remote peer",
88			zap.String("local-member-id", p.tr.ID.String()),
89			zap.String("remote-peer-id", p.peerID.String()),
90		)
91	} else {
92		plog.Infof("stopped HTTP pipelining with peer %s", p.peerID)
93	}
94}
95
96func (p *pipeline) handle() {
97	defer p.wg.Done()
98
99	for {
100		select {
101		case m := <-p.msgc:
102			start := time.Now()
103			err := p.post(pbutil.MustMarshal(&m))
104			end := time.Now()
105
106			if err != nil {
107				p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
108
109				if m.Type == raftpb.MsgApp && p.followerStats != nil {
110					p.followerStats.Fail()
111				}
112				p.raft.ReportUnreachable(m.To)
113				if isMsgSnap(m) {
114					p.raft.ReportSnapshot(m.To, raft.SnapshotFailure)
115				}
116				sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
117				continue
118			}
119
120			p.status.activate()
121			if m.Type == raftpb.MsgApp && p.followerStats != nil {
122				p.followerStats.Succ(end.Sub(start))
123			}
124			if isMsgSnap(m) {
125				p.raft.ReportSnapshot(m.To, raft.SnapshotFinish)
126			}
127			sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size()))
128		case <-p.stopc:
129			return
130		}
131	}
132}
133
134// post POSTs a data payload to a url. Returns nil if the POST succeeds,
135// error on any failure.
136func (p *pipeline) post(data []byte) (err error) {
137	u := p.picker.pick()
138	req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID)
139
140	done := make(chan struct{}, 1)
141	ctx, cancel := context.WithCancel(context.Background())
142	req = req.WithContext(ctx)
143	go func() {
144		select {
145		case <-done:
146		case <-p.stopc:
147			waitSchedule()
148			cancel()
149		}
150	}()
151
152	resp, err := p.tr.pipelineRt.RoundTrip(req)
153	done <- struct{}{}
154	if err != nil {
155		p.picker.unreachable(u)
156		return err
157	}
158	defer resp.Body.Close()
159	b, err := ioutil.ReadAll(resp.Body)
160	if err != nil {
161		p.picker.unreachable(u)
162		return err
163	}
164
165	err = checkPostResponse(resp, b, req, p.peerID)
166	if err != nil {
167		p.picker.unreachable(u)
168		// errMemberRemoved is a critical error since a removed member should
169		// always be stopped. So we use reportCriticalError to report it to errorc.
170		if err == errMemberRemoved {
171			reportCriticalError(err, p.errorc)
172		}
173		return err
174	}
175
176	return nil
177}
178
// waitSchedule pauses the calling goroutine for one millisecond, giving other
// goroutines a chance to be scheduled before the caller continues.
func waitSchedule() {
	const pause = time.Millisecond
	time.Sleep(pause)
}
181