1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13
14package wire
15
16import (
17	"context"
18	"encoding/base64"
19	"fmt"
20	"net/url"
21	"time"
22
23	"google.golang.org/api/option"
24	"google.golang.org/api/option/internaloption"
25	"google.golang.org/grpc/codes"
26	"google.golang.org/grpc/metadata"
27	"google.golang.org/grpc/status"
28	"google.golang.org/protobuf/proto"
29	"google.golang.org/protobuf/types/known/structpb"
30
31	vkit "cloud.google.com/go/pubsublite/apiv1"
32	gax "github.com/googleapis/gax-go/v2"
33)
34
35// streamRetryer implements the retry policy for establishing gRPC stream
36// connections.
37type streamRetryer struct {
38	bo       gax.Backoff
39	deadline time.Time
40}
41
42func newStreamRetryer(timeout time.Duration) *streamRetryer {
43	return &streamRetryer{
44		bo: gax.Backoff{
45			Initial:    10 * time.Millisecond,
46			Max:        10 * time.Second,
47			Multiplier: 2,
48		},
49		deadline: time.Now().Add(timeout),
50	}
51}
52
53func (r *streamRetryer) RetrySend(err error) (time.Duration, bool) {
54	if time.Now().After(r.deadline) {
55		return 0, false
56	}
57	if isRetryableSendError(err) {
58		return r.bo.Pause(), true
59	}
60	return 0, false
61}
62
63func (r *streamRetryer) RetryRecv(err error) (time.Duration, bool) {
64	if time.Now().After(r.deadline) {
65		return 0, false
66	}
67	if isRetryableRecvError(err) {
68		return r.bo.Pause(), true
69	}
70	return 0, false
71}
72
73func isRetryableSendCode(code codes.Code) bool {
74	switch code {
75	// Client-side errors that occur during grpc.ClientStream.SendMsg() have a
76	// smaller set of retryable codes.
77	case codes.DeadlineExceeded, codes.Unavailable:
78		return true
79	default:
80		return false
81	}
82}
83
84func isRetryableRecvCode(code codes.Code) bool {
85	switch code {
86	// Consistent with https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/ErrorCodes.java
87	case codes.Aborted, codes.DeadlineExceeded, codes.Internal, codes.ResourceExhausted, codes.Unavailable, codes.Unknown:
88		return true
89	default:
90		return false
91	}
92}
93
94func isRetryableSendError(err error) bool {
95	return isRetryableStreamError(err, isRetryableSendCode)
96}
97
98func isRetryableRecvError(err error) bool {
99	return isRetryableStreamError(err, isRetryableRecvCode)
100}
101
102func isRetryableStreamError(err error, isEligible func(codes.Code) bool) bool {
103	s, ok := status.FromError(err)
104	if !ok {
105		// Includes io.EOF, normal stream close.
106		// Consistent with https://github.com/googleapis/google-cloud-go/blob/master/pubsub/service.go
107		return true
108	}
109	return isEligible(s.Code())
110}
111
112// retryableReadOnlyCallOption returns a call option that retries with backoff
113// for ResourceExhausted in addition to other default retryable codes for
114// Pub/Sub. Suitable for read-only operations which are subject to only QPS
115// quota limits.
116func retryableReadOnlyCallOption() gax.CallOption {
117	return gax.WithRetry(func() gax.Retryer {
118		return gax.OnCodes([]codes.Code{
119			codes.Aborted,
120			codes.DeadlineExceeded,
121			codes.Internal,
122			codes.ResourceExhausted,
123			codes.Unavailable,
124			codes.Unknown,
125		}, gax.Backoff{
126			Initial:    100 * time.Millisecond,
127			Max:        60 * time.Second,
128			Multiplier: 1.3,
129		})
130	})
131}
132
133const pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443"
134
135func defaultClientOptions(region string) []option.ClientOption {
136	return []option.ClientOption{
137		internaloption.WithDefaultEndpoint(region + pubsubLiteDefaultEndpoint),
138	}
139}
140
// apiClient is the minimal interface needed to close a client.
type apiClient interface {
	Close() error
}

// apiClients is a list of clients that can be closed together.
type apiClients []apiClient

// Close closes every client in ac, continuing past failures, and returns the
// first non-nil error encountered (nil if every Close succeeded).
func (ac apiClients) Close() error {
	var firstErr error
	for _, client := range ac {
		err := client.Close()
		if firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
155
156// NewAdminClient creates a new gapic AdminClient for a region.
157func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) {
158	options := append(defaultClientOptions(region), opts...)
159	return vkit.NewAdminClient(ctx, options...)
160}
161
162func newPublisherClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PublisherClient, error) {
163	options := append(defaultClientOptions(region), opts...)
164	return vkit.NewPublisherClient(ctx, options...)
165}
166
167func newSubscriberClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.SubscriberClient, error) {
168	options := append(defaultClientOptions(region), opts...)
169	return vkit.NewSubscriberClient(ctx, options...)
170}
171
172func newCursorClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.CursorClient, error) {
173	options := append(defaultClientOptions(region), opts...)
174	return vkit.NewCursorClient(ctx, options...)
175}
176
177func newPartitionAssignmentClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PartitionAssignmentClient, error) {
178	options := append(defaultClientOptions(region), opts...)
179	return vkit.NewPartitionAssignmentClient(ctx, options...)
180}
181
// Names of the gRPC metadata headers populated by pubsubMetadata.
const (
	// routingMetadataHeader holds the partition and topic/subscription routing
	// parameters.
	routingMetadataHeader = "x-goog-request-params"
	// clientInfoMetadataHeader holds the base64-encoded client info struct.
	clientInfoMetadataHeader = "x-goog-pubsub-context"
)

// Field keys (and the fixed language value) of the client info struct.
const (
	languageKey     = "language"
	languageValue   = "GOLANG"
	frameworkKey    = "framework"
	majorVersionKey = "major_version"
	minorVersionKey = "minor_version"
)
192
193func stringValue(str string) *structpb.Value {
194	return &structpb.Value{
195		Kind: &structpb.Value_StringValue{StringValue: str},
196	}
197}
198
// pubsubMetadata stores key/value pairs that should be added to gRPC metadata.
type pubsubMetadata map[string]string

// newPubsubMetadata returns an empty, non-nil pubsubMetadata that is ready to
// have entries added.
func newPubsubMetadata() pubsubMetadata {
	return pubsubMetadata{}
}
205
206func (pm pubsubMetadata) AddTopicRoutingMetadata(topic topicPartition) {
207	pm[routingMetadataHeader] = fmt.Sprintf("partition=%d&topic=%s", topic.Partition, url.QueryEscape(topic.Path))
208}
209
210func (pm pubsubMetadata) AddSubscriptionRoutingMetadata(subscription subscriptionPartition) {
211	pm[routingMetadataHeader] = fmt.Sprintf("partition=%d&subscription=%s", subscription.Partition, url.QueryEscape(subscription.Path))
212}
213
214func (pm pubsubMetadata) AddClientInfo(framework FrameworkType) {
215	pm.doAddClientInfo(framework, libraryVersion)
216}
217
218func (pm pubsubMetadata) doAddClientInfo(framework FrameworkType, getVersion func() (version, bool)) {
219	s := &structpb.Struct{
220		Fields: make(map[string]*structpb.Value),
221	}
222	s.Fields[languageKey] = stringValue(languageValue)
223	if len(framework) > 0 {
224		s.Fields[frameworkKey] = stringValue(string(framework))
225	}
226	if version, ok := getVersion(); ok {
227		s.Fields[majorVersionKey] = stringValue(version.Major)
228		s.Fields[minorVersionKey] = stringValue(version.Minor)
229	}
230	if bytes, err := proto.Marshal(s); err == nil {
231		pm[clientInfoMetadataHeader] = base64.StdEncoding.EncodeToString(bytes)
232	}
233}
234
235func (pm pubsubMetadata) AddToContext(ctx context.Context) context.Context {
236	md, _ := metadata.FromOutgoingContext(ctx)
237	md = md.Copy()
238	for key, val := range pm {
239		md[key] = append(md[key], val)
240	}
241	return metadata.NewOutgoingContext(ctx, md)
242}
243