1package stream
2
3import (
4	"context"
5	fmt "fmt"
6	"testing"
7	time "time"
8
9	"github.com/stretchr/testify/require"
10)
11
12func TestEventSnapshot(t *testing.T) {
13	// Setup a dummy state that we can manipulate easily. The properties we care
14	// about are that we publish some sequence of events as a snapshot and then
15	// follow them up with "live updates". We control the interleaving. Our state
16	// consists of health events (only type fully defined so far) for service
17	// instances with consecutive ID numbers starting from 0 (e.g. test-000,
18	// test-001). The snapshot is delivered at index 1000. updatesBeforeSnap
19	// controls how many updates are delivered _before_ the snapshot is complete
20	// (with an index < 1000). updatesBeforeSnap controls the number of updates
21	// delivered after (index > 1000).
22	//
23	// In all cases the invariant should be that we end up with all of the
24	// instances in the snapshot, plus any delivered _after_ the snapshot index,
25	// but none delivered _before_ the snapshot index otherwise we may have an
26	// inconsistent snapshot.
27	cases := []struct {
28		name              string
29		snapshotSize      int
30		updatesBeforeSnap int
31		updatesAfterSnap  int
32	}{
33		{
34			name:              "snapshot with subsequent mutations",
35			snapshotSize:      10,
36			updatesBeforeSnap: 0,
37			updatesAfterSnap:  10,
38		},
39		{
40			name:              "snapshot with concurrent mutations",
41			snapshotSize:      10,
42			updatesBeforeSnap: 5,
43			updatesAfterSnap:  5,
44		},
45		{
46			name:              "empty snapshot with subsequent mutations",
47			snapshotSize:      0,
48			updatesBeforeSnap: 0,
49			updatesAfterSnap:  10,
50		},
51		{
52			name:              "empty snapshot with concurrent mutations",
53			snapshotSize:      0,
54			updatesBeforeSnap: 5,
55			updatesAfterSnap:  5,
56		},
57	}
58
59	snapIndex := uint64(1000)
60
61	for _, tc := range cases {
62		tc := tc
63		t.Run(tc.name, func(t *testing.T) {
64			require.True(t, tc.updatesBeforeSnap < 999,
65				"bad test param updatesBeforeSnap must be less than the snapshot"+
66					" index (%d) minus one (%d), got: %d", snapIndex, snapIndex-1,
67				tc.updatesBeforeSnap)
68
69			// Create a snapshot func that will deliver registration events.
70			snFn := testHealthConsecutiveSnapshotFn(tc.snapshotSize, snapIndex)
71
72			// Create a topic buffer for updates
73			tb := newEventBuffer()
74
75			// Capture the topic buffer head now so updatesBeforeSnap are "concurrent"
76			// and are seen by the eventSnapshot once it completes the snap.
77			tbHead := tb.Head()
78
79			// Deliver any pre-snapshot events simulating updates that occur after the
80			// topic buffer is captured during a Subscribe call, but before the
81			// snapshot is made of the FSM.
82			for i := tc.updatesBeforeSnap; i > 0; i-- {
83				index := snapIndex - uint64(i)
84				// Use an instance index that's unique and should never appear in the
85				// output so we can be sure these were not included as they came before
86				// the snapshot.
87				tb.Append([]Event{newDefaultHealthEvent(index, 10000+i)})
88			}
89
90			es := newEventSnapshot()
91			es.appendAndSplice(SubscribeRequest{}, snFn, tbHead)
92
93			// Deliver any post-snapshot events simulating updates that occur
94			// logically after snapshot. It doesn't matter that these might actually
95			// be appended before the snapshot fn executes in another goroutine since
96			// it's operating an a possible stale "snapshot". This is the same as
97			// reality with the state store where updates that occur after the
98			// snapshot is taken but while the SnapFnis still running must be captured
99			// correctly.
100			for i := 0; i < tc.updatesAfterSnap; i++ {
101				index := snapIndex + 1 + uint64(i)
102				// Use an instance index that's unique.
103				tb.Append([]Event{newDefaultHealthEvent(index, 20000+i)})
104			}
105
106			// Now read the snapshot buffer until we've received everything we expect.
107			// Don't wait too long in case we get stuck.
108			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
109			defer cancel()
110
111			snapIDs := make([]string, 0, tc.snapshotSize)
112			updateIDs := make([]string, 0, tc.updatesAfterSnap)
113			snapDone := false
114			curItem := es.First
115			var err error
116		RECV:
117			for {
118				curItem, err = curItem.Next(ctx, nil)
119				// This error is typically timeout so dump the state to aid debugging.
120				require.NoError(t, err,
121					"current state: snapDone=%v snapIDs=%s updateIDs=%s", snapDone,
122					snapIDs, updateIDs)
123				if len(curItem.Events) == 0 {
124					// An item without an error or events is a bufferItem.NextLink event.
125					// A subscription handles this by proceeding to the next item,
126					// so we do the same here.
127					continue
128				}
129				e := curItem.Events[0]
130				switch {
131				case snapDone:
132					payload, ok := e.Payload.(simplePayload)
133					require.True(t, ok, "want health event got: %#v", e.Payload)
134					updateIDs = append(updateIDs, payload.value)
135					if len(updateIDs) == tc.updatesAfterSnap {
136						// We're done!
137						break RECV
138					}
139				case e.IsEndOfSnapshot():
140					snapDone = true
141				default:
142					payload, ok := e.Payload.(simplePayload)
143					require.True(t, ok, "want health event got: %#v", e.Payload)
144					snapIDs = append(snapIDs, payload.value)
145				}
146			}
147
148			// Validate the event IDs we got delivered.
149			require.Equal(t, genSequentialIDs(0, tc.snapshotSize), snapIDs)
150			require.Equal(t, genSequentialIDs(20000, 20000+tc.updatesAfterSnap), updateIDs)
151		})
152	}
153}
154
// genSequentialIDs returns the event IDs "test-event-NNN" for every number in
// the half-open range [start, end), in ascending order. Numbers are zero-padded
// to at least three digits. When start == end the result is an empty, non-nil
// slice.
func genSequentialIDs(start, end int) []string {
	const idFormat = "test-event-%03d"

	count := end - start
	ids := make([]string, 0, count)
	for offset := 0; offset < count; offset++ {
		ids = append(ids, fmt.Sprintf(idFormat, start+offset))
	}
	return ids
}
162
163func testHealthConsecutiveSnapshotFn(size int, index uint64) SnapshotFunc {
164	return func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
165		for i := 0; i < size; i++ {
166			// Event content is arbitrary we are just using Health because it's the
167			// first type defined. We just want a set of things with consecutive
168			// names.
169			buf.Append([]Event{newDefaultHealthEvent(index, i)})
170		}
171		return index, nil
172	}
173}
174
175func newDefaultHealthEvent(index uint64, n int) Event {
176	return Event{
177		Index:   index,
178		Topic:   testTopic,
179		Payload: simplePayload{value: fmt.Sprintf("test-event-%03d", n)},
180	}
181}
182