1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13
14package wire
15
16import (
17	"container/list"
18	"fmt"
19	"sync"
20)
21
// AckConsumer is the interface exported from this package for acking messages.
// Within this file it is implemented by *ackConsumer.
type AckConsumer interface {
	// Ack acknowledges the message that this consumer is attached to.
	Ack()
}
26
// ackedFunc is invoked when a message has been acked by the user. It receives
// the ackConsumer that was acked.
//
// Note: ackConsumer.Ack invokes this callback while holding the ackConsumer's
// mutex, so if the ackedFunc implementation calls any ackConsumer methods, it
// needs to run in a goroutine to avoid a deadlock.
type ackedFunc func(*ackConsumer)
31
// ackConsumer is used for handling message acks. It is attached to a Message
// and also stored within the ackTracker until the message has been acked by the
// user.
type ackConsumer struct {
	// The message offset.
	Offset int64
	// Bytes released to the flow controller once the message has been acked.
	MsgBytes int64

	// Guards access to fields below.
	mu    sync.Mutex
	// Set to true by the first call to Ack; never reset afterwards.
	acked bool
	// Callback run by the first call to Ack. Set to nil by Clear, after which
	// an ack no longer triggers any callback.
	onAck ackedFunc
}
46
47func newAckConsumer(offset, msgBytes int64, onAck ackedFunc) *ackConsumer {
48	return &ackConsumer{Offset: offset, MsgBytes: msgBytes, onAck: onAck}
49}
50
51func (ac *ackConsumer) Ack() {
52	ac.mu.Lock()
53	defer ac.mu.Unlock()
54
55	if ac.acked {
56		return
57	}
58	ac.acked = true
59	if ac.onAck != nil {
60		// Not invoked in a goroutine here for ease of testing.
61		ac.onAck(ac)
62	}
63}
64
65func (ac *ackConsumer) IsAcked() bool {
66	ac.mu.Lock()
67	defer ac.mu.Unlock()
68	return ac.acked
69}
70
71// Clear onAck when the ack can no longer be processed. The user's ack would be
72// ignored.
73func (ac *ackConsumer) Clear() {
74	ac.mu.Lock()
75	defer ac.mu.Unlock()
76	ac.onAck = nil
77}
78
// nilCursorOffset represents an uninitialized cursor offset. A sentinel value
// is used rather than an optional so that cursor offsets can be compared
// directly as int64 values, with no nil check or conversion (-1 sorts below
// every valid offset).
const nilCursorOffset = int64(-1)
83
// ackTracker manages outstanding message acks, i.e. messages that have been
// delivered to the user, but not yet acked. It is used by the committer and
// subscribeStream, so requires its own mutex.
type ackTracker struct {
	// Guards access to fields below.
	mu sync.Mutex
	// All offsets before and including this prefix have been acked by the user.
	// Holds nilCursorOffset until the first outstanding ack has been processed.
	ackedPrefixOffset int64
	// Outstanding message acks, strictly ordered by increasing message offsets.
	// Push enforces the ordering by rejecting non-increasing offsets.
	outstandingAcks *list.List // Value = *ackConsumer
	// Whether new acks can be pushed. Set to false by Release and restored to
	// true by Reset.
	enablePush bool
}
97
98func newAckTracker() *ackTracker {
99	return &ackTracker{
100		ackedPrefixOffset: nilCursorOffset,
101		outstandingAcks:   list.New(),
102		enablePush:        true,
103	}
104}
105
106// Push adds an outstanding ack to the tracker.
107func (at *ackTracker) Push(ack *ackConsumer) error {
108	at.mu.Lock()
109	defer at.mu.Unlock()
110
111	if !at.enablePush {
112		ack.Clear()
113		return nil
114	}
115
116	// These errors should not occur unless there is a bug in the client library
117	// as message ordering should have been validated by subscriberOffsetTracker.
118	if ack.Offset <= at.ackedPrefixOffset {
119		return errOutOfOrderMessages
120	}
121	if elem := at.outstandingAcks.Back(); elem != nil {
122		lastOutstandingAck, _ := elem.Value.(*ackConsumer)
123		if ack.Offset <= lastOutstandingAck.Offset {
124			return errOutOfOrderMessages
125		}
126	}
127
128	at.outstandingAcks.PushBack(ack)
129	return nil
130}
131
132// CommitOffset returns the cursor offset that should be committed. May return
133// nilCursorOffset if no messages have been acked thus far.
134func (at *ackTracker) CommitOffset() int64 {
135	at.mu.Lock()
136	defer at.mu.Unlock()
137
138	at.unsafeProcessAcks()
139
140	if at.ackedPrefixOffset == nilCursorOffset {
141		return nilCursorOffset
142	}
143	// Convert from last acked to first unacked, which is the commit offset.
144	return at.ackedPrefixOffset + 1
145}
146
147// Release clears and invalidates any outstanding acks. Push will clear and
148// discard new acks. This should be called when the committer terminates.
149func (at *ackTracker) Release() {
150	at.mu.Lock()
151	defer at.mu.Unlock()
152
153	at.enablePush = false
154	at.unsafeProcessAcks()
155	at.unsafeClearAcks()
156}
157
158// Reset the state of the tracker. Clears and invalidates any outstanding acks.
159func (at *ackTracker) Reset() {
160	at.mu.Lock()
161	defer at.mu.Unlock()
162
163	at.unsafeClearAcks()
164	at.ackedPrefixOffset = nilCursorOffset
165	at.enablePush = true
166}
167
168// Clears and invalidates any outstanding acks.
169func (at *ackTracker) unsafeClearAcks() {
170	for elem := at.outstandingAcks.Front(); elem != nil; elem = elem.Next() {
171		ack, _ := elem.Value.(*ackConsumer)
172		ack.Clear()
173	}
174	at.outstandingAcks.Init()
175}
176
177// Process outstanding acks and update `ackedPrefixOffset` until an unacked
178// message is found.
179func (at *ackTracker) unsafeProcessAcks() {
180	for {
181		elem := at.outstandingAcks.Front()
182		if elem == nil {
183			break
184		}
185		ack, _ := elem.Value.(*ackConsumer)
186		if !ack.IsAcked() {
187			break
188		}
189		at.ackedPrefixOffset = ack.Offset
190		at.outstandingAcks.Remove(elem)
191		ack.Clear()
192	}
193}
194
195// Empty returns true if there are no outstanding acks.
196func (at *ackTracker) Empty() bool {
197	at.mu.Lock()
198	defer at.mu.Unlock()
199	return at.outstandingAcks.Front() == nil
200}
201
// commitCursorTracker tracks pending and last successful committed offsets.
// It is only accessed by the committer, and has no mutex of its own.
type commitCursorTracker struct {
	// Used to obtain the desired commit offset based on messages acked by the
	// user.
	acks *ackTracker
	// Last offset for which the server confirmed (acknowledged) the commit.
	// Holds nilCursorOffset until the first confirmation is processed.
	lastConfirmedOffset int64
	// Queue of committed offsets awaiting confirmation from the server, in the
	// order they were added by AddPending.
	pendingOffsets *list.List // Value = int64
}
213
214func newCommitCursorTracker(acks *ackTracker) *commitCursorTracker {
215	return &commitCursorTracker{
216		acks:                acks,
217		lastConfirmedOffset: nilCursorOffset,
218		pendingOffsets:      list.New(),
219	}
220}
221
222func extractOffsetFromElem(elem *list.Element) int64 {
223	if elem == nil {
224		return nilCursorOffset
225	}
226	offset, _ := elem.Value.(int64)
227	return offset
228}
229
230// Reset the state of the tracker.
231func (ct *commitCursorTracker) Reset() {
232	ct.acks.Reset()
233	ct.lastConfirmedOffset = nilCursorOffset
234	ct.pendingOffsets.Init()
235}
236
237// NextOffset is the commit offset to be sent to the stream. Returns
238// nilCursorOffset if the commit offset does not need to be updated.
239func (ct *commitCursorTracker) NextOffset() int64 {
240	desiredCommitOffset := ct.acks.CommitOffset()
241	if desiredCommitOffset <= ct.lastConfirmedOffset {
242		// The server has already confirmed the commit offset.
243		return nilCursorOffset
244	}
245	if desiredCommitOffset <= extractOffsetFromElem(ct.pendingOffsets.Back()) {
246		// The commit offset has already been sent to the commit stream and is
247		// awaiting confirmation.
248		return nilCursorOffset
249	}
250	return desiredCommitOffset
251}
252
253// AddPending adds a sent, but not yet confirmed, committed offset.
254func (ct *commitCursorTracker) AddPending(offset int64) {
255	ct.pendingOffsets.PushBack(offset)
256}
257
258// ClearPending discards old pending offsets. Should be called when the commit
259// stream reconnects, as the server acknowledgments for these would not be
260// received.
261func (ct *commitCursorTracker) ClearPending() {
262	ct.pendingOffsets.Init()
263}
264
265// ConfirmOffsets processes the server's acknowledgment of the first
266// `numConfirmed` pending offsets.
267func (ct *commitCursorTracker) ConfirmOffsets(numConfirmed int64) error {
268	if numPending := int64(ct.pendingOffsets.Len()); numPending < numConfirmed {
269		return fmt.Errorf("pubsublite: server acknowledged %d cursor commits, but only %d were sent", numConfirmed, numPending)
270	}
271
272	for i := int64(0); i < numConfirmed; i++ {
273		front := ct.pendingOffsets.Front()
274		ct.lastConfirmedOffset = extractOffsetFromElem(front)
275		ct.pendingOffsets.Remove(front)
276	}
277	return nil
278}
279
280// UpToDate when the server has confirmed the desired commit offset and there
281// are no pending acks.
282func (ct *commitCursorTracker) UpToDate() bool {
283	return ct.acks.CommitOffset() <= ct.lastConfirmedOffset && ct.acks.Empty()
284}
285