1// Copyright 2016 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package pubsub 16 17import ( 18 "fmt" 19 "math" 20 "strings" 21 "time" 22 23 gax "github.com/googleapis/gax-go/v2" 24 pb "google.golang.org/genproto/googleapis/pubsub/v1" 25 "google.golang.org/grpc/codes" 26 "google.golang.org/grpc/status" 27) 28 29// maxPayload is the maximum number of bytes to devote to the 30// encoded AcknowledgementRequest / ModifyAckDeadline proto message. 31// 32// With gRPC there is no way for the client to know the server's max message size (it is 33// configurable on the server). We know from experience that it 34// it 512K. 35const ( 36 maxPayload = 512 * 1024 37 maxSendRecvBytes = 20 * 1024 * 1024 // 20M 38) 39 40func convertMessages(rms []*pb.ReceivedMessage) ([]*Message, error) { 41 msgs := make([]*Message, 0, len(rms)) 42 for i, m := range rms { 43 msg, err := toMessage(m) 44 if err != nil { 45 return nil, fmt.Errorf("pubsub: cannot decode the retrieved message at index: %d, message: %+v", i, m) 46 } 47 msgs = append(msgs, msg) 48 } 49 return msgs, nil 50} 51 52func trunc32(i int64) int32 { 53 if i > math.MaxInt32 { 54 i = math.MaxInt32 55 } 56 return int32(i) 57} 58 59type defaultRetryer struct { 60 bo gax.Backoff 61} 62 63// Logic originally from 64// https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java 65func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { 66 s, ok := status.FromError(err) 67 if !ok { // includes io.EOF, normal stream close, which causes us to reopen 68 return r.bo.Pause(), true 69 } 70 switch s.Code() { 71 case codes.DeadlineExceeded, codes.Internal, codes.ResourceExhausted, codes.Aborted: 72 return r.bo.Pause(), true 73 case codes.Unavailable: 74 c := strings.Contains(s.Message(), "Server shutdownNow invoked") 75 if !c { 76 return r.bo.Pause(), true 77 } 78 return 0, false 79 default: 80 return 0, false 81 } 82} 83 84type streamingPullRetryer struct { 85 defaultRetryer gax.Retryer 86} 87 88// Does not retry ResourceExhausted. See: https://github.com/GoogleCloudPlatform/google-cloud-go/issues/1166#issuecomment-443744705 89func (r *streamingPullRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) { 90 s, ok := status.FromError(err) 91 if !ok { // call defaultRetryer so that its backoff can be used 92 return r.defaultRetryer.Retry(err) 93 } 94 switch s.Code() { 95 case codes.ResourceExhausted: 96 return 0, false 97 default: 98 return r.defaultRetryer.Retry(err) 99 } 100} 101