1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13 14package wire 15 16import ( 17 "container/list" 18 "fmt" 19 "sync" 20) 21 22// AckConsumer is the interface exported from this package for acking messages. 23type AckConsumer interface { 24 Ack() 25} 26 27// ackedFunc is invoked when a message has been acked by the user. Note: if the 28// ackedFunc implementation calls any ackConsumer methods, it needs to run in a 29// goroutine to avoid a deadlock. 30type ackedFunc func(*ackConsumer) 31 32// ackConsumer is used for handling message acks. It is attached to a Message 33// and also stored within the ackTracker until the message has been acked by the 34// user. 35type ackConsumer struct { 36 // The message offset. 37 Offset int64 38 // Bytes released to the flow controller once the message has been acked. 39 MsgBytes int64 40 41 // Guards access to fields below. 42 mu sync.Mutex 43 acked bool 44 onAck ackedFunc 45} 46 47func newAckConsumer(offset, msgBytes int64, onAck ackedFunc) *ackConsumer { 48 return &ackConsumer{Offset: offset, MsgBytes: msgBytes, onAck: onAck} 49} 50 51func (ac *ackConsumer) Ack() { 52 ac.mu.Lock() 53 defer ac.mu.Unlock() 54 55 if ac.acked { 56 return 57 } 58 ac.acked = true 59 if ac.onAck != nil { 60 // Not invoked in a goroutine here for ease of testing. 61 ac.onAck(ac) 62 } 63} 64 65func (ac *ackConsumer) IsAcked() bool { 66 ac.mu.Lock() 67 defer ac.mu.Unlock() 68 return ac.acked 69} 70 71// Clear onAck when the ack can no longer be processed. The user's ack would be 72// ignored. 73func (ac *ackConsumer) Clear() { 74 ac.mu.Lock() 75 defer ac.mu.Unlock() 76 ac.onAck = nil 77} 78 79// Represents an uninitialized cursor offset. A sentinel value is used instead 80// of an optional to simplify cursor comparisons (i.e. -1 works without the need 81// to check for nil and then convert to int64). 82const nilCursorOffset int64 = -1 83 84// ackTracker manages outstanding message acks, i.e. messages that have been 85// delivered to the user, but not yet acked. It is used by the committer and 86// subscribeStream, so requires its own mutex. 87type ackTracker struct { 88 // Guards access to fields below. 89 mu sync.Mutex 90 // All offsets before and including this prefix have been acked by the user. 91 ackedPrefixOffset int64 92 // Outstanding message acks, strictly ordered by increasing message offsets. 93 outstandingAcks *list.List // Value = *ackConsumer 94 // Whether new acks can be pushed. 95 enablePush bool 96} 97 98func newAckTracker() *ackTracker { 99 return &ackTracker{ 100 ackedPrefixOffset: nilCursorOffset, 101 outstandingAcks: list.New(), 102 enablePush: true, 103 } 104} 105 106// Push adds an outstanding ack to the tracker. 107func (at *ackTracker) Push(ack *ackConsumer) error { 108 at.mu.Lock() 109 defer at.mu.Unlock() 110 111 if !at.enablePush { 112 ack.Clear() 113 return nil 114 } 115 116 // These errors should not occur unless there is a bug in the client library 117 // as message ordering should have been validated by subscriberOffsetTracker. 118 if ack.Offset <= at.ackedPrefixOffset { 119 return errOutOfOrderMessages 120 } 121 if elem := at.outstandingAcks.Back(); elem != nil { 122 lastOutstandingAck, _ := elem.Value.(*ackConsumer) 123 if ack.Offset <= lastOutstandingAck.Offset { 124 return errOutOfOrderMessages 125 } 126 } 127 128 at.outstandingAcks.PushBack(ack) 129 return nil 130} 131 132// CommitOffset returns the cursor offset that should be committed. May return 133// nilCursorOffset if no messages have been acked thus far. 134func (at *ackTracker) CommitOffset() int64 { 135 at.mu.Lock() 136 defer at.mu.Unlock() 137 138 at.unsafeProcessAcks() 139 140 if at.ackedPrefixOffset == nilCursorOffset { 141 return nilCursorOffset 142 } 143 // Convert from last acked to first unacked, which is the commit offset. 144 return at.ackedPrefixOffset + 1 145} 146 147// Release clears and invalidates any outstanding acks. Push will clear and 148// discard new acks. This should be called when the committer terminates. 149func (at *ackTracker) Release() { 150 at.mu.Lock() 151 defer at.mu.Unlock() 152 153 at.enablePush = false 154 at.unsafeProcessAcks() 155 156 for elem := at.outstandingAcks.Front(); elem != nil; elem = elem.Next() { 157 ack, _ := elem.Value.(*ackConsumer) 158 ack.Clear() 159 } 160 at.outstandingAcks.Init() 161} 162 163// Process outstanding acks and update `ackedPrefixOffset` until an unacked 164// message is found. 165func (at *ackTracker) unsafeProcessAcks() { 166 for { 167 elem := at.outstandingAcks.Front() 168 if elem == nil { 169 break 170 } 171 ack, _ := elem.Value.(*ackConsumer) 172 if !ack.IsAcked() { 173 break 174 } 175 at.ackedPrefixOffset = ack.Offset 176 at.outstandingAcks.Remove(elem) 177 ack.Clear() 178 } 179} 180 181// Empty returns true if there are no outstanding acks. 182func (at *ackTracker) Empty() bool { 183 at.mu.Lock() 184 defer at.mu.Unlock() 185 return at.outstandingAcks.Front() == nil 186} 187 188// commitCursorTracker tracks pending and last successful committed offsets. 189// It is only accessed by the committer. 190type commitCursorTracker struct { 191 // Used to obtain the desired commit offset based on messages acked by the 192 // user. 193 acks *ackTracker 194 // Last offset for which the server confirmed (acknowledged) the commit. 195 lastConfirmedOffset int64 196 // Queue of committed offsets awaiting confirmation from the server. 197 pendingOffsets *list.List // Value = int64 198} 199 200func newCommitCursorTracker(acks *ackTracker) *commitCursorTracker { 201 return &commitCursorTracker{ 202 acks: acks, 203 lastConfirmedOffset: nilCursorOffset, 204 pendingOffsets: list.New(), 205 } 206} 207 208func extractOffsetFromElem(elem *list.Element) int64 { 209 if elem == nil { 210 return nilCursorOffset 211 } 212 offset, _ := elem.Value.(int64) 213 return offset 214} 215 216// NextOffset is the commit offset to be sent to the stream. Returns 217// nilCursorOffset if the commit offset does not need to be updated. 218func (ct *commitCursorTracker) NextOffset() int64 { 219 desiredCommitOffset := ct.acks.CommitOffset() 220 if desiredCommitOffset <= ct.lastConfirmedOffset { 221 // The server has already confirmed the commit offset. 222 return nilCursorOffset 223 } 224 if desiredCommitOffset <= extractOffsetFromElem(ct.pendingOffsets.Back()) { 225 // The commit offset has already been sent to the commit stream and is 226 // awaiting confirmation. 227 return nilCursorOffset 228 } 229 return desiredCommitOffset 230} 231 232// AddPending adds a sent, but not yet confirmed, committed offset. 233func (ct *commitCursorTracker) AddPending(offset int64) { 234 ct.pendingOffsets.PushBack(offset) 235} 236 237// ClearPending discards old pending offsets. Should be called when the commit 238// stream reconnects, as the server acknowledgments for these would not be 239// received. 240func (ct *commitCursorTracker) ClearPending() { 241 ct.pendingOffsets.Init() 242} 243 244// ConfirmOffsets processes the server's acknowledgment of the first 245// `numConfirmed` pending offsets. 246func (ct *commitCursorTracker) ConfirmOffsets(numConfirmed int64) error { 247 if numPending := int64(ct.pendingOffsets.Len()); numPending < numConfirmed { 248 return fmt.Errorf("pubsublite: server acknowledged %d cursor commits, but only %d were sent", numConfirmed, numPending) 249 } 250 251 for i := int64(0); i < numConfirmed; i++ { 252 front := ct.pendingOffsets.Front() 253 ct.lastConfirmedOffset = extractOffsetFromElem(front) 254 ct.pendingOffsets.Remove(front) 255 } 256 return nil 257} 258 259// UpToDate when the server has confirmed the desired commit offset. 260func (ct *commitCursorTracker) UpToDate() bool { 261 return ct.acks.CommitOffset() <= ct.lastConfirmedOffset 262} 263