1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13 14package wire 15 16import ( 17 "container/list" 18 "fmt" 19 "sync" 20) 21 22// AckConsumer is the interface exported from this package for acking messages. 23type AckConsumer interface { 24 Ack() 25} 26 27// ackedFunc is invoked when a message has been acked by the user. Note: if the 28// ackedFunc implementation calls any ackConsumer methods, it needs to run in a 29// goroutine to avoid a deadlock. 30type ackedFunc func(*ackConsumer) 31 32// ackConsumer is used for handling message acks. It is attached to a Message 33// and also stored within the ackTracker until the message has been acked by the 34// user. 35type ackConsumer struct { 36 // The message offset. 37 Offset int64 38 // Bytes released to the flow controller once the message has been acked. 39 MsgBytes int64 40 41 // Guards access to fields below. 42 mu sync.Mutex 43 acked bool 44 onAck ackedFunc 45} 46 47func newAckConsumer(offset, msgBytes int64, onAck ackedFunc) *ackConsumer { 48 return &ackConsumer{Offset: offset, MsgBytes: msgBytes, onAck: onAck} 49} 50 51func (ac *ackConsumer) Ack() { 52 ac.mu.Lock() 53 defer ac.mu.Unlock() 54 55 if ac.acked { 56 return 57 } 58 ac.acked = true 59 if ac.onAck != nil { 60 // Not invoked in a goroutine here for ease of testing. 61 ac.onAck(ac) 62 } 63} 64 65func (ac *ackConsumer) IsAcked() bool { 66 ac.mu.Lock() 67 defer ac.mu.Unlock() 68 return ac.acked 69} 70 71// Clear onAck when the ack can no longer be processed. The user's ack would be 72// ignored. 73func (ac *ackConsumer) Clear() { 74 ac.mu.Lock() 75 defer ac.mu.Unlock() 76 ac.onAck = nil 77} 78 79// Represents an uninitialized cursor offset. A sentinel value is used instead 80// of an optional to simplify cursor comparisons (i.e. -1 works without the need 81// to check for nil and then convert to int64). 82const nilCursorOffset int64 = -1 83 84// ackTracker manages outstanding message acks, i.e. messages that have been 85// delivered to the user, but not yet acked. It is used by the committer and 86// subscribeStream, so requires its own mutex. 87type ackTracker struct { 88 // Guards access to fields below. 89 mu sync.Mutex 90 // All offsets before and including this prefix have been acked by the user. 91 ackedPrefixOffset int64 92 // Outstanding message acks, strictly ordered by increasing message offsets. 93 outstandingAcks *list.List // Value = *ackConsumer 94 // Whether new acks can be pushed. 95 enablePush bool 96} 97 98func newAckTracker() *ackTracker { 99 return &ackTracker{ 100 ackedPrefixOffset: nilCursorOffset, 101 outstandingAcks: list.New(), 102 enablePush: true, 103 } 104} 105 106// Push adds an outstanding ack to the tracker. 107func (at *ackTracker) Push(ack *ackConsumer) error { 108 at.mu.Lock() 109 defer at.mu.Unlock() 110 111 if !at.enablePush { 112 ack.Clear() 113 return nil 114 } 115 116 // These errors should not occur unless there is a bug in the client library 117 // as message ordering should have been validated by subscriberOffsetTracker. 118 if ack.Offset <= at.ackedPrefixOffset { 119 return errOutOfOrderMessages 120 } 121 if elem := at.outstandingAcks.Back(); elem != nil { 122 lastOutstandingAck, _ := elem.Value.(*ackConsumer) 123 if ack.Offset <= lastOutstandingAck.Offset { 124 return errOutOfOrderMessages 125 } 126 } 127 128 at.outstandingAcks.PushBack(ack) 129 return nil 130} 131 132// CommitOffset returns the cursor offset that should be committed. May return 133// nilCursorOffset if no messages have been acked thus far. 134func (at *ackTracker) CommitOffset() int64 { 135 at.mu.Lock() 136 defer at.mu.Unlock() 137 138 at.unsafeProcessAcks() 139 140 if at.ackedPrefixOffset == nilCursorOffset { 141 return nilCursorOffset 142 } 143 // Convert from last acked to first unacked, which is the commit offset. 144 return at.ackedPrefixOffset + 1 145} 146 147// Release clears and invalidates any outstanding acks. Push will clear and 148// discard new acks. This should be called when the committer terminates. 149func (at *ackTracker) Release() { 150 at.mu.Lock() 151 defer at.mu.Unlock() 152 153 at.enablePush = false 154 at.unsafeProcessAcks() 155 at.unsafeClearAcks() 156} 157 158// Reset the state of the tracker. Clears and invalidates any outstanding acks. 159func (at *ackTracker) Reset() { 160 at.mu.Lock() 161 defer at.mu.Unlock() 162 163 at.unsafeClearAcks() 164 at.ackedPrefixOffset = nilCursorOffset 165 at.enablePush = true 166} 167 168// Clears and invalidates any outstanding acks. 169func (at *ackTracker) unsafeClearAcks() { 170 for elem := at.outstandingAcks.Front(); elem != nil; elem = elem.Next() { 171 ack, _ := elem.Value.(*ackConsumer) 172 ack.Clear() 173 } 174 at.outstandingAcks.Init() 175} 176 177// Process outstanding acks and update `ackedPrefixOffset` until an unacked 178// message is found. 179func (at *ackTracker) unsafeProcessAcks() { 180 for { 181 elem := at.outstandingAcks.Front() 182 if elem == nil { 183 break 184 } 185 ack, _ := elem.Value.(*ackConsumer) 186 if !ack.IsAcked() { 187 break 188 } 189 at.ackedPrefixOffset = ack.Offset 190 at.outstandingAcks.Remove(elem) 191 ack.Clear() 192 } 193} 194 195// Empty returns true if there are no outstanding acks. 196func (at *ackTracker) Empty() bool { 197 at.mu.Lock() 198 defer at.mu.Unlock() 199 return at.outstandingAcks.Front() == nil 200} 201 202// commitCursorTracker tracks pending and last successful committed offsets. 203// It is only accessed by the committer. 204type commitCursorTracker struct { 205 // Used to obtain the desired commit offset based on messages acked by the 206 // user. 207 acks *ackTracker 208 // Last offset for which the server confirmed (acknowledged) the commit. 209 lastConfirmedOffset int64 210 // Queue of committed offsets awaiting confirmation from the server. 211 pendingOffsets *list.List // Value = int64 212} 213 214func newCommitCursorTracker(acks *ackTracker) *commitCursorTracker { 215 return &commitCursorTracker{ 216 acks: acks, 217 lastConfirmedOffset: nilCursorOffset, 218 pendingOffsets: list.New(), 219 } 220} 221 222func extractOffsetFromElem(elem *list.Element) int64 { 223 if elem == nil { 224 return nilCursorOffset 225 } 226 offset, _ := elem.Value.(int64) 227 return offset 228} 229 230// Reset the state of the tracker. 231func (ct *commitCursorTracker) Reset() { 232 ct.acks.Reset() 233 ct.lastConfirmedOffset = nilCursorOffset 234 ct.pendingOffsets.Init() 235} 236 237// NextOffset is the commit offset to be sent to the stream. Returns 238// nilCursorOffset if the commit offset does not need to be updated. 239func (ct *commitCursorTracker) NextOffset() int64 { 240 desiredCommitOffset := ct.acks.CommitOffset() 241 if desiredCommitOffset <= ct.lastConfirmedOffset { 242 // The server has already confirmed the commit offset. 243 return nilCursorOffset 244 } 245 if desiredCommitOffset <= extractOffsetFromElem(ct.pendingOffsets.Back()) { 246 // The commit offset has already been sent to the commit stream and is 247 // awaiting confirmation. 248 return nilCursorOffset 249 } 250 return desiredCommitOffset 251} 252 253// AddPending adds a sent, but not yet confirmed, committed offset. 254func (ct *commitCursorTracker) AddPending(offset int64) { 255 ct.pendingOffsets.PushBack(offset) 256} 257 258// ClearPending discards old pending offsets. Should be called when the commit 259// stream reconnects, as the server acknowledgments for these would not be 260// received. 261func (ct *commitCursorTracker) ClearPending() { 262 ct.pendingOffsets.Init() 263} 264 265// ConfirmOffsets processes the server's acknowledgment of the first 266// `numConfirmed` pending offsets. 267func (ct *commitCursorTracker) ConfirmOffsets(numConfirmed int64) error { 268 if numPending := int64(ct.pendingOffsets.Len()); numPending < numConfirmed { 269 return fmt.Errorf("pubsublite: server acknowledged %d cursor commits, but only %d were sent", numConfirmed, numPending) 270 } 271 272 for i := int64(0); i < numConfirmed; i++ { 273 front := ct.pendingOffsets.Front() 274 ct.lastConfirmedOffset = extractOffsetFromElem(front) 275 ct.pendingOffsets.Remove(front) 276 } 277 return nil 278} 279 280// UpToDate when the server has confirmed the desired commit offset and there 281// are no pending acks. 282func (ct *commitCursorTracker) UpToDate() bool { 283 return ct.acks.CommitOffset() <= ct.lastConfirmedOffset && ct.acks.Empty() 284} 285