1package fsm 2 3import ( 4 "fmt" 5 "io" 6 "sync" 7 "time" 8 9 "github.com/hashicorp/go-hclog" 10 "github.com/hashicorp/go-msgpack/codec" 11 "github.com/hashicorp/go-raftchunking" 12 "github.com/hashicorp/raft" 13 14 "github.com/hashicorp/consul/agent/consul/state" 15 "github.com/hashicorp/consul/agent/structs" 16 "github.com/hashicorp/consul/logging" 17) 18 19// command is a command method on the FSM. 20type command func(buf []byte, index uint64) interface{} 21 22// unboundCommand is a command method on the FSM, not yet bound to an FSM 23// instance. 24type unboundCommand func(c *FSM, buf []byte, index uint64) interface{} 25 26// commands is a map from message type to unbound command. 27var commands map[structs.MessageType]unboundCommand 28 29// registerCommand registers a new command with the FSM, which should be done 30// at package init() time. 31func registerCommand(msg structs.MessageType, fn unboundCommand) { 32 if commands == nil { 33 commands = make(map[structs.MessageType]unboundCommand) 34 } 35 if commands[msg] != nil { 36 panic(fmt.Errorf("Message %d is already registered", msg)) 37 } 38 commands[msg] = fn 39} 40 41// FSM implements a finite state machine that is used 42// along with Raft to provide strong consistency. We implement 43// this outside the Server to avoid exposing this outside the package. 44type FSM struct { 45 deps Deps 46 logger hclog.Logger 47 chunker *raftchunking.ChunkingFSM 48 49 // apply is built off the commands global and is used to route apply 50 // operations to their appropriate handlers. 51 apply map[structs.MessageType]command 52 53 // stateLock is only used to protect outside callers to State() from 54 // racing with Restore(), which is called by Raft (it puts in a totally 55 // new state store). Everything internal here is synchronized by the 56 // Raft side, so doesn't need to lock this. 57 stateLock sync.RWMutex 58 state *state.Store 59} 60 61// New is used to construct a new FSM with a blank state. 62// 63// Deprecated: use NewFromDeps. 64func New(gc *state.TombstoneGC, logger hclog.Logger) (*FSM, error) { 65 newStateStore := func() *state.Store { 66 return state.NewStateStore(gc) 67 } 68 return NewFromDeps(Deps{Logger: logger, NewStateStore: newStateStore}), nil 69} 70 71// Deps are dependencies used to construct the FSM. 72type Deps struct { 73 // Logger used to emit log messages 74 Logger hclog.Logger 75 // NewStateStore returns a state.Store which the FSM will use to make changes 76 // to the state. 77 // NewStateStore will be called once when the FSM is created and again any 78 // time Restore() is called. 79 NewStateStore func() *state.Store 80} 81 82// NewFromDeps creates a new FSM from its dependencies. 83func NewFromDeps(deps Deps) *FSM { 84 if deps.Logger == nil { 85 deps.Logger = hclog.New(&hclog.LoggerOptions{}) 86 } 87 88 fsm := &FSM{ 89 deps: deps, 90 logger: deps.Logger.Named(logging.FSM), 91 apply: make(map[structs.MessageType]command), 92 state: deps.NewStateStore(), 93 } 94 95 // Build out the apply dispatch table based on the registered commands. 96 for msg, fn := range commands { 97 thisFn := fn 98 fsm.apply[msg] = func(buf []byte, index uint64) interface{} { 99 return thisFn(fsm, buf, index) 100 } 101 } 102 103 fsm.chunker = raftchunking.NewChunkingFSM(fsm, nil) 104 return fsm 105} 106 107func (c *FSM) ChunkingFSM() *raftchunking.ChunkingFSM { 108 return c.chunker 109} 110 111// State is used to return a handle to the current state 112func (c *FSM) State() *state.Store { 113 c.stateLock.RLock() 114 defer c.stateLock.RUnlock() 115 return c.state 116} 117 118func (c *FSM) Apply(log *raft.Log) interface{} { 119 buf := log.Data 120 msgType := structs.MessageType(buf[0]) 121 122 // Check if this message type should be ignored when unknown. This is 123 // used so that new commands can be added with developer control if older 124 // versions can safely ignore the command, or if they should crash. 125 ignoreUnknown := false 126 if msgType&structs.IgnoreUnknownTypeFlag == structs.IgnoreUnknownTypeFlag { 127 msgType &= ^structs.IgnoreUnknownTypeFlag 128 ignoreUnknown = true 129 } 130 131 // Apply based on the dispatch table, if possible. 132 if fn := c.apply[msgType]; fn != nil { 133 return fn(buf[1:], log.Index) 134 } 135 136 // Otherwise, see if it's safe to ignore. If not, we have to panic so 137 // that we crash and our state doesn't diverge. 138 if ignoreUnknown { 139 c.logger.Warn("ignoring unknown message type, upgrade to newer version", "type", msgType) 140 return nil 141 } 142 panic(fmt.Errorf("failed to apply request: %#v", buf)) 143} 144 145func (c *FSM) Snapshot() (raft.FSMSnapshot, error) { 146 defer func(start time.Time) { 147 c.logger.Info("snapshot created", "duration", time.Since(start).String()) 148 }(time.Now()) 149 150 chunkState, err := c.chunker.CurrentState() 151 if err != nil { 152 return nil, err 153 } 154 155 return &snapshot{ 156 state: c.state.Snapshot(), 157 chunkState: chunkState, 158 }, nil 159} 160 161// Restore streams in the snapshot and replaces the current state store with a 162// new one based on the snapshot if all goes OK during the restore. 163func (c *FSM) Restore(old io.ReadCloser) error { 164 defer old.Close() 165 166 stateNew := c.deps.NewStateStore() 167 168 // Set up a new restore transaction 169 restore := stateNew.Restore() 170 defer restore.Abort() 171 172 handler := func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error { 173 switch { 174 case msg == structs.ChunkingStateType: 175 chunkState := &raftchunking.State{ 176 ChunkMap: make(raftchunking.ChunkMap), 177 } 178 if err := dec.Decode(chunkState); err != nil { 179 return err 180 } 181 if err := c.chunker.RestoreState(chunkState); err != nil { 182 return err 183 } 184 case restorers[msg] != nil: 185 fn := restorers[msg] 186 if err := fn(header, restore, dec); err != nil { 187 return err 188 } 189 default: 190 return fmt.Errorf("Unrecognized msg type %d", msg) 191 } 192 return nil 193 } 194 if err := ReadSnapshot(old, handler); err != nil { 195 return err 196 } 197 198 if err := restore.Commit(); err != nil { 199 return err 200 } 201 202 // External code might be calling State(), so we need to synchronize 203 // here to make sure we swap in the new state store atomically. 204 c.stateLock.Lock() 205 stateOld := c.state 206 c.state = stateNew 207 c.stateLock.Unlock() 208 209 // Signal that the old state store has been abandoned. This is required 210 // because we don't operate on it any more, we just throw it away, so 211 // blocking queries won't see any changes and need to be woken up. 212 stateOld.Abandon() 213 return nil 214} 215 216// ReadSnapshot decodes each message type and utilizes the handler function to 217// process each message type individually 218func ReadSnapshot(r io.Reader, handler func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error) error { 219 // Create a decoder 220 dec := codec.NewDecoder(r, structs.MsgpackHandle) 221 222 // Read in the header 223 var header SnapshotHeader 224 if err := dec.Decode(&header); err != nil { 225 return err 226 } 227 228 // Populate the new state 229 msgType := make([]byte, 1) 230 for { 231 // Read the message type 232 _, err := r.Read(msgType) 233 if err == io.EOF { 234 return nil 235 } else if err != nil { 236 return err 237 } 238 239 // Decode 240 msg := structs.MessageType(msgType[0]) 241 242 if err := handler(&header, msg, dec); err != nil { 243 return err 244 } 245 } 246} 247