1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13
14package wire
15
16import "testing"
17
18func emptyAckConsumer(_ *ackConsumer) {
19	// Nothing to do.
20}
21
22func TestAckConsumerAck(t *testing.T) {
23	numAcks := 0
24	onAck := func(ac *ackConsumer) {
25		numAcks++
26	}
27	ackConsumer := newAckConsumer(0, 0, onAck)
28	if got, want := ackConsumer.IsAcked(), false; got != want {
29		t.Errorf("ackConsumer.IsAcked() got %v, want %v", got, want)
30	}
31
32	// Test duplicate acks.
33	for i := 0; i < 3; i++ {
34		ackConsumer.Ack()
35
36		if got, want := ackConsumer.IsAcked(), true; got != want {
37			t.Errorf("ackConsumer.IsAcked() got %v, want %v", got, want)
38		}
39		if got, want := numAcks, 1; got != want {
40			t.Errorf("onAck func called %v times, expected %v call", got, want)
41		}
42	}
43}
44
45func TestAckConsumerClear(t *testing.T) {
46	onAck := func(ac *ackConsumer) {
47		t.Error("onAck func should not have been called")
48	}
49	ackConsumer := newAckConsumer(0, 0, onAck)
50	ackConsumer.Clear()
51	ackConsumer.Ack()
52
53	if got, want := ackConsumer.IsAcked(), true; got != want {
54		t.Errorf("ackConsumer.IsAcked() got %v, want %v", got, want)
55	}
56}
57
58func TestAckTrackerProcessing(t *testing.T) {
59	ackTracker := newAckTracker()
60
61	// No messages received yet.
62	if got, want := ackTracker.CommitOffset(), nilCursorOffset; got != want {
63		t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
64	}
65
66	ack1 := newAckConsumer(1, 0, emptyAckConsumer)
67	ack2 := newAckConsumer(2, 0, emptyAckConsumer)
68	ack3 := newAckConsumer(3, 0, emptyAckConsumer)
69	if err := ackTracker.Push(ack1); err != nil {
70		t.Errorf("ackTracker.Push() got err %v", err)
71	}
72	if err := ackTracker.Push(ack2); err != nil {
73		t.Errorf("ackTracker.Push() got err %v", err)
74	}
75	if err := ackTracker.Push(ack3); err != nil {
76		t.Errorf("ackTracker.Push() got err %v", err)
77	}
78
79	// All messages unacked.
80	if got, want := ackTracker.CommitOffset(), nilCursorOffset; got != want {
81		t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
82	}
83
84	ack1.Ack()
85	if got, want := ackTracker.CommitOffset(), int64(2); got != want {
86		t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
87	}
88
89	// Skipped ack2, so the commit offset should not have been updated.
90	ack3.Ack()
91	if got, want := ackTracker.CommitOffset(), int64(2); got != want {
92		t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
93	}
94
95	// Both ack2 and ack3 should be removed from the outstanding acks queue.
96	ack2.Ack()
97	if got, want := ackTracker.CommitOffset(), int64(4); got != want {
98		t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
99	}
100
101	// Newly received message.
102	ack4 := newAckConsumer(4, 0, emptyAckConsumer)
103	if err := ackTracker.Push(ack4); err != nil {
104		t.Errorf("ackTracker.Push() got err %v", err)
105	}
106	ack4.Ack()
107	if got, want := ackTracker.CommitOffset(), int64(5); got != want {
108		t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
109	}
110}
111
112func TestAckTrackerRelease(t *testing.T) {
113	ackTracker := newAckTracker()
114	onAckAfterRelease := func(ac *ackConsumer) {
115		t.Error("onAck should not be called")
116	}
117	ack1 := newAckConsumer(1, 0, emptyAckConsumer)
118	ack2 := newAckConsumer(2, 0, onAckAfterRelease)
119	ack3 := newAckConsumer(3, 0, onAckAfterRelease)
120
121	if err := ackTracker.Push(ack1); err != nil {
122		t.Errorf("ackTracker.Push() got err %v", err)
123	}
124	if err := ackTracker.Push(ack2); err != nil {
125		t.Errorf("ackTracker.Push() got err %v", err)
126	}
127	if err := ackTracker.Push(ack3); err != nil {
128		t.Errorf("ackTracker.Push() got err %v", err)
129	}
130
131	// First ack is called before Release and should be processed.
132	ack1.Ack()
133
134	// After clearing outstanding acks, onAck should not be called.
135	ackTracker.Release()
136	ack2.Ack()
137	ack3.Ack()
138
139	if got, want := ackTracker.CommitOffset(), int64(2); got != want {
140		t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
141	}
142}
143
144func TestCommitCursorTrackerProcessing(t *testing.T) {
145	ackTracker := newAckTracker()
146	commitTracker := newCommitCursorTracker(ackTracker)
147
148	// No messages received yet.
149	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
150		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
151	}
152
153	ack1 := newAckConsumer(1, 0, emptyAckConsumer)
154	ack2 := newAckConsumer(2, 0, emptyAckConsumer)
155	ack3 := newAckConsumer(3, 0, emptyAckConsumer)
156	if err := ackTracker.Push(ack1); err != nil {
157		t.Errorf("ackTracker.Push() got err %v", err)
158	}
159	if err := ackTracker.Push(ack2); err != nil {
160		t.Errorf("ackTracker.Push() got err %v", err)
161	}
162	if err := ackTracker.Push(ack3); err != nil {
163		t.Errorf("ackTracker.Push() got err %v", err)
164	}
165
166	// All messages unacked.
167	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
168		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
169	}
170
171	// Msg1 acked and commit sent to stream.
172	ack1.Ack()
173	if got, want := commitTracker.NextOffset(), int64(2); got != want {
174		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
175	}
176	commitTracker.AddPending(commitTracker.NextOffset())
177	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
178		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
179	}
180
181	// Msg 2 & 3 acked commit and sent to stream.
182	ack2.Ack()
183	if got, want := commitTracker.NextOffset(), int64(3); got != want {
184		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
185	}
186	ack3.Ack()
187	if got, want := commitTracker.NextOffset(), int64(4); got != want {
188		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
189	}
190	commitTracker.AddPending(commitTracker.NextOffset())
191	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
192		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
193	}
194	if got, want := commitTracker.UpToDate(), false; got != want {
195		t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
196	}
197
198	// First 2 pending commits acknowledged.
199	if got, want := commitTracker.lastConfirmedOffset, nilCursorOffset; got != want {
200		t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
201	}
202	if err := commitTracker.ConfirmOffsets(2); err != nil {
203		t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err)
204	}
205	if got, want := commitTracker.lastConfirmedOffset, int64(4); got != want {
206		t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
207	}
208	if got, want := commitTracker.UpToDate(), true; got != want {
209		t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
210	}
211}
212
213func TestCommitCursorTrackerStreamReconnects(t *testing.T) {
214	ackTracker := newAckTracker()
215	commitTracker := newCommitCursorTracker(ackTracker)
216
217	ack1 := newAckConsumer(1, 0, emptyAckConsumer)
218	ack2 := newAckConsumer(2, 0, emptyAckConsumer)
219	ack3 := newAckConsumer(3, 0, emptyAckConsumer)
220	if err := ackTracker.Push(ack1); err != nil {
221		t.Errorf("ackTracker.Push() got err %v", err)
222	}
223	if err := ackTracker.Push(ack2); err != nil {
224		t.Errorf("ackTracker.Push() got err %v", err)
225	}
226	if err := ackTracker.Push(ack3); err != nil {
227		t.Errorf("ackTracker.Push() got err %v", err)
228	}
229
230	// All messages unacked.
231	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
232		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
233	}
234
235	// Msg1 acked and commit sent to stream.
236	ack1.Ack()
237	if got, want := commitTracker.NextOffset(), int64(2); got != want {
238		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
239	}
240	commitTracker.AddPending(commitTracker.NextOffset())
241	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
242		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
243	}
244
245	// Msg2 acked and commit sent to stream.
246	ack2.Ack()
247	if got, want := commitTracker.NextOffset(), int64(3); got != want {
248		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
249	}
250	commitTracker.AddPending(commitTracker.NextOffset())
251	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
252		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
253	}
254
255	// Stream breaks and pending offsets are cleared.
256	commitTracker.ClearPending()
257	if got, want := commitTracker.UpToDate(), false; got != want {
258		t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
259	}
260	// When the stream reconnects the next offset should be 3 (offset 2 skipped).
261	if got, want := commitTracker.NextOffset(), int64(3); got != want {
262		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
263	}
264	commitTracker.AddPending(commitTracker.NextOffset())
265	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
266		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
267	}
268
269	// Msg2 acked and commit sent to stream.
270	ack3.Ack()
271	if got, want := commitTracker.NextOffset(), int64(4); got != want {
272		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
273	}
274	commitTracker.AddPending(commitTracker.NextOffset())
275	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
276		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
277	}
278
279	// Only 1 pending commit confirmed.
280	if got, want := commitTracker.lastConfirmedOffset, nilCursorOffset; got != want {
281		t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
282	}
283	if err := commitTracker.ConfirmOffsets(1); err != nil {
284		t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err)
285	}
286	if got, want := commitTracker.lastConfirmedOffset, int64(3); got != want {
287		t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
288	}
289	if got, want := commitTracker.UpToDate(), false; got != want {
290		t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
291	}
292
293	// Final pending commit confirmed.
294	if err := commitTracker.ConfirmOffsets(1); err != nil {
295		t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err)
296	}
297	if got, want := commitTracker.lastConfirmedOffset, int64(4); got != want {
298		t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
299	}
300	if got, want := commitTracker.UpToDate(), true; got != want {
301		t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
302	}
303
304	// Note: UpToDate() returns true even though there are unacked messages.
305	ack4 := newAckConsumer(4, 0, emptyAckConsumer)
306	if err := ackTracker.Push(ack4); err != nil {
307		t.Errorf("ackTracker.Push() got err %v", err)
308	}
309	if got, want := commitTracker.UpToDate(), true; got != want {
310		t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
311	}
312	if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
313		t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
314	}
315}
316