1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13 14package wire 15 16import "testing" 17 18func emptyAckConsumer(_ *ackConsumer) { 19 // Nothing to do. 20} 21 22func TestAckConsumerAck(t *testing.T) { 23 numAcks := 0 24 onAck := func(ac *ackConsumer) { 25 numAcks++ 26 } 27 ackConsumer := newAckConsumer(0, 0, onAck) 28 if got, want := ackConsumer.IsAcked(), false; got != want { 29 t.Errorf("ackConsumer.IsAcked() got %v, want %v", got, want) 30 } 31 32 // Test duplicate acks. 33 for i := 0; i < 3; i++ { 34 ackConsumer.Ack() 35 36 if got, want := ackConsumer.IsAcked(), true; got != want { 37 t.Errorf("ackConsumer.IsAcked() got %v, want %v", got, want) 38 } 39 if got, want := numAcks, 1; got != want { 40 t.Errorf("onAck func called %v times, expected %v call", got, want) 41 } 42 } 43} 44 45func TestAckConsumerClear(t *testing.T) { 46 onAck := func(ac *ackConsumer) { 47 t.Error("onAck func should not have been called") 48 } 49 ackConsumer := newAckConsumer(0, 0, onAck) 50 ackConsumer.Clear() 51 ackConsumer.Ack() 52 53 if got, want := ackConsumer.IsAcked(), true; got != want { 54 t.Errorf("ackConsumer.IsAcked() got %v, want %v", got, want) 55 } 56} 57 58func TestAckTrackerProcessing(t *testing.T) { 59 ackTracker := newAckTracker() 60 61 // No messages received yet. 62 if got, want := ackTracker.CommitOffset(), nilCursorOffset; got != want { 63 t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) 64 } 65 66 ack1 := newAckConsumer(1, 0, emptyAckConsumer) 67 ack2 := newAckConsumer(2, 0, emptyAckConsumer) 68 ack3 := newAckConsumer(3, 0, emptyAckConsumer) 69 if err := ackTracker.Push(ack1); err != nil { 70 t.Errorf("ackTracker.Push() got err %v", err) 71 } 72 if err := ackTracker.Push(ack2); err != nil { 73 t.Errorf("ackTracker.Push() got err %v", err) 74 } 75 if err := ackTracker.Push(ack3); err != nil { 76 t.Errorf("ackTracker.Push() got err %v", err) 77 } 78 79 // All messages unacked. 80 if got, want := ackTracker.CommitOffset(), nilCursorOffset; got != want { 81 t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) 82 } 83 84 ack1.Ack() 85 if got, want := ackTracker.CommitOffset(), int64(2); got != want { 86 t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) 87 } 88 89 // Skipped ack2, so the commit offset should not have been updated. 90 ack3.Ack() 91 if got, want := ackTracker.CommitOffset(), int64(2); got != want { 92 t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) 93 } 94 95 // Both ack2 and ack3 should be removed from the outstanding acks queue. 96 ack2.Ack() 97 if got, want := ackTracker.CommitOffset(), int64(4); got != want { 98 t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) 99 } 100 101 // Newly received message. 102 ack4 := newAckConsumer(4, 0, emptyAckConsumer) 103 if err := ackTracker.Push(ack4); err != nil { 104 t.Errorf("ackTracker.Push() got err %v", err) 105 } 106 ack4.Ack() 107 if got, want := ackTracker.CommitOffset(), int64(5); got != want { 108 t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) 109 } 110} 111 112func TestAckTrackerRelease(t *testing.T) { 113 ackTracker := newAckTracker() 114 onAckAfterRelease := func(ac *ackConsumer) { 115 t.Error("onAck should not be called") 116 } 117 ack1 := newAckConsumer(1, 0, emptyAckConsumer) 118 ack2 := newAckConsumer(2, 0, onAckAfterRelease) 119 ack3 := newAckConsumer(3, 0, onAckAfterRelease) 120 121 if err := ackTracker.Push(ack1); err != nil { 122 t.Errorf("ackTracker.Push() got err %v", err) 123 } 124 if err := ackTracker.Push(ack2); err != nil { 125 t.Errorf("ackTracker.Push() got err %v", err) 126 } 127 if err := ackTracker.Push(ack3); err != nil { 128 t.Errorf("ackTracker.Push() got err %v", err) 129 } 130 131 // First ack is called before Release and should be processed. 132 ack1.Ack() 133 134 // After clearing outstanding acks, onAck should not be called. 135 ackTracker.Release() 136 ack2.Ack() 137 ack3.Ack() 138 139 if got, want := ackTracker.CommitOffset(), int64(2); got != want { 140 t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want) 141 } 142} 143 144func TestCommitCursorTrackerProcessing(t *testing.T) { 145 ackTracker := newAckTracker() 146 commitTracker := newCommitCursorTracker(ackTracker) 147 148 // No messages received yet. 149 if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { 150 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 151 } 152 153 ack1 := newAckConsumer(1, 0, emptyAckConsumer) 154 ack2 := newAckConsumer(2, 0, emptyAckConsumer) 155 ack3 := newAckConsumer(3, 0, emptyAckConsumer) 156 if err := ackTracker.Push(ack1); err != nil { 157 t.Errorf("ackTracker.Push() got err %v", err) 158 } 159 if err := ackTracker.Push(ack2); err != nil { 160 t.Errorf("ackTracker.Push() got err %v", err) 161 } 162 if err := ackTracker.Push(ack3); err != nil { 163 t.Errorf("ackTracker.Push() got err %v", err) 164 } 165 166 // All messages unacked. 167 if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { 168 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 169 } 170 171 // Msg1 acked and commit sent to stream. 172 ack1.Ack() 173 if got, want := commitTracker.NextOffset(), int64(2); got != want { 174 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 175 } 176 commitTracker.AddPending(commitTracker.NextOffset()) 177 if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { 178 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 179 } 180 181 // Msg 2 & 3 acked commit and sent to stream. 182 ack2.Ack() 183 if got, want := commitTracker.NextOffset(), int64(3); got != want { 184 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 185 } 186 ack3.Ack() 187 if got, want := commitTracker.NextOffset(), int64(4); got != want { 188 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 189 } 190 commitTracker.AddPending(commitTracker.NextOffset()) 191 if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { 192 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 193 } 194 if got, want := commitTracker.UpToDate(), false; got != want { 195 t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) 196 } 197 198 // First 2 pending commits acknowledged. 199 if got, want := commitTracker.lastConfirmedOffset, nilCursorOffset; got != want { 200 t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want) 201 } 202 if err := commitTracker.ConfirmOffsets(2); err != nil { 203 t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err) 204 } 205 if got, want := commitTracker.lastConfirmedOffset, int64(4); got != want { 206 t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want) 207 } 208 if got, want := commitTracker.UpToDate(), true; got != want { 209 t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) 210 } 211} 212 213func TestCommitCursorTrackerStreamReconnects(t *testing.T) { 214 ackTracker := newAckTracker() 215 commitTracker := newCommitCursorTracker(ackTracker) 216 217 ack1 := newAckConsumer(1, 0, emptyAckConsumer) 218 ack2 := newAckConsumer(2, 0, emptyAckConsumer) 219 ack3 := newAckConsumer(3, 0, emptyAckConsumer) 220 if err := ackTracker.Push(ack1); err != nil { 221 t.Errorf("ackTracker.Push() got err %v", err) 222 } 223 if err := ackTracker.Push(ack2); err != nil { 224 t.Errorf("ackTracker.Push() got err %v", err) 225 } 226 if err := ackTracker.Push(ack3); err != nil { 227 t.Errorf("ackTracker.Push() got err %v", err) 228 } 229 230 // All messages unacked. 231 if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { 232 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 233 } 234 235 // Msg1 acked and commit sent to stream. 236 ack1.Ack() 237 if got, want := commitTracker.NextOffset(), int64(2); got != want { 238 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 239 } 240 commitTracker.AddPending(commitTracker.NextOffset()) 241 if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { 242 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 243 } 244 245 // Msg2 acked and commit sent to stream. 246 ack2.Ack() 247 if got, want := commitTracker.NextOffset(), int64(3); got != want { 248 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 249 } 250 commitTracker.AddPending(commitTracker.NextOffset()) 251 if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { 252 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 253 } 254 255 // Stream breaks and pending offsets are cleared. 256 commitTracker.ClearPending() 257 if got, want := commitTracker.UpToDate(), false; got != want { 258 t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) 259 } 260 // When the stream reconnects the next offset should be 3 (offset 2 skipped). 261 if got, want := commitTracker.NextOffset(), int64(3); got != want { 262 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 263 } 264 commitTracker.AddPending(commitTracker.NextOffset()) 265 if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { 266 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 267 } 268 269 // Msg2 acked and commit sent to stream. 270 ack3.Ack() 271 if got, want := commitTracker.NextOffset(), int64(4); got != want { 272 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 273 } 274 commitTracker.AddPending(commitTracker.NextOffset()) 275 if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { 276 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 277 } 278 279 // Only 1 pending commit confirmed. 280 if got, want := commitTracker.lastConfirmedOffset, nilCursorOffset; got != want { 281 t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want) 282 } 283 if err := commitTracker.ConfirmOffsets(1); err != nil { 284 t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err) 285 } 286 if got, want := commitTracker.lastConfirmedOffset, int64(3); got != want { 287 t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want) 288 } 289 if got, want := commitTracker.UpToDate(), false; got != want { 290 t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) 291 } 292 293 // Final pending commit confirmed. 294 if err := commitTracker.ConfirmOffsets(1); err != nil { 295 t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err) 296 } 297 if got, want := commitTracker.lastConfirmedOffset, int64(4); got != want { 298 t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want) 299 } 300 if got, want := commitTracker.UpToDate(), true; got != want { 301 t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) 302 } 303 304 // Note: UpToDate() returns true even though there are unacked messages. 305 ack4 := newAckConsumer(4, 0, emptyAckConsumer) 306 if err := ackTracker.Push(ack4); err != nil { 307 t.Errorf("ackTracker.Push() got err %v", err) 308 } 309 if got, want := commitTracker.UpToDate(), true; got != want { 310 t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want) 311 } 312 if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want { 313 t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want) 314 } 315} 316