1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18	"encoding/binary"
19	"fmt"
20	"io"
21	"time"
22
23	stats "go.etcd.io/etcd/etcdserver/api/v2stats"
24	"go.etcd.io/etcd/pkg/pbutil"
25	"go.etcd.io/etcd/pkg/types"
26	"go.etcd.io/etcd/raft/raftpb"
27)
28
// Type tags written as the first byte of every msgappv2 frame. The values
// are part of the wire format.
const (
	// msgTypeLinkHeartbeat tags a link heartbeat frame.
	msgTypeLinkHeartbeat uint8 = 0
	// msgTypeAppEntries tags a compact AppEntries frame.
	msgTypeAppEntries uint8 = 1
	// msgTypeApp tags a frame carrying a fully marshaled MsgApp message.
	msgTypeApp uint8 = 2
)

// msgAppV2BufSize is the size in bytes (1 MiB) of the scratch buffer that
// the encoder and decoder allocate for entries smaller than this size.
const msgAppV2BufSize = 1 << 20
36
37// msgappv2 stream sends three types of message: linkHeartbeatMessage,
38// AppEntries and MsgApp. AppEntries is the MsgApp that is sent in
39// replicate state in raft, whose index and term are fully predictable.
40//
41// Data format of linkHeartbeatMessage:
42// | offset | bytes | description |
43// +--------+-------+-------------+
44// | 0      | 1     | \x00        |
45//
46// Data format of AppEntries:
47// | offset | bytes | description |
48// +--------+-------+-------------+
49// | 0      | 1     | \x01        |
// | 1      | 8     | number of entries |
// | 9      | 8     | length of first encoded entry |
// | 17     | n1    | first encoded entry |
// ...
// | x      | 8     | length of k-th encoded entry |
// | x+8    | nk    | k-th encoded entry |
56// | x+8+nk | 8     | commit index |
57//
58// Data format of MsgApp:
59// | offset | bytes | description |
60// +--------+-------+-------------+
61// | 0      | 1     | \x02        |
62// | 1      | 8     | length of encoded message |
63// | 9      | n     | encoded message |
type msgAppV2Encoder struct {
	// w is the destination every frame is written to.
	w  io.Writer
	// fs has Succ called with the elapsed encode time after an AppEntries
	// or MsgApp frame has been written.
	fs *stats.FollowerStats

	// term and index are the stream state used to decide whether a message
	// can be sent as a compact AppEntries frame. Both are reset from each
	// full MsgApp frame; index is additionally advanced by one for every
	// entry written in an AppEntries frame.
	term      uint64
	index     uint64
	// buf is a scratch buffer of msgAppV2BufSize bytes into which entries
	// smaller than that size are marshaled before being written.
	buf       []byte
	// uint64buf is an 8-byte scratch buffer for big-endian uint64 fields.
	uint64buf []byte
	// uint8buf is a 1-byte scratch buffer for the frame type tag.
	uint8buf  []byte
}
74
75func newMsgAppV2Encoder(w io.Writer, fs *stats.FollowerStats) *msgAppV2Encoder {
76	return &msgAppV2Encoder{
77		w:         w,
78		fs:        fs,
79		buf:       make([]byte, msgAppV2BufSize),
80		uint64buf: make([]byte, 8),
81		uint8buf:  make([]byte, 1),
82	}
83}
84
85func (enc *msgAppV2Encoder) encode(m *raftpb.Message) error {
86	start := time.Now()
87	switch {
88	case isLinkHeartbeatMessage(m):
89		enc.uint8buf[0] = msgTypeLinkHeartbeat
90		if _, err := enc.w.Write(enc.uint8buf); err != nil {
91			return err
92		}
93	case enc.index == m.Index && enc.term == m.LogTerm && m.LogTerm == m.Term:
94		enc.uint8buf[0] = msgTypeAppEntries
95		if _, err := enc.w.Write(enc.uint8buf); err != nil {
96			return err
97		}
98		// write length of entries
99		binary.BigEndian.PutUint64(enc.uint64buf, uint64(len(m.Entries)))
100		if _, err := enc.w.Write(enc.uint64buf); err != nil {
101			return err
102		}
103		for i := 0; i < len(m.Entries); i++ {
104			// write length of entry
105			binary.BigEndian.PutUint64(enc.uint64buf, uint64(m.Entries[i].Size()))
106			if _, err := enc.w.Write(enc.uint64buf); err != nil {
107				return err
108			}
109			if n := m.Entries[i].Size(); n < msgAppV2BufSize {
110				if _, err := m.Entries[i].MarshalTo(enc.buf); err != nil {
111					return err
112				}
113				if _, err := enc.w.Write(enc.buf[:n]); err != nil {
114					return err
115				}
116			} else {
117				if _, err := enc.w.Write(pbutil.MustMarshal(&m.Entries[i])); err != nil {
118					return err
119				}
120			}
121			enc.index++
122		}
123		// write commit index
124		binary.BigEndian.PutUint64(enc.uint64buf, m.Commit)
125		if _, err := enc.w.Write(enc.uint64buf); err != nil {
126			return err
127		}
128		enc.fs.Succ(time.Since(start))
129	default:
130		if err := binary.Write(enc.w, binary.BigEndian, msgTypeApp); err != nil {
131			return err
132		}
133		// write size of message
134		if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
135			return err
136		}
137		// write message
138		if _, err := enc.w.Write(pbutil.MustMarshal(m)); err != nil {
139			return err
140		}
141
142		enc.term = m.Term
143		enc.index = m.Index
144		if l := len(m.Entries); l > 0 {
145			enc.index = m.Entries[l-1].Index
146		}
147		enc.fs.Succ(time.Since(start))
148	}
149	return nil
150}
151
// msgAppV2Decoder reads msgappv2 frames from a stream and turns them back
// into raftpb.Message values.
type msgAppV2Decoder struct {
	// r is the source frames are read from.
	r             io.Reader
	// local and remote are used as the To and From fields, respectively, of
	// messages reconstructed from AppEntries frames.
	local, remote types.ID

	// term and index are the stream state used to fill in the Term, LogTerm
	// and Index of messages reconstructed from AppEntries frames. Both are
	// reset from each decoded MsgApp frame; index is additionally advanced
	// by one for every entry read from an AppEntries frame.
	term      uint64
	index     uint64
	// buf is a scratch buffer of msgAppV2BufSize bytes used to read entries
	// smaller than that size.
	buf       []byte
	// uint64buf is an 8-byte scratch buffer for big-endian uint64 fields.
	uint64buf []byte
	// uint8buf is a 1-byte scratch buffer for the frame type tag.
	uint8buf  []byte
}
162
163func newMsgAppV2Decoder(r io.Reader, local, remote types.ID) *msgAppV2Decoder {
164	return &msgAppV2Decoder{
165		r:         r,
166		local:     local,
167		remote:    remote,
168		buf:       make([]byte, msgAppV2BufSize),
169		uint64buf: make([]byte, 8),
170		uint8buf:  make([]byte, 1),
171	}
172}
173
174func (dec *msgAppV2Decoder) decode() (raftpb.Message, error) {
175	var (
176		m   raftpb.Message
177		typ uint8
178	)
179	if _, err := io.ReadFull(dec.r, dec.uint8buf); err != nil {
180		return m, err
181	}
182	typ = dec.uint8buf[0]
183	switch typ {
184	case msgTypeLinkHeartbeat:
185		return linkHeartbeatMessage, nil
186	case msgTypeAppEntries:
187		m = raftpb.Message{
188			Type:    raftpb.MsgApp,
189			From:    uint64(dec.remote),
190			To:      uint64(dec.local),
191			Term:    dec.term,
192			LogTerm: dec.term,
193			Index:   dec.index,
194		}
195
196		// decode entries
197		if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
198			return m, err
199		}
200		l := binary.BigEndian.Uint64(dec.uint64buf)
201		m.Entries = make([]raftpb.Entry, int(l))
202		for i := 0; i < int(l); i++ {
203			if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
204				return m, err
205			}
206			size := binary.BigEndian.Uint64(dec.uint64buf)
207			var buf []byte
208			if size < msgAppV2BufSize {
209				buf = dec.buf[:size]
210				if _, err := io.ReadFull(dec.r, buf); err != nil {
211					return m, err
212				}
213			} else {
214				buf = make([]byte, int(size))
215				if _, err := io.ReadFull(dec.r, buf); err != nil {
216					return m, err
217				}
218			}
219			dec.index++
220			// 1 alloc
221			pbutil.MustUnmarshal(&m.Entries[i], buf)
222		}
223		// decode commit index
224		if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
225			return m, err
226		}
227		m.Commit = binary.BigEndian.Uint64(dec.uint64buf)
228	case msgTypeApp:
229		var size uint64
230		if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil {
231			return m, err
232		}
233		buf := make([]byte, int(size))
234		if _, err := io.ReadFull(dec.r, buf); err != nil {
235			return m, err
236		}
237		pbutil.MustUnmarshal(&m, buf)
238
239		dec.term = m.Term
240		dec.index = m.Index
241		if l := len(m.Entries); l > 0 {
242			dec.index = m.Entries[l-1].Index
243		}
244	default:
245		return m, fmt.Errorf("failed to parse type %d in msgappv2 stream", typ)
246	}
247	return m, nil
248}
249