1// Copyright 2015 CoreOS, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package raft
16
17import (
18	"errors"
19
20	pb "github.com/coreos/etcd/raft/raftpb"
21)
22
// Sentinel errors returned by RawNode.Step.
var (
	// ErrStepLocalMsg is returned when trying to step a local raft message.
	ErrStepLocalMsg = errors.New("raft: cannot step raft local message")

	// ErrStepPeerNotFound is returned when trying to step a response message
	// whose sender has no entry in raft.prs.
	ErrStepPeerNotFound = errors.New("raft: cannot step as peer not found")
)
29
30// RawNode is a thread-unsafe Node.
31// The methods of this struct correspond to the methods of Node and are described
32// more fully there.
33type RawNode struct {
34	raft       *raft
35	prevSoftSt *SoftState
36	prevHardSt pb.HardState
37}
38
39func (rn *RawNode) newReady() Ready {
40	return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
41}
42
43func (rn *RawNode) commitReady(rd Ready) {
44	if rd.SoftState != nil {
45		rn.prevSoftSt = rd.SoftState
46	}
47	if !IsEmptyHardState(rd.HardState) {
48		rn.prevHardSt = rd.HardState
49	}
50	if rn.prevHardSt.Commit != 0 {
51		// In most cases, prevHardSt and rd.HardState will be the same
52		// because when there are new entries to apply we just sent a
53		// HardState with an updated Commit value. However, on initial
54		// startup the two are different because we don't send a HardState
55		// until something changes, but we do send any un-applied but
56		// committed entries (and previously-committed entries may be
57		// incorporated into the snapshot, even if rd.CommittedEntries is
58		// empty). Therefore we mark all committed entries as applied
59		// whether they were included in rd.HardState or not.
60		rn.raft.raftLog.appliedTo(rn.prevHardSt.Commit)
61	}
62	if len(rd.Entries) > 0 {
63		e := rd.Entries[len(rd.Entries)-1]
64		rn.raft.raftLog.stableTo(e.Index, e.Term)
65	}
66	if !IsEmptySnap(rd.Snapshot) {
67		rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
68	}
69}
70
71// NewRawNode returns a new RawNode given configuration and a list of raft peers.
72func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
73	if config.ID == 0 {
74		panic("config.ID must not be zero")
75	}
76	r := newRaft(config)
77	rn := &RawNode{
78		raft: r,
79	}
80	lastIndex, err := config.Storage.LastIndex()
81	if err != nil {
82		panic(err) // TODO(bdarnell)
83	}
84	// If the log is empty, this is a new RawNode (like StartNode); otherwise it's
85	// restoring an existing RawNode (like RestartNode).
86	// TODO(bdarnell): rethink RawNode initialization and whether the application needs
87	// to be able to tell us when it expects the RawNode to exist.
88	if lastIndex == 0 {
89		r.becomeFollower(1, None)
90		ents := make([]pb.Entry, len(peers))
91		for i, peer := range peers {
92			cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
93			data, err := cc.Marshal()
94			if err != nil {
95				panic("unexpected marshal error")
96			}
97
98			ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
99		}
100		r.raftLog.append(ents...)
101		r.raftLog.committed = uint64(len(ents))
102		for _, peer := range peers {
103			r.addNode(peer.ID)
104		}
105	}
106	// Set the initial hard and soft states after performing all initialization.
107	rn.prevSoftSt = r.softState()
108	rn.prevHardSt = r.hardState()
109
110	return rn, nil
111}
112
113// Tick advances the internal logical clock by a single tick.
114func (rn *RawNode) Tick() {
115	rn.raft.tick()
116}
117
118// Campaign causes this RawNode to transition to candidate state.
119func (rn *RawNode) Campaign() error {
120	return rn.raft.Step(pb.Message{
121		Type: pb.MsgHup,
122	})
123}
124
125// Propose proposes data be appended to the raft log.
126func (rn *RawNode) Propose(data []byte) error {
127	return rn.raft.Step(pb.Message{
128		Type: pb.MsgProp,
129		From: rn.raft.id,
130		Entries: []pb.Entry{
131			{Data: data},
132		}})
133}
134
135// ProposeConfChange proposes a config change.
136func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
137	data, err := cc.Marshal()
138	if err != nil {
139		return err
140	}
141	return rn.raft.Step(pb.Message{
142		Type: pb.MsgProp,
143		Entries: []pb.Entry{
144			{Type: pb.EntryConfChange, Data: data},
145		},
146	})
147}
148
149// ApplyConfChange applies a config change to the local node.
150func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
151	if cc.NodeID == None {
152		rn.raft.resetPendingConf()
153		return &pb.ConfState{Nodes: rn.raft.nodes()}
154	}
155	switch cc.Type {
156	case pb.ConfChangeAddNode:
157		rn.raft.addNode(cc.NodeID)
158	case pb.ConfChangeRemoveNode:
159		rn.raft.removeNode(cc.NodeID)
160	case pb.ConfChangeUpdateNode:
161		rn.raft.resetPendingConf()
162	default:
163		panic("unexpected conf type")
164	}
165	return &pb.ConfState{Nodes: rn.raft.nodes()}
166}
167
168// Step advances the state machine using the given message.
169func (rn *RawNode) Step(m pb.Message) error {
170	// ignore unexpected local messages receiving over network
171	if IsLocalMsg(m) {
172		return ErrStepLocalMsg
173	}
174	if _, ok := rn.raft.prs[m.From]; ok || !IsResponseMsg(m) {
175		return rn.raft.Step(m)
176	}
177	return ErrStepPeerNotFound
178}
179
180// Ready returns the current point-in-time state of this RawNode.
181func (rn *RawNode) Ready() Ready {
182	rd := rn.newReady()
183	rn.raft.msgs = nil
184	return rd
185}
186
187// HasReady called when RawNode user need to check if any Ready pending.
188// Checking logic in this method should be consistent with Ready.containsUpdates().
189func (rn *RawNode) HasReady() bool {
190	r := rn.raft
191	if !r.softState().equal(rn.prevSoftSt) {
192		return true
193	}
194	if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
195		return true
196	}
197	if r.raftLog.unstable.snapshot != nil && !IsEmptySnap(*r.raftLog.unstable.snapshot) {
198		return true
199	}
200	if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
201		return true
202	}
203	return false
204}
205
206// Advance notifies the RawNode that the application has applied and saved progress in the
207// last Ready results.
208func (rn *RawNode) Advance(rd Ready) {
209	rn.commitReady(rd)
210}
211
212// Status returns the current status of the given group.
213func (rn *RawNode) Status() *Status {
214	status := getStatus(rn.raft)
215	return &status
216}
217
218// ReportUnreachable reports the given node is not reachable for the last send.
219func (rn *RawNode) ReportUnreachable(id uint64) {
220	_ = rn.raft.Step(pb.Message{Type: pb.MsgUnreachable, From: id})
221}
222
223// ReportSnapshot reports the status of the sent snapshot.
224func (rn *RawNode) ReportSnapshot(id uint64, status SnapshotStatus) {
225	rej := status == SnapshotFailure
226
227	_ = rn.raft.Step(pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej})
228}
229