1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13 14package wire 15 16import ( 17 "errors" 18 "fmt" 19 "math" 20 21 pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" 22) 23 24var ( 25 errTokenCounterBytesNegative = errors.New("pubsublite: received messages that account for more bytes than were requested") 26 errTokenCounterMessagesNegative = errors.New("pubsublite: received more messages than were requested") 27 errOutOfOrderMessages = errors.New("pubsublite: server delivered messages out of order") 28) 29 30type flowControlTokens struct { 31 Bytes int64 32 Messages int64 33} 34 35// A tokenCounter stores the amount of outstanding byte and message flow control 36// tokens that the client believes exists for the stream. 37type tokenCounter struct { 38 Bytes int64 39 Messages int64 40} 41 42func saturatedAdd(sum, delta int64) int64 { 43 remainder := math.MaxInt64 - sum 44 if delta >= remainder { 45 return math.MaxInt64 46 } 47 return sum + delta 48} 49 50func (tc *tokenCounter) Add(delta flowControlTokens) { 51 tc.Bytes = saturatedAdd(tc.Bytes, delta.Bytes) 52 tc.Messages = saturatedAdd(tc.Messages, delta.Messages) 53} 54 55func (tc *tokenCounter) Sub(delta flowControlTokens) error { 56 if delta.Bytes > tc.Bytes { 57 return errTokenCounterBytesNegative 58 } 59 if delta.Messages > tc.Messages { 60 return errTokenCounterMessagesNegative 61 } 62 tc.Bytes -= delta.Bytes 63 tc.Messages -= delta.Messages 64 return nil 65} 66 67func (tc *tokenCounter) Reset() { 68 tc.Bytes = 0 69 tc.Messages = 0 70} 71 72func (tc *tokenCounter) ToFlowControlRequest() *pb.FlowControlRequest { 73 if tc.Bytes <= 0 && tc.Messages <= 0 { 74 return nil 75 } 76 return &pb.FlowControlRequest{ 77 AllowedBytes: tc.Bytes, 78 AllowedMessages: tc.Messages, 79 } 80} 81 82// flowControlBatcher tracks flow control tokens and manages batching of flow 83// control requests to avoid overwhelming the server. It is only accessed by 84// the subscribeStream. 85type flowControlBatcher struct { 86 // The current amount of outstanding byte and message flow control tokens. 87 clientTokens tokenCounter 88 // The pending batch flow control request that needs to be sent to the stream. 89 pendingTokens tokenCounter 90} 91 92const expediteBatchRequestRatio = 0.5 93 94func exceedsExpediteRatio(pending, client int64) bool { 95 return client > 0 && (float64(pending)/float64(client)) >= expediteBatchRequestRatio 96} 97 98// OnClientFlow increments flow control tokens. This occurs when: 99// - Initialization from ReceiveSettings. 100// - The user acks messages. 101func (fc *flowControlBatcher) OnClientFlow(tokens flowControlTokens) { 102 fc.clientTokens.Add(tokens) 103 fc.pendingTokens.Add(tokens) 104} 105 106// OnMessages decrements flow control tokens when messages are received from the 107// server. 108func (fc *flowControlBatcher) OnMessages(msgs []*pb.SequencedMessage) error { 109 var totalBytes int64 110 for _, msg := range msgs { 111 totalBytes += msg.GetSizeBytes() 112 } 113 return fc.clientTokens.Sub(flowControlTokens{Bytes: totalBytes, Messages: int64(len(msgs))}) 114} 115 116// RequestForRestart returns a FlowControlRequest that should be sent when a new 117// subscriber stream is connected. May return nil. 118func (fc *flowControlBatcher) RequestForRestart() *pb.FlowControlRequest { 119 fc.pendingTokens.Reset() 120 return fc.clientTokens.ToFlowControlRequest() 121} 122 123// ReleasePendingRequest returns a non-nil request when there is a batch 124// FlowControlRequest to send to the stream. 125func (fc *flowControlBatcher) ReleasePendingRequest() *pb.FlowControlRequest { 126 req := fc.pendingTokens.ToFlowControlRequest() 127 fc.pendingTokens.Reset() 128 return req 129} 130 131// ShouldExpediteBatchRequest returns true if a batch FlowControlRequest should 132// be sent ASAP to avoid starving the client of messages. This occurs when the 133// client is rapidly acking messages. 134func (fc *flowControlBatcher) ShouldExpediteBatchRequest() bool { 135 if exceedsExpediteRatio(fc.pendingTokens.Bytes, fc.clientTokens.Bytes) { 136 return true 137 } 138 if exceedsExpediteRatio(fc.pendingTokens.Messages, fc.clientTokens.Messages) { 139 return true 140 } 141 return false 142} 143 144// subscriberOffsetTracker tracks the expected offset of the next message 145// received from the server. It is only accessed by the subscribeStream. 146type subscriberOffsetTracker struct { 147 minNextOffset int64 148} 149 150// RequestForRestart returns the seek request to send when a new subscribe 151// stream reconnects. Returns nil if the subscriber has just started, in which 152// case the server returns the offset of the last committed cursor. 153func (ot *subscriberOffsetTracker) RequestForRestart() *pb.SeekRequest { 154 if ot.minNextOffset <= 0 { 155 return nil 156 } 157 return &pb.SeekRequest{ 158 Target: &pb.SeekRequest_Cursor{ 159 Cursor: &pb.Cursor{Offset: ot.minNextOffset}, 160 }, 161 } 162} 163 164// OnMessages verifies that messages are delivered in order and updates the next 165// expected offset. 166func (ot *subscriberOffsetTracker) OnMessages(msgs []*pb.SequencedMessage) error { 167 nextOffset := ot.minNextOffset 168 for i, msg := range msgs { 169 offset := msg.GetCursor().GetOffset() 170 if offset < nextOffset { 171 if i == 0 { 172 return fmt.Errorf("pubsublite: server delivered messages with start offset = %d, expected >= %d", offset, ot.minNextOffset) 173 } 174 return errOutOfOrderMessages 175 } 176 nextOffset = offset + 1 177 } 178 ot.minNextOffset = nextOffset 179 return nil 180} 181