1// Copyright 2016 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package pubsub
16
17import (
18	"fmt"
19	"math"
20	"strings"
21	"time"
22
23	gax "github.com/googleapis/gax-go/v2"
24	pb "google.golang.org/genproto/googleapis/pubsub/v1"
25	"google.golang.org/grpc/codes"
26	"google.golang.org/grpc/status"
27)
28
const (
	// maxPayload is the byte budget for a single encoded
	// AcknowledgementRequest / ModifyAckDeadline proto message.
	//
	// gRPC gives the client no way to discover the server's maximum message
	// size (it is configurable on the server); experience shows that it is
	// 512K.
	maxPayload = 512 << 10

	// maxSendRecvBytes is 20M.
	// NOTE(review): presumably the gRPC send/receive message size limit; its
	// use is outside this part of the file — confirm.
	maxSendRecvBytes = 20 << 20
)
39
40func convertMessages(rms []*pb.ReceivedMessage) ([]*Message, error) {
41	msgs := make([]*Message, 0, len(rms))
42	for i, m := range rms {
43		msg, err := toMessage(m)
44		if err != nil {
45			return nil, fmt.Errorf("pubsub: cannot decode the retrieved message at index: %d, message: %+v", i, m)
46		}
47		msgs = append(msgs, msg)
48	}
49	return msgs, nil
50}
51
// trunc32 converts i to an int32, saturating at the int32 bounds instead of
// wrapping: values above math.MaxInt32 yield math.MaxInt32 and values below
// math.MinInt32 yield math.MinInt32. Values already in range are returned
// unchanged.
func trunc32(i int64) int32 {
	switch {
	case i > math.MaxInt32:
		return math.MaxInt32
	case i < math.MinInt32:
		// Without this clamp the conversion below would silently wrap.
		return math.MinInt32
	}
	return int32(i)
}
58
// defaultRetryer decides whether a failed call should be retried and, if so,
// how long to wait first. The decision logic lives in its Retry method.
type defaultRetryer struct {
	// bo supplies the pause duration returned alongside each "retry" decision.
	bo gax.Backoff
}
62
63// Logic originally from
64// https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java
65func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
66	s, ok := status.FromError(err)
67	if !ok { // includes io.EOF, normal stream close, which causes us to reopen
68		return r.bo.Pause(), true
69	}
70	switch s.Code() {
71	case codes.DeadlineExceeded, codes.Internal, codes.ResourceExhausted, codes.Aborted:
72		return r.bo.Pause(), true
73	case codes.Unavailable:
74		c := strings.Contains(s.Message(), "Server shutdownNow invoked")
75		if !c {
76			return r.bo.Pause(), true
77		}
78		return 0, false
79	default:
80		return 0, false
81	}
82}
83
// streamingPullRetryer wraps another gax.Retryer. Its Retry method refuses to
// retry ResourceExhausted statuses and defers to the wrapped retryer for
// everything else.
type streamingPullRetryer struct {
	// defaultRetryer is the retryer consulted for all errors this type does
	// not reject itself.
	defaultRetryer gax.Retryer
}
87
88// Does not retry ResourceExhausted. See: https://github.com/GoogleCloudPlatform/google-cloud-go/issues/1166#issuecomment-443744705
89func (r *streamingPullRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
90	s, ok := status.FromError(err)
91	if !ok { // call defaultRetryer so that its backoff can be used
92		return r.defaultRetryer.Retry(err)
93	}
94	switch s.Code() {
95	case codes.ResourceExhausted:
96		return 0, false
97	default:
98		return r.defaultRetryer.Retry(err)
99	}
100}
101