1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13 14package wire 15 16import ( 17 "errors" 18 "fmt" 19 "time" 20) 21 22const ( 23 // MaxPublishRequestCount is the maximum number of messages that can be 24 // batched in a single publish request. 25 MaxPublishRequestCount = 1000 26 27 // MaxPublishRequestBytes is the maximum allowed serialized size of a single 28 // publish request (containing a batch of messages) in bytes. Must be lower 29 // than the gRPC limit of 4 MiB. 30 MaxPublishRequestBytes int = 3.5 * 1024 * 1024 31) 32 33// FrameworkType is the user-facing API for Cloud Pub/Sub Lite. 34type FrameworkType string 35 36// FrameworkCloudPubSubShim is the API that emulates Cloud Pub/Sub. 37const FrameworkCloudPubSubShim FrameworkType = "CLOUD_PUBSUB_SHIM" 38 39// PublishSettings control the batching of published messages. These settings 40// apply per partition. 41type PublishSettings struct { 42 // Publish a non-empty batch after this delay has passed. Must be > 0. 43 DelayThreshold time.Duration 44 45 // Publish a batch when it has this many messages. Must be > 0. The maximum is 46 // MaxPublishRequestCount. 47 CountThreshold int 48 49 // Publish a batch when its size in bytes reaches this value. Must be > 0. The 50 // maximum is MaxPublishRequestBytes. 51 ByteThreshold int 52 53 // The maximum time that the client will attempt to establish a publish stream 54 // connection to the server. Must be > 0. 55 // 56 // The timeout is exceeded, the publisher will terminate with the last error 57 // that occurred while trying to reconnect. Note that if the timeout duration 58 // is long, ErrOverflow may occur first. 59 Timeout time.Duration 60 61 // The maximum number of bytes that the publisher will keep in memory before 62 // returning ErrOverflow. Must be > 0. 63 // 64 // Note that Pub/Sub Lite topics are provisioned a publishing throughput 65 // capacity, per partition, shared by all publisher clients. Setting a large 66 // buffer size can mitigate transient publish spikes. However, consistently 67 // attempting to publish messages at a much higher rate than the publishing 68 // throughput capacity can cause the buffers to overflow. For more 69 // information, see https://cloud.google.com/pubsub/lite/docs/topics. 70 BufferedByteLimit int 71 72 // The polling interval to watch for topic partition count updates. Set to 0 73 // to disable polling if the number of partitions will never update. 74 ConfigPollPeriod time.Duration 75 76 // The user-facing API type. 77 Framework FrameworkType 78} 79 80// DefaultPublishSettings holds the default values for PublishSettings. 81var DefaultPublishSettings = PublishSettings{ 82 DelayThreshold: 10 * time.Millisecond, 83 CountThreshold: 100, 84 ByteThreshold: 1e6, 85 Timeout: 7 * 24 * time.Hour, // 1 week 86 // By default set to a high limit that is not likely to occur, but prevents 87 // OOM errors in clients. 88 BufferedByteLimit: 1 << 30, // 1 GiB 89 ConfigPollPeriod: 10 * time.Minute, 90} 91 92func validatePublishSettings(settings PublishSettings) error { 93 if settings.DelayThreshold <= 0 { 94 return errors.New("pubsublite: invalid publish settings. DelayThreshold duration must be > 0") 95 } 96 if settings.Timeout <= 0 { 97 return errors.New("pubsublite: invalid publish settings. Timeout duration must be > 0") 98 } 99 if settings.CountThreshold <= 0 { 100 return errors.New("pubsublite: invalid publish settings. CountThreshold must be > 0") 101 } 102 if settings.CountThreshold > MaxPublishRequestCount { 103 return fmt.Errorf("pubsublite: invalid publish settings. Maximum CountThreshold is MaxPublishRequestCount (%d)", MaxPublishRequestCount) 104 } 105 if settings.ByteThreshold <= 0 { 106 return errors.New("pubsublite: invalid publish settings. ByteThreshold must be > 0") 107 } 108 if settings.ByteThreshold > MaxPublishRequestBytes { 109 return fmt.Errorf("pubsublite: invalid publish settings. Maximum ByteThreshold is MaxPublishRequestBytes (%d)", MaxPublishRequestBytes) 110 } 111 if settings.BufferedByteLimit <= 0 { 112 return errors.New("pubsublite: invalid publish settings. BufferedByteLimit must be > 0") 113 } 114 return nil 115} 116 117// ReceiveSettings control the receiving of messages. These settings apply 118// per partition. 119type ReceiveSettings struct { 120 // MaxOutstandingMessages is the maximum number of unacknowledged messages. 121 // Must be > 0. 122 MaxOutstandingMessages int 123 124 // MaxOutstandingBytes is the maximum size (in quota bytes) of unacknowledged 125 // messages. Must be > 0. 126 MaxOutstandingBytes int 127 128 // The maximum time that the client will attempt to establish a subscribe 129 // stream connection to the server. Must be > 0. 130 // 131 // The timeout is exceeded, the subscriber will terminate with the last error 132 // that occurred while trying to reconnect. 133 Timeout time.Duration 134 135 // The topic partition numbers (zero-indexed) to receive messages from. 136 // Values must be less than the number of partitions for the topic. If not 137 // specified, the client will use the partition assignment service to 138 // determine which partitions it should connect to. 139 Partitions []int 140 141 // The user-facing API type. 142 Framework FrameworkType 143} 144 145// DefaultReceiveSettings holds the default values for ReceiveSettings. 146var DefaultReceiveSettings = ReceiveSettings{ 147 MaxOutstandingMessages: 1000, 148 MaxOutstandingBytes: 1e9, 149 Timeout: 7 * 24 * time.Hour, // 1 week 150} 151 152func validateReceiveSettings(settings ReceiveSettings) error { 153 if settings.MaxOutstandingMessages <= 0 { 154 return errors.New("pubsublite: invalid receive settings. MaxOutstandingMessages must be > 0") 155 } 156 if settings.MaxOutstandingBytes <= 0 { 157 return errors.New("pubsublite: invalid receive settings. MaxOutstandingBytes must be > 0") 158 } 159 if settings.Timeout <= 0 { 160 return errors.New("pubsublite: invalid receive settings. Timeout duration must be > 0") 161 } 162 if len(settings.Partitions) > 0 { 163 var void struct{} 164 partitionMap := make(map[int]struct{}) 165 for _, p := range settings.Partitions { 166 if p < 0 { 167 return fmt.Errorf("pubsublite: invalid partition number %d in receive settings. Partition numbers are zero-indexed", p) 168 } 169 if _, exists := partitionMap[p]; exists { 170 return fmt.Errorf("pubsublite: invalid receive settings. Duplicate partition number %d", p) 171 } 172 partitionMap[p] = void 173 } 174 } 175 return nil 176} 177