1// Copyright 2016 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package pubsub
16
17import (
18	"fmt"
19	"math"
20	"strings"
21
22	pb "google.golang.org/genproto/googleapis/pubsub/v1"
23	"google.golang.org/grpc/codes"
24	"google.golang.org/grpc/status"
25)
26
27// maxPayload is the maximum number of bytes to devote to actual ids in
28// acknowledgement or modifyAckDeadline requests. A serialized
29// AcknowledgeRequest proto has a small constant overhead, plus the size of the
30// subscription name, plus 3 bytes per ID (a tag byte and two size bytes). A
31// ModifyAckDeadlineRequest has an additional few bytes for the deadline. We
32// don't know the subscription name here, so we just assume the size exclusive
33// of ids is 100 bytes.
34//
// With gRPC there is no way for the client to know the server's max message size (it is
// configurable on the server). We know from experience that it is 512K.
const (
	// maxPayload caps the bytes spent on ack IDs in one request (512 KiB).
	maxPayload = 512 << 10

	// reqFixedOverhead is the assumed size, in bytes, of a request excluding its IDs.
	reqFixedOverhead = 100

	// overheadPerID is the per-ID encoding cost in bytes: a tag byte plus two size bytes.
	overheadPerID = 3

	// maxSendRecvBytes is 20 MiB.
	// NOTE(review): presumably the gRPC send/receive message size limit; confirm at its use sites.
	maxSendRecvBytes = 20 << 20
)
44
45func convertMessages(rms []*pb.ReceivedMessage) ([]*Message, error) {
46	msgs := make([]*Message, 0, len(rms))
47	for i, m := range rms {
48		msg, err := toMessage(m)
49		if err != nil {
50			return nil, fmt.Errorf("pubsub: cannot decode the retrieved message at index: %d, message: %+v", i, m)
51		}
52		msgs = append(msgs, msg)
53	}
54	return msgs, nil
55}
56
// trunc32 converts i to an int32, saturating at the int32 bounds instead of
// wrapping: values above math.MaxInt32 yield math.MaxInt32 and values below
// math.MinInt32 yield math.MinInt32. Values already in range are returned
// unchanged.
func trunc32(i int64) int32 {
	switch {
	case i > math.MaxInt32:
		return math.MaxInt32
	case i < math.MinInt32:
		// Without this clamp the conversion would keep only the low 32 bits.
		return math.MinInt32
	}
	return int32(i)
}
63
64// Logic from https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java
65func isRetryable(err error) bool {
66	s, ok := status.FromError(err)
67	if !ok { // includes io.EOF, normal stream close, which causes us to reopen
68		return true
69	}
70	switch s.Code() {
71	case codes.DeadlineExceeded, codes.Internal, codes.ResourceExhausted:
72		return true
73	case codes.Unavailable:
74		return !strings.Contains(s.Message(), "Server shutdownNow invoked")
75	default:
76		return false
77	}
78}
79