1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13
14package wire
15
16import (
17	"container/list"
18	"fmt"
19	"sync"
20)
21
22// AckConsumer is the interface exported from this package for acking messages.
23type AckConsumer interface {
24	Ack()
25}
26
27// ackedFunc is invoked when a message has been acked by the user. Note: if the
28// ackedFunc implementation calls any ackConsumer methods, it needs to run in a
29// goroutine to avoid a deadlock.
30type ackedFunc func(*ackConsumer)
31
32// ackConsumer is used for handling message acks. It is attached to a Message
33// and also stored within the ackTracker until the message has been acked by the
34// user.
35type ackConsumer struct {
36	// The message offset.
37	Offset int64
38	// Bytes released to the flow controller once the message has been acked.
39	MsgBytes int64
40
41	// Guards access to fields below.
42	mu    sync.Mutex
43	acked bool
44	onAck ackedFunc
45}
46
47func newAckConsumer(offset, msgBytes int64, onAck ackedFunc) *ackConsumer {
48	return &ackConsumer{Offset: offset, MsgBytes: msgBytes, onAck: onAck}
49}
50
51func (ac *ackConsumer) Ack() {
52	ac.mu.Lock()
53	defer ac.mu.Unlock()
54
55	if ac.acked {
56		return
57	}
58	ac.acked = true
59	if ac.onAck != nil {
60		// Not invoked in a goroutine here for ease of testing.
61		ac.onAck(ac)
62	}
63}
64
65func (ac *ackConsumer) IsAcked() bool {
66	ac.mu.Lock()
67	defer ac.mu.Unlock()
68	return ac.acked
69}
70
71// Clear onAck when the ack can no longer be processed. The user's ack would be
72// ignored.
73func (ac *ackConsumer) Clear() {
74	ac.mu.Lock()
75	defer ac.mu.Unlock()
76	ac.onAck = nil
77}
78
79// Represents an uninitialized cursor offset. A sentinel value is used instead
80// of an optional to simplify cursor comparisons (i.e. -1 works without the need
81// to check for nil and then convert to int64).
82const nilCursorOffset int64 = -1
83
84// ackTracker manages outstanding message acks, i.e. messages that have been
85// delivered to the user, but not yet acked. It is used by the committer and
86// subscribeStream, so requires its own mutex.
87type ackTracker struct {
88	// Guards access to fields below.
89	mu sync.Mutex
90	// All offsets before and including this prefix have been acked by the user.
91	ackedPrefixOffset int64
92	// Outstanding message acks, strictly ordered by increasing message offsets.
93	outstandingAcks *list.List // Value = *ackConsumer
94	// Whether new acks can be pushed.
95	enablePush bool
96}
97
98func newAckTracker() *ackTracker {
99	return &ackTracker{
100		ackedPrefixOffset: nilCursorOffset,
101		outstandingAcks:   list.New(),
102		enablePush:        true,
103	}
104}
105
106// Push adds an outstanding ack to the tracker.
107func (at *ackTracker) Push(ack *ackConsumer) error {
108	at.mu.Lock()
109	defer at.mu.Unlock()
110
111	if !at.enablePush {
112		ack.Clear()
113		return nil
114	}
115
116	// These errors should not occur unless there is a bug in the client library
117	// as message ordering should have been validated by subscriberOffsetTracker.
118	if ack.Offset <= at.ackedPrefixOffset {
119		return errOutOfOrderMessages
120	}
121	if elem := at.outstandingAcks.Back(); elem != nil {
122		lastOutstandingAck, _ := elem.Value.(*ackConsumer)
123		if ack.Offset <= lastOutstandingAck.Offset {
124			return errOutOfOrderMessages
125		}
126	}
127
128	at.outstandingAcks.PushBack(ack)
129	return nil
130}
131
132// CommitOffset returns the cursor offset that should be committed. May return
133// nilCursorOffset if no messages have been acked thus far.
134func (at *ackTracker) CommitOffset() int64 {
135	at.mu.Lock()
136	defer at.mu.Unlock()
137
138	at.unsafeProcessAcks()
139
140	if at.ackedPrefixOffset == nilCursorOffset {
141		return nilCursorOffset
142	}
143	// Convert from last acked to first unacked, which is the commit offset.
144	return at.ackedPrefixOffset + 1
145}
146
147// Release clears and invalidates any outstanding acks. Push will clear and
148// discard new acks. This should be called when the committer terminates.
149func (at *ackTracker) Release() {
150	at.mu.Lock()
151	defer at.mu.Unlock()
152
153	at.enablePush = false
154	at.unsafeProcessAcks()
155
156	for elem := at.outstandingAcks.Front(); elem != nil; elem = elem.Next() {
157		ack, _ := elem.Value.(*ackConsumer)
158		ack.Clear()
159	}
160	at.outstandingAcks.Init()
161}
162
163// Process outstanding acks and update `ackedPrefixOffset` until an unacked
164// message is found.
165func (at *ackTracker) unsafeProcessAcks() {
166	for {
167		elem := at.outstandingAcks.Front()
168		if elem == nil {
169			break
170		}
171		ack, _ := elem.Value.(*ackConsumer)
172		if !ack.IsAcked() {
173			break
174		}
175		at.ackedPrefixOffset = ack.Offset
176		at.outstandingAcks.Remove(elem)
177		ack.Clear()
178	}
179}
180
181// Empty returns true if there are no outstanding acks.
182func (at *ackTracker) Empty() bool {
183	at.mu.Lock()
184	defer at.mu.Unlock()
185	return at.outstandingAcks.Front() == nil
186}
187
188// commitCursorTracker tracks pending and last successful committed offsets.
189// It is only accessed by the committer.
190type commitCursorTracker struct {
191	// Used to obtain the desired commit offset based on messages acked by the
192	// user.
193	acks *ackTracker
194	// Last offset for which the server confirmed (acknowledged) the commit.
195	lastConfirmedOffset int64
196	// Queue of committed offsets awaiting confirmation from the server.
197	pendingOffsets *list.List // Value = int64
198}
199
200func newCommitCursorTracker(acks *ackTracker) *commitCursorTracker {
201	return &commitCursorTracker{
202		acks:                acks,
203		lastConfirmedOffset: nilCursorOffset,
204		pendingOffsets:      list.New(),
205	}
206}
207
208func extractOffsetFromElem(elem *list.Element) int64 {
209	if elem == nil {
210		return nilCursorOffset
211	}
212	offset, _ := elem.Value.(int64)
213	return offset
214}
215
216// NextOffset is the commit offset to be sent to the stream. Returns
217// nilCursorOffset if the commit offset does not need to be updated.
218func (ct *commitCursorTracker) NextOffset() int64 {
219	desiredCommitOffset := ct.acks.CommitOffset()
220	if desiredCommitOffset <= ct.lastConfirmedOffset {
221		// The server has already confirmed the commit offset.
222		return nilCursorOffset
223	}
224	if desiredCommitOffset <= extractOffsetFromElem(ct.pendingOffsets.Back()) {
225		// The commit offset has already been sent to the commit stream and is
226		// awaiting confirmation.
227		return nilCursorOffset
228	}
229	return desiredCommitOffset
230}
231
232// AddPending adds a sent, but not yet confirmed, committed offset.
233func (ct *commitCursorTracker) AddPending(offset int64) {
234	ct.pendingOffsets.PushBack(offset)
235}
236
237// ClearPending discards old pending offsets. Should be called when the commit
238// stream reconnects, as the server acknowledgments for these would not be
239// received.
240func (ct *commitCursorTracker) ClearPending() {
241	ct.pendingOffsets.Init()
242}
243
244// ConfirmOffsets processes the server's acknowledgment of the first
245// `numConfirmed` pending offsets.
246func (ct *commitCursorTracker) ConfirmOffsets(numConfirmed int64) error {
247	if numPending := int64(ct.pendingOffsets.Len()); numPending < numConfirmed {
248		return fmt.Errorf("pubsublite: server acknowledged %d cursor commits, but only %d were sent", numConfirmed, numPending)
249	}
250
251	for i := int64(0); i < numConfirmed; i++ {
252		front := ct.pendingOffsets.Front()
253		ct.lastConfirmedOffset = extractOffsetFromElem(front)
254		ct.pendingOffsets.Remove(front)
255	}
256	return nil
257}
258
259// UpToDate when the server has confirmed the desired commit offset.
260func (ct *commitCursorTracker) UpToDate() bool {
261	return ct.acks.CommitOffset() <= ct.lastConfirmedOffset
262}
263