1package fsm
2
3import (
4	"fmt"
5	"io"
6	"sync"
7	"time"
8
9	"github.com/hashicorp/go-hclog"
10	"github.com/hashicorp/go-msgpack/codec"
11	"github.com/hashicorp/go-raftchunking"
12	"github.com/hashicorp/raft"
13
14	"github.com/hashicorp/consul/agent/consul/state"
15	"github.com/hashicorp/consul/agent/structs"
16	"github.com/hashicorp/consul/logging"
17)
18
// command is a handler that has already been bound to a specific FSM
// instance. Apply invokes it with the log payload (the bytes following the
// leading message-type byte) and the index of the Raft log entry; whatever it
// returns is handed back as the result of Apply.
type command func(buf []byte, index uint64) interface{}
21
// unboundCommand is a command handler that is not yet bound to an FSM
// instance; the FSM it should operate on is passed as the first argument.
// NewFromDeps wraps each registered unboundCommand in a closure to produce a
// command.
type unboundCommand func(c *FSM, buf []byte, index uint64) interface{}
25
// commands is a map from message type to unbound command. It is populated
// through registerCommand and read by NewFromDeps when it builds the dispatch
// table of a new FSM.
var commands map[structs.MessageType]unboundCommand
28
29// registerCommand registers a new command with the FSM, which should be done
30// at package init() time.
31func registerCommand(msg structs.MessageType, fn unboundCommand) {
32	if commands == nil {
33		commands = make(map[structs.MessageType]unboundCommand)
34	}
35	if commands[msg] != nil {
36		panic(fmt.Errorf("Message %d is already registered", msg))
37	}
38	commands[msg] = fn
39}
40
// FSM implements a finite state machine that is used along with Raft to
// provide strong consistency. It is implemented outside the Server to avoid
// exposing it outside the package.
type FSM struct {
	// deps holds the dependencies the FSM was built from (its NewStateStore
	// is called again by Restore), logger is the logger used by the FSM
	// methods, and chunker is the chunking wrapper around this FSM that is
	// handed out by ChunkingFSM().
	deps    Deps
	logger  hclog.Logger
	chunker *raftchunking.ChunkingFSM

	// apply is built off the commands global and is used to route apply
	// operations to their appropriate handlers.
	apply map[structs.MessageType]command

	// stateLock is only used to protect outside callers to State() from
	// racing with Restore(), which is called by Raft (it puts in a totally
	// new state store). Everything internal here is synchronized by the
	// Raft side, so doesn't need to lock this.
	stateLock sync.RWMutex
	state     *state.Store
}
60
61// New is used to construct a new FSM with a blank state.
62//
63// Deprecated: use NewFromDeps.
64func New(gc *state.TombstoneGC, logger hclog.Logger) (*FSM, error) {
65	newStateStore := func() *state.Store {
66		return state.NewStateStore(gc)
67	}
68	return NewFromDeps(Deps{Logger: logger, NewStateStore: newStateStore}), nil
69}
70
// Deps are dependencies used to construct the FSM.
type Deps struct {
	// Logger used to emit log messages. When it is nil, NewFromDeps
	// substitutes a logger created with default hclog options.
	Logger hclog.Logger
	// NewStateStore returns a state.Store which the FSM will use to make changes
	// to the state.
	// NewStateStore will be called once when the FSM is created and again any
	// time Restore() is called. It is invoked unconditionally, so it must not
	// be nil.
	NewStateStore func() *state.Store
}
81
82// NewFromDeps creates a new FSM from its dependencies.
83func NewFromDeps(deps Deps) *FSM {
84	if deps.Logger == nil {
85		deps.Logger = hclog.New(&hclog.LoggerOptions{})
86	}
87
88	fsm := &FSM{
89		deps:   deps,
90		logger: deps.Logger.Named(logging.FSM),
91		apply:  make(map[structs.MessageType]command),
92		state:  deps.NewStateStore(),
93	}
94
95	// Build out the apply dispatch table based on the registered commands.
96	for msg, fn := range commands {
97		thisFn := fn
98		fsm.apply[msg] = func(buf []byte, index uint64) interface{} {
99			return thisFn(fsm, buf, index)
100		}
101	}
102
103	fsm.chunker = raftchunking.NewChunkingFSM(fsm, nil)
104	return fsm
105}
106
// ChunkingFSM returns the raftchunking wrapper that NewFromDeps created around
// this FSM.
func (c *FSM) ChunkingFSM() *raftchunking.ChunkingFSM {
	return c.chunker
}
110
111// State is used to return a handle to the current state
112func (c *FSM) State() *state.Store {
113	c.stateLock.RLock()
114	defer c.stateLock.RUnlock()
115	return c.state
116}
117
118func (c *FSM) Apply(log *raft.Log) interface{} {
119	buf := log.Data
120	msgType := structs.MessageType(buf[0])
121
122	// Check if this message type should be ignored when unknown. This is
123	// used so that new commands can be added with developer control if older
124	// versions can safely ignore the command, or if they should crash.
125	ignoreUnknown := false
126	if msgType&structs.IgnoreUnknownTypeFlag == structs.IgnoreUnknownTypeFlag {
127		msgType &= ^structs.IgnoreUnknownTypeFlag
128		ignoreUnknown = true
129	}
130
131	// Apply based on the dispatch table, if possible.
132	if fn := c.apply[msgType]; fn != nil {
133		return fn(buf[1:], log.Index)
134	}
135
136	// Otherwise, see if it's safe to ignore. If not, we have to panic so
137	// that we crash and our state doesn't diverge.
138	if ignoreUnknown {
139		c.logger.Warn("ignoring unknown message type, upgrade to newer version", "type", msgType)
140		return nil
141	}
142	panic(fmt.Errorf("failed to apply request: %#v", buf))
143}
144
145func (c *FSM) Snapshot() (raft.FSMSnapshot, error) {
146	defer func(start time.Time) {
147		c.logger.Info("snapshot created", "duration", time.Since(start).String())
148	}(time.Now())
149
150	chunkState, err := c.chunker.CurrentState()
151	if err != nil {
152		return nil, err
153	}
154
155	return &snapshot{
156		state:      c.state.Snapshot(),
157		chunkState: chunkState,
158	}, nil
159}
160
161// Restore streams in the snapshot and replaces the current state store with a
162// new one based on the snapshot if all goes OK during the restore.
163func (c *FSM) Restore(old io.ReadCloser) error {
164	defer old.Close()
165
166	stateNew := c.deps.NewStateStore()
167
168	// Set up a new restore transaction
169	restore := stateNew.Restore()
170	defer restore.Abort()
171
172	handler := func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error {
173		switch {
174		case msg == structs.ChunkingStateType:
175			chunkState := &raftchunking.State{
176				ChunkMap: make(raftchunking.ChunkMap),
177			}
178			if err := dec.Decode(chunkState); err != nil {
179				return err
180			}
181			if err := c.chunker.RestoreState(chunkState); err != nil {
182				return err
183			}
184		case restorers[msg] != nil:
185			fn := restorers[msg]
186			if err := fn(header, restore, dec); err != nil {
187				return err
188			}
189		default:
190			return fmt.Errorf("Unrecognized msg type %d", msg)
191		}
192		return nil
193	}
194	if err := ReadSnapshot(old, handler); err != nil {
195		return err
196	}
197
198	if err := restore.Commit(); err != nil {
199		return err
200	}
201
202	// External code might be calling State(), so we need to synchronize
203	// here to make sure we swap in the new state store atomically.
204	c.stateLock.Lock()
205	stateOld := c.state
206	c.state = stateNew
207	c.stateLock.Unlock()
208
209	// Signal that the old state store has been abandoned. This is required
210	// because we don't operate on it any more, we just throw it away, so
211	// blocking queries won't see any changes and need to be woken up.
212	stateOld.Abandon()
213	return nil
214}
215
216// ReadSnapshot decodes each message type and utilizes the handler function to
217// process each message type individually
218func ReadSnapshot(r io.Reader, handler func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error) error {
219	// Create a decoder
220	dec := codec.NewDecoder(r, structs.MsgpackHandle)
221
222	// Read in the header
223	var header SnapshotHeader
224	if err := dec.Decode(&header); err != nil {
225		return err
226	}
227
228	// Populate the new state
229	msgType := make([]byte, 1)
230	for {
231		// Read the message type
232		_, err := r.Read(msgType)
233		if err == io.EOF {
234			return nil
235		} else if err != nil {
236			return err
237		}
238
239		// Decode
240		msg := structs.MessageType(msgType[0])
241
242		if err := handler(&header, msg, dec); err != nil {
243			return err
244		}
245	}
246}
247