// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

14package wire
15
16import (
17	"errors"
18	"fmt"
19	"math"
20
21	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
22)
23
// Sentinel errors reported by the flow control and offset tracking helpers in
// this file.
var (
	// errOutOfOrderMessages is returned by subscriberOffsetTracker.OnMessages
	// when a message other than the first in a batch has an offset lower than
	// the next expected offset.
	errOutOfOrderMessages = errors.New("pubsublite: server delivered messages out of order")
	// errTokenCounterMessagesNegative is returned by tokenCounter.Sub when the
	// number of messages to subtract exceeds the messages currently counted.
	errTokenCounterMessagesNegative = errors.New("pubsublite: received more messages than were requested")
	// errTokenCounterBytesNegative is returned by tokenCounter.Sub when the
	// number of bytes to subtract exceeds the bytes currently counted.
	errTokenCounterBytesNegative = errors.New("pubsublite: received messages that account for more bytes than were requested")
)
29
// flowControlTokens is a quantity of byte and message flow control tokens. It
// is the delta type accepted by tokenCounter.Add and tokenCounter.Sub.
type flowControlTokens struct {
	Bytes, Messages int64
}
34
// A tokenCounter holds the number of outstanding flow control tokens, in bytes
// and in messages, that the client believes exists for the stream. The zero
// value represents no outstanding tokens.
type tokenCounter struct {
	Bytes, Messages int64
}
41
// saturatedAdd returns sum + delta, clamped to the range of int64 instead of
// wrapping around on overflow: results above math.MaxInt64 yield
// math.MaxInt64 and results below math.MinInt64 yield math.MinInt64.
func saturatedAdd(sum, delta int64) int64 {
	// Each bound is computed by subtracting delta from the limit on the side
	// where that subtraction cannot itself overflow, so the checks are safe
	// for any combination of signs.
	if delta > 0 && sum > math.MaxInt64-delta {
		return math.MaxInt64
	}
	if delta < 0 && sum < math.MinInt64-delta {
		return math.MinInt64
	}
	return sum + delta
}
49
50func (tc *tokenCounter) Add(delta flowControlTokens) {
51	tc.Bytes = saturatedAdd(tc.Bytes, delta.Bytes)
52	tc.Messages = saturatedAdd(tc.Messages, delta.Messages)
53}
54
55func (tc *tokenCounter) Sub(delta flowControlTokens) error {
56	if delta.Bytes > tc.Bytes {
57		return errTokenCounterBytesNegative
58	}
59	if delta.Messages > tc.Messages {
60		return errTokenCounterMessagesNegative
61	}
62	tc.Bytes -= delta.Bytes
63	tc.Messages -= delta.Messages
64	return nil
65}
66
67func (tc *tokenCounter) Reset() {
68	tc.Bytes = 0
69	tc.Messages = 0
70}
71
72func (tc *tokenCounter) ToFlowControlRequest() *pb.FlowControlRequest {
73	if tc.Bytes <= 0 && tc.Messages <= 0 {
74		return nil
75	}
76	return &pb.FlowControlRequest{
77		AllowedBytes:    tc.Bytes,
78		AllowedMessages: tc.Messages,
79	}
80}
81
82// flowControlBatcher tracks flow control tokens and manages batching of flow
83// control requests to avoid overwhelming the server. It is only accessed by
84// the subscribeStream.
85type flowControlBatcher struct {
86	// The current amount of outstanding byte and message flow control tokens.
87	clientTokens tokenCounter
88	// The pending batch flow control request that needs to be sent to the stream.
89	pendingTokens tokenCounter
90}
91
// expediteBatchRequestRatio is the minimum ratio of pending tokens to client
// tokens at which exceedsExpediteRatio reports true.
const expediteBatchRequestRatio = 0.5

// exceedsExpediteRatio reports whether pending divided by client is at least
// expediteBatchRequestRatio. It always reports false when client is zero or
// negative.
func exceedsExpediteRatio(pending, client int64) bool {
	if client <= 0 {
		return false
	}
	ratio := float64(pending) / float64(client)
	return ratio >= expediteBatchRequestRatio
}
97
98// OnClientFlow increments flow control tokens. This occurs when:
99// - Initialization from ReceiveSettings.
100// - The user acks messages.
101func (fc *flowControlBatcher) OnClientFlow(tokens flowControlTokens) {
102	fc.clientTokens.Add(tokens)
103	fc.pendingTokens.Add(tokens)
104}
105
106// OnMessages decrements flow control tokens when messages are received from the
107// server.
108func (fc *flowControlBatcher) OnMessages(msgs []*pb.SequencedMessage) error {
109	var totalBytes int64
110	for _, msg := range msgs {
111		totalBytes += msg.GetSizeBytes()
112	}
113	return fc.clientTokens.Sub(flowControlTokens{Bytes: totalBytes, Messages: int64(len(msgs))})
114}
115
116// RequestForRestart returns a FlowControlRequest that should be sent when a new
117// subscriber stream is connected. May return nil.
118func (fc *flowControlBatcher) RequestForRestart() *pb.FlowControlRequest {
119	fc.pendingTokens.Reset()
120	return fc.clientTokens.ToFlowControlRequest()
121}
122
123// ReleasePendingRequest returns a non-nil request when there is a batch
124// FlowControlRequest to send to the stream.
125func (fc *flowControlBatcher) ReleasePendingRequest() *pb.FlowControlRequest {
126	req := fc.pendingTokens.ToFlowControlRequest()
127	fc.pendingTokens.Reset()
128	return req
129}
130
131// ShouldExpediteBatchRequest returns true if a batch FlowControlRequest should
132// be sent ASAP to avoid starving the client of messages. This occurs when the
133// client is rapidly acking messages.
134func (fc *flowControlBatcher) ShouldExpediteBatchRequest() bool {
135	if exceedsExpediteRatio(fc.pendingTokens.Bytes, fc.clientTokens.Bytes) {
136		return true
137	}
138	if exceedsExpediteRatio(fc.pendingTokens.Messages, fc.clientTokens.Messages) {
139		return true
140	}
141	return false
142}
143
// subscriberOffsetTracker remembers the offset expected for the next message
// received from the server. It is only accessed by the subscribeStream.
type subscriberOffsetTracker struct {
	// minNextOffset is the lowest offset accepted for the next message.
	minNextOffset int64
}
149
150// RequestForRestart returns the seek request to send when a new subscribe
151// stream reconnects. Returns nil if the subscriber has just started, in which
152// case the server returns the offset of the last committed cursor.
153func (ot *subscriberOffsetTracker) RequestForRestart() *pb.SeekRequest {
154	if ot.minNextOffset <= 0 {
155		return nil
156	}
157	return &pb.SeekRequest{
158		Target: &pb.SeekRequest_Cursor{
159			Cursor: &pb.Cursor{Offset: ot.minNextOffset},
160		},
161	}
162}
163
164// OnMessages verifies that messages are delivered in order and updates the next
165// expected offset.
166func (ot *subscriberOffsetTracker) OnMessages(msgs []*pb.SequencedMessage) error {
167	nextOffset := ot.minNextOffset
168	for i, msg := range msgs {
169		offset := msg.GetCursor().GetOffset()
170		if offset < nextOffset {
171			if i == 0 {
172				return fmt.Errorf("pubsublite: server delivered messages with start offset = %d, expected >= %d", offset, ot.minNextOffset)
173			}
174			return errOutOfOrderMessages
175		}
176		nextOffset = offset + 1
177	}
178	ot.minNextOffset = nextOffset
179	return nil
180}
181