1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13
14package pscompat
15
16import (
17	"time"
18
19	"cloud.google.com/go/pubsub"
20	"cloud.google.com/go/pubsublite/internal/wire"
21
22	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
23)
24
// Limits that apply to a single publish request. Both values are re-exported
// from the wire package so that callers of pscompat can reference them.
const (
	// MaxPublishRequestCount is the maximum number of messages that can be
	// batched in a single publish request. It is the upper bound for
	// PublishSettings.CountThreshold.
	MaxPublishRequestCount = wire.MaxPublishRequestCount

	// MaxPublishRequestBytes is the maximum allowed serialized size of a single
	// publish request (containing a batch of messages) in bytes. It is the upper
	// bound for PublishSettings.ByteThreshold.
	MaxPublishRequestBytes = wire.MaxPublishRequestBytes
)
34
// KeyExtractorFunc is a function that extracts an ordering key from a Message.
// The key is returned as a byte slice. A custom implementation can be supplied
// through PublishSettings.KeyExtractor.
type KeyExtractorFunc func(*pubsub.Message) []byte
37
// PublishMessageTransformerFunc transforms a pubsub.Message to a Pub/Sub Lite
// PubSubMessage API proto. The first argument is the source message and the
// second is the proto to populate.
//
// If this returns an error, the pubsub.PublishResult will be errored and the
// PublisherClient will consider this a fatal error and terminate.
type PublishMessageTransformerFunc func(*pubsub.Message, *pb.PubSubMessage) error
43
// PublishSettings configure the PublisherClient. Batching settings
// (DelayThreshold, CountThreshold, ByteThreshold, BufferedByteLimit) apply per
// partition.
//
// A zero PublishSettings will result in values equivalent to
// DefaultPublishSettings: each numeric or duration field left at 0 is replaced
// by its default when the settings are converted for the wire package, while
// non-zero values are passed through unchanged.
type PublishSettings struct {
	// Publish a non-empty batch after this delay has passed. If DelayThreshold is
	// 0, it will be treated as DefaultPublishSettings.DelayThreshold. Otherwise
	// must be > 0.
	DelayThreshold time.Duration

	// Publish a batch when it has this many messages. The maximum is
	// MaxPublishRequestCount. If CountThreshold is 0, it will be treated as
	// DefaultPublishSettings.CountThreshold. Otherwise must be > 0.
	CountThreshold int

	// Publish a batch when its size in bytes reaches this value. The maximum is
	// MaxPublishRequestBytes. If ByteThreshold is 0, it will be treated as
	// DefaultPublishSettings.ByteThreshold. Otherwise must be > 0.
	ByteThreshold int

	// The maximum time that the client will attempt to open a publish stream
	// to the server. If Timeout is 0, it will be treated as
	// DefaultPublishSettings.Timeout. Otherwise must be > 0.
	//
	// If your application has a low tolerance to backend unavailability, set
	// Timeout to a lower duration so that the condition is detected and can be
	// handled sooner. When the timeout is exceeded, the PublisherClient will
	// terminate with ErrBackendUnavailable and details of the last error that
	// occurred while trying to reconnect to backends. Note that if the timeout
	// duration is long, ErrOverflow may occur first.
	//
	// It is not recommended to set Timeout below 2 minutes.
	Timeout time.Duration

	// The maximum number of bytes that the publisher will keep in memory before
	// returning ErrOverflow. If BufferedByteLimit is 0, it will be treated as
	// DefaultPublishSettings.BufferedByteLimit. Otherwise must be > 0.
	//
	// Note that this setting applies per partition. If BufferedByteLimit is being
	// used to bound memory usage, keep in mind the number of partitions in the
	// topic.
	//
	// Note that Pub/Sub Lite topics are provisioned a publishing throughput
	// capacity, per partition, shared by all publisher clients. Setting a large
	// buffer size can mitigate transient publish spikes. However, consistently
	// attempting to publish messages at a much higher rate than the publishing
	// throughput capacity can cause the buffers to overflow. For more
	// information, see https://cloud.google.com/pubsub/lite/docs/topics.
	BufferedByteLimit int

	// Optional custom function that extracts an ordering key from a Message. The
	// default implementation extracts the key from Message.OrderingKey.
	KeyExtractor KeyExtractorFunc

	// Optional custom function that transforms a pubsub.Message to a
	// PubSubMessage API proto.
	MessageTransformer PublishMessageTransformerFunc

	// The polling interval to watch for topic partition count updates.
	// Currently internal only and overridden in tests. If it is 0,
	// wire.DefaultPublishSettings.ConfigPollPeriod is used instead.
	configPollPeriod time.Duration
}
108
// DefaultPublishSettings holds the default values for PublishSettings. These
// are the values substituted for any DelayThreshold, CountThreshold,
// ByteThreshold, Timeout or BufferedByteLimit field that is left at 0.
// KeyExtractor, MessageTransformer and configPollPeriod are left at their zero
// values here.
//
// NOTE(review): BufferedByteLimit is declared as int and 1e10 exceeds the
// range of a 32-bit int, so this literal presumably does not compile on 32-bit
// targets. Confirm whether such platforms need to be supported.
var DefaultPublishSettings = PublishSettings{
	DelayThreshold:    10 * time.Millisecond,
	CountThreshold:    100,
	ByteThreshold:     1e6,
	Timeout:           7 * 24 * time.Hour,
	BufferedByteLimit: 1e10,
}
117
118func (s *PublishSettings) toWireSettings() wire.PublishSettings {
119	wireSettings := wire.PublishSettings{
120		DelayThreshold:    DefaultPublishSettings.DelayThreshold,
121		CountThreshold:    DefaultPublishSettings.CountThreshold,
122		ByteThreshold:     DefaultPublishSettings.ByteThreshold,
123		Timeout:           DefaultPublishSettings.Timeout,
124		BufferedByteLimit: DefaultPublishSettings.BufferedByteLimit,
125		ConfigPollPeriod:  wire.DefaultPublishSettings.ConfigPollPeriod,
126		Framework:         wire.FrameworkCloudPubSubShim,
127	}
128	// Negative values preserved, but will fail validation in wire package.
129	if s.DelayThreshold != 0 {
130		wireSettings.DelayThreshold = s.DelayThreshold
131	}
132	if s.CountThreshold != 0 {
133		wireSettings.CountThreshold = s.CountThreshold
134	}
135	if s.ByteThreshold != 0 {
136		wireSettings.ByteThreshold = s.ByteThreshold
137	}
138	if s.Timeout != 0 {
139		wireSettings.Timeout = s.Timeout
140	}
141	if s.BufferedByteLimit != 0 {
142		wireSettings.BufferedByteLimit = s.BufferedByteLimit
143	}
144	if s.configPollPeriod != 0 {
145		wireSettings.ConfigPollPeriod = s.configPollPeriod
146	}
147	return wireSettings
148}
149
// NackHandler is invoked when pubsub.Message.Nack() is called. Pub/Sub Lite
// does not have a concept of 'nack'. If the nack handler implementation returns
// nil, the message is acknowledged. If an error is returned, the
// SubscriberClient will consider this a fatal error and terminate.
//
// In Pub/Sub Lite, only a single subscriber for a given subscription is
// connected to any partition at a time, and there is no other client that may
// be able to handle messages.
//
// A handler is supplied through ReceiveSettings.NackHandler.
type NackHandler func(*pubsub.Message) error
159
// ReceiveMessageTransformerFunc transforms a Pub/Sub Lite SequencedMessage API
// proto to a pubsub.Message. The first argument is the source proto and the
// second is the message to populate. The implementation must not set
// pubsub.Message.ID.
//
// If this returns an error, the SubscriberClient will consider this a fatal
// error and terminate.
type ReceiveMessageTransformerFunc func(*pb.SequencedMessage, *pubsub.Message) error
166
// ReceiveSettings configure the SubscriberClient. Flow control settings
// (MaxOutstandingMessages, MaxOutstandingBytes) apply per partition.
//
// A zero ReceiveSettings will result in values equivalent to
// DefaultReceiveSettings: MaxOutstandingMessages, MaxOutstandingBytes and
// Timeout are each replaced by their default when left at 0, while non-zero
// values are passed through unchanged.
type ReceiveSettings struct {
	// MaxOutstandingMessages is the maximum number of unacknowledged messages.
	// If MaxOutstandingMessages is 0, it will be treated as
	// DefaultReceiveSettings.MaxOutstandingMessages. Otherwise must be > 0.
	MaxOutstandingMessages int

	// MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged
	// messages. If MaxOutstandingBytes is 0, it will be treated as
	// DefaultReceiveSettings.MaxOutstandingBytes. Otherwise must be > 0.
	//
	// Note that this setting applies per partition. If MaxOutstandingBytes is
	// being used to bound memory usage, keep in mind the number of partitions in
	// the associated topic.
	MaxOutstandingBytes int

	// The maximum time that the client will attempt to open a subscribe stream
	// to the server. If Timeout is 0, it will be treated as
	// DefaultReceiveSettings.Timeout. Otherwise must be > 0.
	//
	// If your application has a low tolerance to backend unavailability, set
	// Timeout to a lower duration so that the condition is detected and can be
	// handled sooner. When the timeout is exceeded, the SubscriberClient will
	// terminate with ErrBackendUnavailable and details of the last error that
	// occurred while trying to reconnect to backends.
	//
	// It is not recommended to set Timeout below 2 minutes.
	Timeout time.Duration

	// The topic partition numbers (zero-indexed) to receive messages from.
	// Values must be less than the number of partitions for the topic. If not
	// specified, the SubscriberClient will use the partition assignment service
	// to determine which partitions it should connect to.
	Partitions []int

	// Optional custom function to handle pubsub.Message.Nack() calls. If not set,
	// the default behavior is to terminate the SubscriberClient.
	NackHandler NackHandler

	// Optional custom function that transforms a SequencedMessage API proto to a
	// pubsub.Message.
	MessageTransformer ReceiveMessageTransformerFunc
}
214
// DefaultReceiveSettings holds the default values for ReceiveSettings. These
// are the values substituted for any MaxOutstandingMessages,
// MaxOutstandingBytes or Timeout field that is left at 0. Partitions,
// NackHandler and MessageTransformer are left at their zero values here.
var DefaultReceiveSettings = ReceiveSettings{
	MaxOutstandingMessages: 1000,
	MaxOutstandingBytes:    1e9,
	Timeout:                7 * 24 * time.Hour,
}
221
222func (s *ReceiveSettings) toWireSettings() wire.ReceiveSettings {
223	wireSettings := wire.ReceiveSettings{
224		MaxOutstandingMessages: DefaultReceiveSettings.MaxOutstandingMessages,
225		MaxOutstandingBytes:    DefaultReceiveSettings.MaxOutstandingBytes,
226		Timeout:                DefaultReceiveSettings.Timeout,
227		Partitions:             s.Partitions,
228		Framework:              wire.FrameworkCloudPubSubShim,
229	}
230	// Negative values preserved, but will fail validation in wire package.
231	if s.MaxOutstandingMessages != 0 {
232		wireSettings.MaxOutstandingMessages = s.MaxOutstandingMessages
233	}
234	if s.MaxOutstandingBytes != 0 {
235		wireSettings.MaxOutstandingBytes = s.MaxOutstandingBytes
236	}
237	if s.Timeout != 0 {
238		wireSettings.Timeout = s.Timeout
239	}
240	return wireSettings
241}
242