1// Copyright 2016 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package pubsub 16 17import ( 18 "math" 19 "strings" 20 "time" 21 22 gax "github.com/googleapis/gax-go/v2" 23 "google.golang.org/grpc/codes" 24 "google.golang.org/grpc/status" 25) 26 27// maxPayload is the maximum number of bytes to devote to the 28// encoded AcknowledgementRequest / ModifyAckDeadline proto message. 29// 30// With gRPC there is no way for the client to know the server's max message size (it is 31// configurable on the server). We know from experience that it 32// it 512K. 33const ( 34 maxPayload = 512 * 1024 35 maxSendRecvBytes = 20 * 1024 * 1024 // 20M 36) 37 38func trunc32(i int64) int32 { 39 if i > math.MaxInt32 { 40 i = math.MaxInt32 41 } 42 return int32(i) 43} 44 45type defaultRetryer struct { 46 bo gax.Backoff 47} 48 49// Logic originally from 50// https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java 51func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { 52 s, ok := status.FromError(err) 53 if !ok { // includes io.EOF, normal stream close, which causes us to reopen 54 return r.bo.Pause(), true 55 } 56 switch s.Code() { 57 case codes.DeadlineExceeded, codes.Internal, codes.ResourceExhausted, codes.Aborted: 58 return r.bo.Pause(), true 59 case codes.Unavailable: 60 c := strings.Contains(s.Message(), "Server shutdownNow invoked") 61 if !c { 62 return r.bo.Pause(), true 63 } 64 return 0, false 65 default: 66 return 0, false 67 } 68} 69 70type streamingPullRetryer struct { 71 defaultRetryer gax.Retryer 72} 73 74// Does not retry ResourceExhausted. See: https://github.com/GoogleCloudPlatform/google-cloud-go/issues/1166#issuecomment-443744705 75func (r *streamingPullRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { 76 s, ok := status.FromError(err) 77 if !ok { // call defaultRetryer so that its backoff can be used 78 return r.defaultRetryer.Retry(err) 79 } 80 switch s.Code() { 81 case codes.ResourceExhausted: 82 return 0, false 83 default: 84 return r.defaultRetryer.Retry(err) 85 } 86} 87