1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13 14package pscompat 15 16import ( 17 "time" 18 19 "cloud.google.com/go/pubsub" 20 "cloud.google.com/go/pubsublite/internal/wire" 21 22 pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" 23) 24 25const ( 26 // MaxPublishRequestCount is the maximum number of messages that can be 27 // batched in a single publish request. 28 MaxPublishRequestCount = wire.MaxPublishRequestCount 29 30 // MaxPublishRequestBytes is the maximum allowed serialized size of a single 31 // publish request (containing a batch of messages) in bytes. 32 MaxPublishRequestBytes = wire.MaxPublishRequestBytes 33) 34 35// KeyExtractorFunc is a function that extracts an ordering key from a Message. 36type KeyExtractorFunc func(*pubsub.Message) []byte 37 38// PublishMessageTransformerFunc transforms a pubsub.Message to a Pub/Sub Lite 39// PubSubMessage API proto. If this returns an error, the pubsub.PublishResult 40// will be errored and the PublisherClient will consider this a fatal error and 41// terminate. 42type PublishMessageTransformerFunc func(*pubsub.Message, *pb.PubSubMessage) error 43 44// PublishSettings configure the PublisherClient. Batching settings 45// (DelayThreshold, CountThreshold, ByteThreshold, BufferedByteLimit) apply per 46// partition. 47// 48// A zero PublishSettings will result in values equivalent to 49// DefaultPublishSettings. 50type PublishSettings struct { 51 // Publish a non-empty batch after this delay has passed. If DelayThreshold is 52 // 0, it will be treated as DefaultPublishSettings.DelayThreshold. Otherwise 53 // must be > 0. 54 DelayThreshold time.Duration 55 56 // Publish a batch when it has this many messages. The maximum is 57 // MaxPublishRequestCount. If CountThreshold is 0, it will be treated as 58 // DefaultPublishSettings.CountThreshold. Otherwise must be > 0. 59 CountThreshold int 60 61 // Publish a batch when its size in bytes reaches this value. The maximum is 62 // MaxPublishRequestBytes. If ByteThreshold is 0, it will be treated as 63 // DefaultPublishSettings.ByteThreshold. Otherwise must be > 0. 64 ByteThreshold int 65 66 // The maximum time that the client will attempt to open a publish stream 67 // to the server. If Timeout is 0, it will be treated as 68 // DefaultPublishSettings.Timeout. Otherwise must be > 0. 69 // 70 // If your application has a low tolerance to backend unavailability, set 71 // Timeout to a lower duration to detect and handle. When the timeout is 72 // exceeded, the PublisherClient will terminate with ErrBackendUnavailable and 73 // details of the last error that occurred while trying to reconnect to 74 // backends. Note that if the timeout duration is long, ErrOverflow may occur 75 // first. 76 // 77 // It is not recommended to set Timeout below 2 minutes. 78 Timeout time.Duration 79 80 // The maximum number of bytes that the publisher will keep in memory before 81 // returning ErrOverflow. If BufferedByteLimit is 0, it will be treated as 82 // DefaultPublishSettings.BufferedByteLimit. Otherwise must be > 0. 83 // 84 // Note that this setting applies per partition. If BufferedByteLimit is being 85 // used to bound memory usage, keep in mind the number of partitions in the 86 // topic. 87 // 88 // Note that Pub/Sub Lite topics are provisioned a publishing throughput 89 // capacity, per partition, shared by all publisher clients. Setting a large 90 // buffer size can mitigate transient publish spikes. However, consistently 91 // attempting to publish messages at a much higher rate than the publishing 92 // throughput capacity can cause the buffers to overflow. For more 93 // information, see https://cloud.google.com/pubsub/lite/docs/topics. 94 BufferedByteLimit int 95 96 // Optional custom function that extracts an ordering key from a Message. The 97 // default implementation extracts the key from Message.OrderingKey. 98 KeyExtractor KeyExtractorFunc 99 100 // Optional custom function that transforms a pubsub.Message to a 101 // PubSubMessage API proto. 102 MessageTransformer PublishMessageTransformerFunc 103 104 // The polling interval to watch for topic partition count updates. 105 // Currently internal only and overridden in tests. 106 configPollPeriod time.Duration 107} 108 109// DefaultPublishSettings holds the default values for PublishSettings. 110var DefaultPublishSettings = PublishSettings{ 111 DelayThreshold: 10 * time.Millisecond, 112 CountThreshold: 100, 113 ByteThreshold: 1e6, 114 Timeout: 7 * 24 * time.Hour, 115 BufferedByteLimit: 1e10, 116} 117 118func (s *PublishSettings) toWireSettings() wire.PublishSettings { 119 wireSettings := wire.PublishSettings{ 120 DelayThreshold: DefaultPublishSettings.DelayThreshold, 121 CountThreshold: DefaultPublishSettings.CountThreshold, 122 ByteThreshold: DefaultPublishSettings.ByteThreshold, 123 Timeout: DefaultPublishSettings.Timeout, 124 BufferedByteLimit: DefaultPublishSettings.BufferedByteLimit, 125 ConfigPollPeriod: wire.DefaultPublishSettings.ConfigPollPeriod, 126 Framework: wire.FrameworkCloudPubSubShim, 127 } 128 // Negative values preserved, but will fail validation in wire package. 129 if s.DelayThreshold != 0 { 130 wireSettings.DelayThreshold = s.DelayThreshold 131 } 132 if s.CountThreshold != 0 { 133 wireSettings.CountThreshold = s.CountThreshold 134 } 135 if s.ByteThreshold != 0 { 136 wireSettings.ByteThreshold = s.ByteThreshold 137 } 138 if s.Timeout != 0 { 139 wireSettings.Timeout = s.Timeout 140 } 141 if s.BufferedByteLimit != 0 { 142 wireSettings.BufferedByteLimit = s.BufferedByteLimit 143 } 144 if s.configPollPeriod != 0 { 145 wireSettings.ConfigPollPeriod = s.configPollPeriod 146 } 147 return wireSettings 148} 149 150// NackHandler is invoked when pubsub.Message.Nack() is called. Pub/Sub Lite 151// does not have a concept of 'nack'. If the nack handler implementation returns 152// nil, the message is acknowledged. If an error is returned, the 153// SubscriberClient will consider this a fatal error and terminate. 154// 155// In Pub/Sub Lite, only a single subscriber for a given subscription is 156// connected to any partition at a time, and there is no other client that may 157// be able to handle messages. 158type NackHandler func(*pubsub.Message) error 159 160// ReceiveMessageTransformerFunc transforms a Pub/Sub Lite SequencedMessage API 161// proto to a pubsub.Message. The implementation must not set pubsub.Message.ID. 162// 163// If this returns an error, the SubscriberClient will consider this a fatal 164// error and terminate. 165type ReceiveMessageTransformerFunc func(*pb.SequencedMessage, *pubsub.Message) error 166 167// ReceiveSettings configure the SubscriberClient. Flow control settings 168// (MaxOutstandingMessages, MaxOutstandingBytes) apply per partition. 169// 170// A zero ReceiveSettings will result in values equivalent to 171// DefaultReceiveSettings. 172type ReceiveSettings struct { 173 // MaxOutstandingMessages is the maximum number of unacknowledged messages. 174 // If MaxOutstandingMessages is 0, it will be treated as 175 // DefaultReceiveSettings.MaxOutstandingMessages. Otherwise must be > 0. 176 MaxOutstandingMessages int 177 178 // MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged 179 // messages. If MaxOutstandingBytes is 0, it will be treated as 180 // DefaultReceiveSettings.MaxOutstandingBytes. Otherwise must be > 0. 181 // 182 // Note that this setting applies per partition. If MaxOutstandingBytes is 183 // being used to bound memory usage, keep in mind the number of partitions in 184 // the associated topic. 185 MaxOutstandingBytes int 186 187 // The maximum time that the client will attempt to open a subscribe stream 188 // to the server. If Timeout is 0, it will be treated as 189 // DefaultReceiveSettings.Timeout. Otherwise must be > 0. 190 // 191 // If your application has a low tolerance to backend unavailability, set 192 // Timeout to a lower duration to detect and handle. When the timeout is 193 // exceeded, the SubscriberClient will terminate with ErrBackendUnavailable 194 // and details of the last error that occurred while trying to reconnect to 195 // backends. 196 // 197 // It is not recommended to set Timeout below 2 minutes. 198 Timeout time.Duration 199 200 // The topic partition numbers (zero-indexed) to receive messages from. 201 // Values must be less than the number of partitions for the topic. If not 202 // specified, the SubscriberClient will use the partition assignment service 203 // to determine which partitions it should connect to. 204 Partitions []int 205 206 // Optional custom function to handle pubsub.Message.Nack() calls. If not set, 207 // the default behavior is to terminate the SubscriberClient. 208 NackHandler NackHandler 209 210 // Optional custom function that transforms a SequencedMessage API proto to a 211 // pubsub.Message. 212 MessageTransformer ReceiveMessageTransformerFunc 213} 214 215// DefaultReceiveSettings holds the default values for ReceiveSettings. 216var DefaultReceiveSettings = ReceiveSettings{ 217 MaxOutstandingMessages: 1000, 218 MaxOutstandingBytes: 1e9, 219 Timeout: 7 * 24 * time.Hour, 220} 221 222func (s *ReceiveSettings) toWireSettings() wire.ReceiveSettings { 223 wireSettings := wire.ReceiveSettings{ 224 MaxOutstandingMessages: DefaultReceiveSettings.MaxOutstandingMessages, 225 MaxOutstandingBytes: DefaultReceiveSettings.MaxOutstandingBytes, 226 Timeout: DefaultReceiveSettings.Timeout, 227 Partitions: s.Partitions, 228 Framework: wire.FrameworkCloudPubSubShim, 229 } 230 // Negative values preserved, but will fail validation in wire package. 231 if s.MaxOutstandingMessages != 0 { 232 wireSettings.MaxOutstandingMessages = s.MaxOutstandingMessages 233 } 234 if s.MaxOutstandingBytes != 0 { 235 wireSettings.MaxOutstandingBytes = s.MaxOutstandingBytes 236 } 237 if s.Timeout != 0 { 238 wireSettings.Timeout = s.Timeout 239 } 240 return wireSettings 241} 242