1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13
14package wire
15
16import (
17	"errors"
18	"fmt"
19	"time"
20)
21
22const (
23	// MaxPublishRequestCount is the maximum number of messages that can be
24	// batched in a single publish request.
25	MaxPublishRequestCount = 1000
26
27	// MaxPublishRequestBytes is the maximum allowed serialized size of a single
28	// publish request (containing a batch of messages) in bytes. Must be lower
29	// than the gRPC limit of 4 MiB.
30	MaxPublishRequestBytes int = 3.5 * 1024 * 1024
31)
32
33// FrameworkType is the user-facing API for Cloud Pub/Sub Lite.
34type FrameworkType string
35
36// FrameworkCloudPubSubShim is the API that emulates Cloud Pub/Sub.
37const FrameworkCloudPubSubShim FrameworkType = "CLOUD_PUBSUB_SHIM"
38
39// PublishSettings control the batching of published messages. These settings
40// apply per partition.
41type PublishSettings struct {
42	// Publish a non-empty batch after this delay has passed. Must be > 0.
43	DelayThreshold time.Duration
44
45	// Publish a batch when it has this many messages. Must be > 0. The maximum is
46	// MaxPublishRequestCount.
47	CountThreshold int
48
49	// Publish a batch when its size in bytes reaches this value. Must be > 0. The
50	// maximum is MaxPublishRequestBytes.
51	ByteThreshold int
52
53	// The maximum time that the client will attempt to establish a publish stream
54	// connection to the server. Must be > 0.
55	//
56	// The timeout is exceeded, the publisher will terminate with the last error
57	// that occurred while trying to reconnect. Note that if the timeout duration
58	// is long, ErrOverflow may occur first.
59	Timeout time.Duration
60
61	// The maximum number of bytes that the publisher will keep in memory before
62	// returning ErrOverflow. Must be > 0.
63	//
64	// Note that Pub/Sub Lite topics are provisioned a publishing throughput
65	// capacity, per partition, shared by all publisher clients. Setting a large
66	// buffer size can mitigate transient publish spikes. However, consistently
67	// attempting to publish messages at a much higher rate than the publishing
68	// throughput capacity can cause the buffers to overflow. For more
69	// information, see https://cloud.google.com/pubsub/lite/docs/topics.
70	BufferedByteLimit int
71
72	// The polling interval to watch for topic partition count updates. Set to 0
73	// to disable polling if the number of partitions will never update.
74	ConfigPollPeriod time.Duration
75
76	// The user-facing API type.
77	Framework FrameworkType
78}
79
80// DefaultPublishSettings holds the default values for PublishSettings.
81var DefaultPublishSettings = PublishSettings{
82	DelayThreshold: 10 * time.Millisecond,
83	CountThreshold: 100,
84	ByteThreshold:  1e6,
85	Timeout:        7 * 24 * time.Hour, // 1 week
86	// By default set to a high limit that is not likely to occur, but prevents
87	// OOM errors in clients.
88	BufferedByteLimit: 1 << 30, // 1 GiB
89	ConfigPollPeriod:  10 * time.Minute,
90}
91
92func validatePublishSettings(settings PublishSettings) error {
93	if settings.DelayThreshold <= 0 {
94		return errors.New("pubsublite: invalid publish settings. DelayThreshold duration must be > 0")
95	}
96	if settings.Timeout <= 0 {
97		return errors.New("pubsublite: invalid publish settings. Timeout duration must be > 0")
98	}
99	if settings.CountThreshold <= 0 {
100		return errors.New("pubsublite: invalid publish settings. CountThreshold must be > 0")
101	}
102	if settings.CountThreshold > MaxPublishRequestCount {
103		return fmt.Errorf("pubsublite: invalid publish settings. Maximum CountThreshold is MaxPublishRequestCount (%d)", MaxPublishRequestCount)
104	}
105	if settings.ByteThreshold <= 0 {
106		return errors.New("pubsublite: invalid publish settings. ByteThreshold must be > 0")
107	}
108	if settings.ByteThreshold > MaxPublishRequestBytes {
109		return fmt.Errorf("pubsublite: invalid publish settings. Maximum ByteThreshold is MaxPublishRequestBytes (%d)", MaxPublishRequestBytes)
110	}
111	if settings.BufferedByteLimit <= 0 {
112		return errors.New("pubsublite: invalid publish settings. BufferedByteLimit must be > 0")
113	}
114	return nil
115}
116
117// ReceiveSettings control the receiving of messages. These settings apply
118// per partition.
119type ReceiveSettings struct {
120	// MaxOutstandingMessages is the maximum number of unacknowledged messages.
121	// Must be > 0.
122	MaxOutstandingMessages int
123
124	// MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged
125	// messages. Must be > 0.
126	MaxOutstandingBytes int
127
128	// The maximum time that the client will attempt to establish a subscribe
129	// stream connection to the server. Must be > 0.
130	//
131	// The timeout is exceeded, the subscriber will terminate with the last error
132	// that occurred while trying to reconnect.
133	Timeout time.Duration
134
135	// The topic partition numbers (zero-indexed) to receive messages from.
136	// Values must be less than the number of partitions for the topic. If not
137	// specified, the client will use the partition assignment service to
138	// determine which partitions it should connect to.
139	Partitions []int
140
141	// The user-facing API type.
142	Framework FrameworkType
143}
144
145// DefaultReceiveSettings holds the default values for ReceiveSettings.
146var DefaultReceiveSettings = ReceiveSettings{
147	MaxOutstandingMessages: 1000,
148	MaxOutstandingBytes:    1e9,
149	Timeout:                7 * 24 * time.Hour, // 1 week
150}
151
152func validateReceiveSettings(settings ReceiveSettings) error {
153	if settings.MaxOutstandingMessages <= 0 {
154		return errors.New("pubsublite: invalid receive settings. MaxOutstandingMessages must be > 0")
155	}
156	if settings.MaxOutstandingBytes <= 0 {
157		return errors.New("pubsublite: invalid receive settings. MaxOutstandingBytes must be > 0")
158	}
159	if settings.Timeout <= 0 {
160		return errors.New("pubsublite: invalid receive settings. Timeout duration must be > 0")
161	}
162	if len(settings.Partitions) > 0 {
163		var void struct{}
164		partitionMap := make(map[int]struct{})
165		for _, p := range settings.Partitions {
166			if p < 0 {
167				return fmt.Errorf("pubsublite: invalid partition number %d in receive settings. Partition numbers are zero-indexed", p)
168			}
169			if _, exists := partitionMap[p]; exists {
170				return fmt.Errorf("pubsublite: invalid receive settings. Duplicate partition number %d", p)
171			}
172			partitionMap[p] = void
173		}
174	}
175	return nil
176}
177