1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18	"bytes"
19	"context"
20	"errors"
21	"io/ioutil"
22	"sync"
23	"time"
24
25	"github.com/coreos/etcd/etcdserver/stats"
26	"github.com/coreos/etcd/pkg/pbutil"
27	"github.com/coreos/etcd/pkg/types"
28	"github.com/coreos/etcd/raft"
29	"github.com/coreos/etcd/raft/raftpb"
30)
31
const (
	// connPerPipeline is the number of handle goroutines started for each
	// pipeline, and therefore the maximum number of messages one pipeline
	// posts concurrently.
	connPerPipeline = 4
	// pipelineBufSize is the capacity of the pipeline's message buffer,
	// which absorbs temporary network latency.
	// The size ensures that the pipeline does not drop messages when the
	// network is unavailable for less than 1 second in the good path.
	pipelineBufSize = 64
)
40
// errStopped is a sentinel error carrying the message "stopped".
// NOTE(review): nothing in the visible part of this file returns it;
// presumably it is used elsewhere in the package - confirm before changing.
var errStopped = errors.New("stopped")
42
// pipeline delivers raft messages to a single peer by issuing one HTTP POST
// per message. Messages are taken from msgc by connPerPipeline goroutines
// running handle, which are launched by start and terminated by stop.
type pipeline struct {
	// peerID is the ID of the remote peer; it is used in log messages and
	// when checking POST responses.
	peerID types.ID

	// tr supplies the local URLs, member ID, cluster ID and the round
	// tripper used to build and send requests.
	// picker chooses the peer URL for each POST and is told when that URL
	// turns out to be unreachable.
	// status is activated after a successful POST and deactivated after a
	// failed one.
	// raft is notified of unreachable peers and of snapshot outcomes.
	// errorc receives critical errors (errMemberRemoved) found while
	// checking a POST response.
	tr     *Transport
	picker *urlPicker
	status *peerStatus
	raft   Raft
	errorc chan error
	// followerStats records the success or failure of MsgApp sends; it may
	// be nil, in which case nothing is recorded.
	// deprecate when we deprecate v2 API
	followerStats *stats.FollowerStats

	// msgc buffers up to pipelineBufSize outgoing messages for the handle
	// goroutines.
	msgc chan raftpb.Message
	// wg waits for the handling routines; stopc is closed by stop to tell
	// them, and any in-flight POST, to give up.
	wg    sync.WaitGroup
	stopc chan struct{}
}
59
60func (p *pipeline) start() {
61	p.stopc = make(chan struct{})
62	p.msgc = make(chan raftpb.Message, pipelineBufSize)
63	p.wg.Add(connPerPipeline)
64	for i := 0; i < connPerPipeline; i++ {
65		go p.handle()
66	}
67	plog.Infof("started HTTP pipelining with peer %s", p.peerID)
68}
69
// stop shuts the pipeline down and blocks until every handle goroutine has
// returned. Closing stopc makes idle handle goroutines return and lets an
// in-flight post cancel its request. Messages still buffered in msgc at that
// point are not guaranteed to be sent.
//
// stop must be called after start and at most once: closing a nil or an
// already closed channel panics.
func (p *pipeline) stop() {
	close(p.stopc)
	p.wg.Wait()
	plog.Infof("stopped HTTP pipelining with peer %s", p.peerID)
}
75
76func (p *pipeline) handle() {
77	defer p.wg.Done()
78
79	for {
80		select {
81		case m := <-p.msgc:
82			start := time.Now()
83			err := p.post(pbutil.MustMarshal(&m))
84			end := time.Now()
85
86			if err != nil {
87				p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
88
89				if m.Type == raftpb.MsgApp && p.followerStats != nil {
90					p.followerStats.Fail()
91				}
92				p.raft.ReportUnreachable(m.To)
93				if isMsgSnap(m) {
94					p.raft.ReportSnapshot(m.To, raft.SnapshotFailure)
95				}
96				sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
97				continue
98			}
99
100			p.status.activate()
101			if m.Type == raftpb.MsgApp && p.followerStats != nil {
102				p.followerStats.Succ(end.Sub(start))
103			}
104			if isMsgSnap(m) {
105				p.raft.ReportSnapshot(m.To, raft.SnapshotFinish)
106			}
107			sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size()))
108		case <-p.stopc:
109			return
110		}
111	}
112}
113
114// post POSTs a data payload to a url. Returns nil if the POST succeeds,
115// error on any failure.
116func (p *pipeline) post(data []byte) (err error) {
117	u := p.picker.pick()
118	req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID)
119
120	done := make(chan struct{}, 1)
121	ctx, cancel := context.WithCancel(context.Background())
122	req = req.WithContext(ctx)
123	go func() {
124		select {
125		case <-done:
126		case <-p.stopc:
127			waitSchedule()
128			cancel()
129		}
130	}()
131
132	resp, err := p.tr.pipelineRt.RoundTrip(req)
133	done <- struct{}{}
134	if err != nil {
135		p.picker.unreachable(u)
136		return err
137	}
138	b, err := ioutil.ReadAll(resp.Body)
139	if err != nil {
140		p.picker.unreachable(u)
141		return err
142	}
143	resp.Body.Close()
144
145	err = checkPostResponse(resp, b, req, p.peerID)
146	if err != nil {
147		p.picker.unreachable(u)
148		// errMemberRemoved is a critical error since a removed member should
149		// always be stopped. So we use reportCriticalError to report it to errorc.
150		if err == errMemberRemoved {
151			reportCriticalError(err, p.errorc)
152		}
153		return err
154	}
155
156	return nil
157}
158
// waitSchedule pauses the calling goroutine for one millisecond to give
// other goroutines a chance to be scheduled.
func waitSchedule() {
	time.Sleep(time.Millisecond)
}
161