1// Copyright 2016 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package pubsub
16
17import (
18	"math"
19	"strings"
20	"time"
21
22	gax "github.com/googleapis/gax-go/v2"
23	"google.golang.org/grpc/codes"
24	"google.golang.org/grpc/status"
25)
26
27// maxPayload is the maximum number of bytes to devote to the
28// encoded AcknowledgementRequest / ModifyAckDeadline proto message.
29//
// With gRPC there is no way for the client to know the server's max message size (it is
// configurable on the server). We know from experience that it
// is 512K.
const (
	// maxPayload caps the encoded request size, in bytes (512 KiB).
	maxPayload = 512 << 10
	// maxSendRecvBytes is a 20 MiB byte limit.
	maxSendRecvBytes = 20 << 20
)
37
// trunc32 converts i to an int32, saturating at the int32 bounds instead of
// wrapping: values above math.MaxInt32 become math.MaxInt32 and values below
// math.MinInt32 become math.MinInt32. Values already in range are returned
// unchanged.
func trunc32(i int64) int32 {
	if i > math.MaxInt32 {
		return math.MaxInt32
	}
	if i < math.MinInt32 {
		// A plain int32(i) conversion keeps only the low 32 bits, which would
		// turn a large negative value into an unrelated (possibly positive)
		// number.
		return math.MinInt32
	}
	return int32(i)
}
44
45type defaultRetryer struct {
46	bo gax.Backoff
47}
48
49// Logic originally from
50// https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java
51func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
52	s, ok := status.FromError(err)
53	if !ok { // includes io.EOF, normal stream close, which causes us to reopen
54		return r.bo.Pause(), true
55	}
56	switch s.Code() {
57	case codes.DeadlineExceeded, codes.Internal, codes.ResourceExhausted, codes.Aborted:
58		return r.bo.Pause(), true
59	case codes.Unavailable:
60		c := strings.Contains(s.Message(), "Server shutdownNow invoked")
61		if !c {
62			return r.bo.Pause(), true
63		}
64		return 0, false
65	default:
66		return 0, false
67	}
68}
69
70type streamingPullRetryer struct {
71	defaultRetryer gax.Retryer
72}
73
74// Does not retry ResourceExhausted. See: https://github.com/GoogleCloudPlatform/google-cloud-go/issues/1166#issuecomment-443744705
75func (r *streamingPullRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
76	s, ok := status.FromError(err)
77	if !ok { // call defaultRetryer so that its backoff can be used
78		return r.defaultRetryer.Retry(err)
79	}
80	switch s.Code() {
81	case codes.ResourceExhausted:
82		return 0, false
83	default:
84		return r.defaultRetryer.Retry(err)
85	}
86}
87