1package submatview 2 3import ( 4 "github.com/hashicorp/consul/proto/pbsubscribe" 5) 6 7// eventHandler is a function which performs some operation on the received 8// events, then returns the eventHandler that should be used for the next set 9// of events. 10// If eventHandler fails to handle the events it may return an error. If an 11// error is returned the next eventHandler will be ignored. 12// eventHandler is used to implement a very simple finite-state machine. 13type eventHandler func(state viewState, events *pbsubscribe.Event) (next eventHandler, err error) 14 15type viewState interface { 16 updateView(events []*pbsubscribe.Event, index uint64) error 17 reset() 18} 19 20func initialHandler(index uint64) eventHandler { 21 if index == 0 { 22 return newSnapshotHandler() 23 } 24 return resumeStreamHandler 25} 26 27// snapshotHandler accumulates events. When it receives an EndOfSnapshot event 28// it updates the view, and then returns eventStreamHandler to handle new events. 29type snapshotHandler struct { 30 events []*pbsubscribe.Event 31} 32 33func newSnapshotHandler() eventHandler { 34 return (&snapshotHandler{}).handle 35} 36 37func (h *snapshotHandler) handle(state viewState, event *pbsubscribe.Event) (eventHandler, error) { 38 if event.GetEndOfSnapshot() { 39 err := state.updateView(h.events, event.Index) 40 return eventStreamHandler, err 41 } 42 43 h.events = append(h.events, eventsFromEvent(event)...) 44 return h.handle, nil 45} 46 47// eventStreamHandler handles events by updating the view. It always returns 48// itself as the next handler. 49func eventStreamHandler(state viewState, event *pbsubscribe.Event) (eventHandler, error) { 50 err := state.updateView(eventsFromEvent(event), event.Index) 51 return eventStreamHandler, err 52} 53 54func eventsFromEvent(event *pbsubscribe.Event) []*pbsubscribe.Event { 55 if batch := event.GetEventBatch(); batch != nil { 56 return batch.Events 57 } 58 return []*pbsubscribe.Event{event} 59} 60 61// resumeStreamHandler checks if the event is a NewSnapshotToFollow event. If it 62// is it resets the view and returns a snapshotHandler to handle the next event. 63// Otherwise it uses eventStreamHandler to handle events. 64func resumeStreamHandler(state viewState, event *pbsubscribe.Event) (eventHandler, error) { 65 if event.GetNewSnapshotToFollow() { 66 state.reset() 67 return newSnapshotHandler(), nil 68 } 69 return eventStreamHandler(state, event) 70} 71