1package stream 2 3import ( 4 "context" 5 fmt "fmt" 6 "testing" 7 time "time" 8 9 "github.com/stretchr/testify/require" 10) 11 12func TestEventSnapshot(t *testing.T) { 13 // Setup a dummy state that we can manipulate easily. The properties we care 14 // about are that we publish some sequence of events as a snapshot and then 15 // follow them up with "live updates". We control the interleaving. Our state 16 // consists of health events (only type fully defined so far) for service 17 // instances with consecutive ID numbers starting from 0 (e.g. test-000, 18 // test-001). The snapshot is delivered at index 1000. updatesBeforeSnap 19 // controls how many updates are delivered _before_ the snapshot is complete 20 // (with an index < 1000). updatesBeforeSnap controls the number of updates 21 // delivered after (index > 1000). 22 // 23 // In all cases the invariant should be that we end up with all of the 24 // instances in the snapshot, plus any delivered _after_ the snapshot index, 25 // but none delivered _before_ the snapshot index otherwise we may have an 26 // inconsistent snapshot. 27 cases := []struct { 28 name string 29 snapshotSize int 30 updatesBeforeSnap int 31 updatesAfterSnap int 32 }{ 33 { 34 name: "snapshot with subsequent mutations", 35 snapshotSize: 10, 36 updatesBeforeSnap: 0, 37 updatesAfterSnap: 10, 38 }, 39 { 40 name: "snapshot with concurrent mutations", 41 snapshotSize: 10, 42 updatesBeforeSnap: 5, 43 updatesAfterSnap: 5, 44 }, 45 { 46 name: "empty snapshot with subsequent mutations", 47 snapshotSize: 0, 48 updatesBeforeSnap: 0, 49 updatesAfterSnap: 10, 50 }, 51 { 52 name: "empty snapshot with concurrent mutations", 53 snapshotSize: 0, 54 updatesBeforeSnap: 5, 55 updatesAfterSnap: 5, 56 }, 57 } 58 59 snapIndex := uint64(1000) 60 61 for _, tc := range cases { 62 tc := tc 63 t.Run(tc.name, func(t *testing.T) { 64 require.True(t, tc.updatesBeforeSnap < 999, 65 "bad test param updatesBeforeSnap must be less than the snapshot"+ 66 " index (%d) minus one (%d), got: %d", snapIndex, snapIndex-1, 67 tc.updatesBeforeSnap) 68 69 // Create a snapshot func that will deliver registration events. 70 snFn := testHealthConsecutiveSnapshotFn(tc.snapshotSize, snapIndex) 71 72 // Create a topic buffer for updates 73 tb := newEventBuffer() 74 75 // Capture the topic buffer head now so updatesBeforeSnap are "concurrent" 76 // and are seen by the eventSnapshot once it completes the snap. 77 tbHead := tb.Head() 78 79 // Deliver any pre-snapshot events simulating updates that occur after the 80 // topic buffer is captured during a Subscribe call, but before the 81 // snapshot is made of the FSM. 82 for i := tc.updatesBeforeSnap; i > 0; i-- { 83 index := snapIndex - uint64(i) 84 // Use an instance index that's unique and should never appear in the 85 // output so we can be sure these were not included as they came before 86 // the snapshot. 87 tb.Append([]Event{newDefaultHealthEvent(index, 10000+i)}) 88 } 89 90 es := newEventSnapshot() 91 es.appendAndSplice(SubscribeRequest{}, snFn, tbHead) 92 93 // Deliver any post-snapshot events simulating updates that occur 94 // logically after snapshot. It doesn't matter that these might actually 95 // be appended before the snapshot fn executes in another goroutine since 96 // it's operating an a possible stale "snapshot". This is the same as 97 // reality with the state store where updates that occur after the 98 // snapshot is taken but while the SnapFnis still running must be captured 99 // correctly. 100 for i := 0; i < tc.updatesAfterSnap; i++ { 101 index := snapIndex + 1 + uint64(i) 102 // Use an instance index that's unique. 103 tb.Append([]Event{newDefaultHealthEvent(index, 20000+i)}) 104 } 105 106 // Now read the snapshot buffer until we've received everything we expect. 107 // Don't wait too long in case we get stuck. 108 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 109 defer cancel() 110 111 snapIDs := make([]string, 0, tc.snapshotSize) 112 updateIDs := make([]string, 0, tc.updatesAfterSnap) 113 snapDone := false 114 curItem := es.First 115 var err error 116 RECV: 117 for { 118 curItem, err = curItem.Next(ctx, nil) 119 // This error is typically timeout so dump the state to aid debugging. 120 require.NoError(t, err, 121 "current state: snapDone=%v snapIDs=%s updateIDs=%s", snapDone, 122 snapIDs, updateIDs) 123 if len(curItem.Events) == 0 { 124 // An item without an error or events is a bufferItem.NextLink event. 125 // A subscription handles this by proceeding to the next item, 126 // so we do the same here. 127 continue 128 } 129 e := curItem.Events[0] 130 switch { 131 case snapDone: 132 payload, ok := e.Payload.(simplePayload) 133 require.True(t, ok, "want health event got: %#v", e.Payload) 134 updateIDs = append(updateIDs, payload.value) 135 if len(updateIDs) == tc.updatesAfterSnap { 136 // We're done! 137 break RECV 138 } 139 case e.IsEndOfSnapshot(): 140 snapDone = true 141 default: 142 payload, ok := e.Payload.(simplePayload) 143 require.True(t, ok, "want health event got: %#v", e.Payload) 144 snapIDs = append(snapIDs, payload.value) 145 } 146 } 147 148 // Validate the event IDs we got delivered. 149 require.Equal(t, genSequentialIDs(0, tc.snapshotSize), snapIDs) 150 require.Equal(t, genSequentialIDs(20000, 20000+tc.updatesAfterSnap), updateIDs) 151 }) 152 } 153} 154 155func genSequentialIDs(start, end int) []string { 156 ids := make([]string, 0, end-start) 157 for i := start; i < end; i++ { 158 ids = append(ids, fmt.Sprintf("test-event-%03d", i)) 159 } 160 return ids 161} 162 163func testHealthConsecutiveSnapshotFn(size int, index uint64) SnapshotFunc { 164 return func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) { 165 for i := 0; i < size; i++ { 166 // Event content is arbitrary we are just using Health because it's the 167 // first type defined. We just want a set of things with consecutive 168 // names. 169 buf.Append([]Event{newDefaultHealthEvent(index, i)}) 170 } 171 return index, nil 172 } 173} 174 175func newDefaultHealthEvent(index uint64, n int) Event { 176 return Event{ 177 Index: index, 178 Topic: testTopic, 179 Payload: simplePayload{value: fmt.Sprintf("test-event-%03d", n)}, 180 } 181} 182