1package submatview
2
3import (
4	"github.com/hashicorp/consul/proto/pbsubscribe"
5)
6
7// eventHandler is a function which performs some operation on the received
8// events, then returns the eventHandler that should be used for the next set
9// of events.
10// If eventHandler fails to handle the events it may return an error. If an
11// error is returned the next eventHandler will be ignored.
12// eventHandler is used to implement a very simple finite-state machine.
13type eventHandler func(state viewState, events *pbsubscribe.Event) (next eventHandler, err error)
14
15type viewState interface {
16	updateView(events []*pbsubscribe.Event, index uint64) error
17	reset()
18}
19
20func initialHandler(index uint64) eventHandler {
21	if index == 0 {
22		return newSnapshotHandler()
23	}
24	return resumeStreamHandler
25}
26
27// snapshotHandler accumulates events. When it receives an EndOfSnapshot event
28// it updates the view, and then returns eventStreamHandler to handle new events.
29type snapshotHandler struct {
30	events []*pbsubscribe.Event
31}
32
33func newSnapshotHandler() eventHandler {
34	return (&snapshotHandler{}).handle
35}
36
37func (h *snapshotHandler) handle(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
38	if event.GetEndOfSnapshot() {
39		err := state.updateView(h.events, event.Index)
40		return eventStreamHandler, err
41	}
42
43	h.events = append(h.events, eventsFromEvent(event)...)
44	return h.handle, nil
45}
46
47// eventStreamHandler handles events by updating the view. It always returns
48// itself as the next handler.
49func eventStreamHandler(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
50	err := state.updateView(eventsFromEvent(event), event.Index)
51	return eventStreamHandler, err
52}
53
54func eventsFromEvent(event *pbsubscribe.Event) []*pbsubscribe.Event {
55	if batch := event.GetEventBatch(); batch != nil {
56		return batch.Events
57	}
58	return []*pbsubscribe.Event{event}
59}
60
61// resumeStreamHandler checks if the event is a NewSnapshotToFollow event. If it
62// is it resets the view and returns a snapshotHandler to handle the next event.
63// Otherwise it uses eventStreamHandler to handle events.
64func resumeStreamHandler(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
65	if event.GetNewSnapshotToFollow() {
66		state.reset()
67		return newSnapshotHandler(), nil
68	}
69	return eventStreamHandler(state, event)
70}
71