1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13
14package wire
15
16import (
17	"context"
18	"encoding/base64"
19	"fmt"
20	"net/url"
21	"time"
22
23	"google.golang.org/api/option"
24	"google.golang.org/api/option/internaloption"
25	"google.golang.org/grpc"
26	"google.golang.org/grpc/codes"
27	"google.golang.org/grpc/keepalive"
28	"google.golang.org/grpc/metadata"
29	"google.golang.org/grpc/status"
30	"google.golang.org/protobuf/proto"
31	"google.golang.org/protobuf/types/known/structpb"
32
33	vkit "cloud.google.com/go/pubsublite/apiv1"
34	gax "github.com/googleapis/gax-go/v2"
35)
36
37// streamRetryer implements the retry policy for establishing gRPC stream
38// connections.
39type streamRetryer struct {
40	bo       gax.Backoff
41	deadline time.Time
42}
43
44func newStreamRetryer(timeout time.Duration) *streamRetryer {
45	return &streamRetryer{
46		bo: gax.Backoff{
47			Initial:    10 * time.Millisecond,
48			Max:        10 * time.Second,
49			Multiplier: 2,
50		},
51		deadline: time.Now().Add(timeout),
52	}
53}
54
55func (r *streamRetryer) RetrySend(err error) (backoff time.Duration, shouldRetry bool) {
56	if isRetryableSendError(err) {
57		return r.bo.Pause(), true
58	}
59	return 0, false
60}
61
62func (r *streamRetryer) RetryRecv(err error) (backoff time.Duration, shouldRetry bool) {
63	if isRetryableRecvError(err) {
64		return r.bo.Pause(), true
65	}
66	return 0, false
67}
68
69func (r *streamRetryer) ExceededDeadline() bool {
70	return time.Now().After(r.deadline)
71}
72
73func isRetryableSendCode(code codes.Code) bool {
74	switch code {
75	// Client-side errors that occur during grpc.ClientStream.SendMsg() have a
76	// smaller set of retryable codes.
77	case codes.DeadlineExceeded, codes.Unavailable:
78		return true
79	default:
80		return false
81	}
82}
83
84func isRetryableRecvCode(code codes.Code) bool {
85	switch code {
86	// Consistent with https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/ErrorCodes.java
87	case codes.Aborted, codes.DeadlineExceeded, codes.Internal, codes.ResourceExhausted, codes.Unavailable, codes.Unknown:
88		return true
89	default:
90		return false
91	}
92}
93
94func isRetryableSendError(err error) bool {
95	return isRetryableStreamError(err, isRetryableSendCode)
96}
97
98func isRetryableRecvError(err error) bool {
99	return isRetryableStreamError(err, isRetryableRecvCode)
100}
101
102func isRetryableStreamError(err error, isEligible func(codes.Code) bool) bool {
103	s, ok := status.FromError(err)
104	if !ok {
105		// Includes io.EOF, normal stream close.
106		// Consistent with https://github.com/googleapis/google-cloud-go/blob/master/pubsub/service.go
107		return true
108	}
109	return isEligible(s.Code())
110}
111
112// retryableReadOnlyCallOption returns a call option that retries with backoff
113// for ResourceExhausted in addition to other default retryable codes for
114// Pub/Sub. Suitable for read-only operations which are subject to only QPS
115// quota limits.
116func retryableReadOnlyCallOption() gax.CallOption {
117	return gax.WithRetry(func() gax.Retryer {
118		return gax.OnCodes([]codes.Code{
119			codes.Aborted,
120			codes.DeadlineExceeded,
121			codes.Internal,
122			codes.ResourceExhausted,
123			codes.Unavailable,
124			codes.Unknown,
125		}, gax.Backoff{
126			Initial:    100 * time.Millisecond,
127			Max:        60 * time.Second,
128			Multiplier: 1.3,
129		})
130	})
131}
132
133const pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443"
134
135func defaultClientOptions(region string) []option.ClientOption {
136	return []option.ClientOption{
137		internaloption.WithDefaultEndpoint(region + pubsubLiteDefaultEndpoint),
138		// Keep inactive connections alive.
139		option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
140			Time: 5 * time.Minute,
141		})),
142	}
143}
144
// apiClient is any client that can be closed.
type apiClient interface {
	Close() error
}

// apiClients is a list of clients that are closed together.
type apiClients []apiClient

// Close closes every client in the list, continuing past failures, and returns
// the first non-nil error encountered (nil when every Close succeeds).
func (ac apiClients) Close() (retErr error) {
	for i := range ac {
		err := ac[i].Close()
		if err != nil && retErr == nil {
			retErr = err
		}
	}
	return retErr
}
159
160// NewAdminClient creates a new gapic AdminClient for a region.
161func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) {
162	options := append(defaultClientOptions(region), opts...)
163	return vkit.NewAdminClient(ctx, options...)
164}
165
166func newPublisherClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PublisherClient, error) {
167	options := append(defaultClientOptions(region), opts...)
168	return vkit.NewPublisherClient(ctx, options...)
169}
170
171func newSubscriberClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.SubscriberClient, error) {
172	options := append(defaultClientOptions(region), opts...)
173	return vkit.NewSubscriberClient(ctx, options...)
174}
175
176func newCursorClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.CursorClient, error) {
177	options := append(defaultClientOptions(region), opts...)
178	return vkit.NewCursorClient(ctx, options...)
179}
180
181func newPartitionAssignmentClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PartitionAssignmentClient, error) {
182	options := append(defaultClientOptions(region), opts...)
183	return vkit.NewPartitionAssignmentClient(ctx, options...)
184}
185
const (
	// routingMetadataHeader is the metadata key under which the routing
	// parameters (partition plus topic or subscription path) are stored.
	routingMetadataHeader = "x-goog-request-params"
	// clientInfoMetadataHeader is the metadata key under which the
	// base64-encoded client info struct is stored.
	clientInfoMetadataHeader = "x-goog-pubsub-context"

	// Field names (and the fixed language value) of the client info struct.
	languageKey     = "language"
	languageValue   = "GOLANG"
	frameworkKey    = "framework"
	majorVersionKey = "major_version"
	minorVersionKey = "minor_version"
)
196
197func stringValue(str string) *structpb.Value {
198	return &structpb.Value{
199		Kind: &structpb.Value_StringValue{StringValue: str},
200	}
201}
202
// pubsubMetadata holds the key/value pairs that should be added to gRPC
// metadata.
type pubsubMetadata map[string]string

// newPubsubMetadata returns an empty, ready-to-write pubsubMetadata.
func newPubsubMetadata() pubsubMetadata {
	return pubsubMetadata{}
}
209
210func (pm pubsubMetadata) AddTopicRoutingMetadata(topic topicPartition) {
211	pm[routingMetadataHeader] = fmt.Sprintf("partition=%d&topic=%s", topic.Partition, url.QueryEscape(topic.Path))
212}
213
214func (pm pubsubMetadata) AddSubscriptionRoutingMetadata(subscription subscriptionPartition) {
215	pm[routingMetadataHeader] = fmt.Sprintf("partition=%d&subscription=%s", subscription.Partition, url.QueryEscape(subscription.Path))
216}
217
218func (pm pubsubMetadata) AddClientInfo(framework FrameworkType) {
219	pm.doAddClientInfo(framework, libraryVersion)
220}
221
222func (pm pubsubMetadata) doAddClientInfo(framework FrameworkType, getVersion func() (version, bool)) {
223	s := &structpb.Struct{
224		Fields: make(map[string]*structpb.Value),
225	}
226	s.Fields[languageKey] = stringValue(languageValue)
227	if len(framework) > 0 {
228		s.Fields[frameworkKey] = stringValue(string(framework))
229	}
230	if version, ok := getVersion(); ok {
231		s.Fields[majorVersionKey] = stringValue(version.Major)
232		s.Fields[minorVersionKey] = stringValue(version.Minor)
233	}
234	if bytes, err := proto.Marshal(s); err == nil {
235		pm[clientInfoMetadataHeader] = base64.StdEncoding.EncodeToString(bytes)
236	}
237}
238
239func (pm pubsubMetadata) AddToContext(ctx context.Context) context.Context {
240	md, _ := metadata.FromOutgoingContext(ctx)
241	md = md.Copy()
242	for key, val := range pm {
243		md[key] = append(md[key], val)
244	}
245	return metadata.NewOutgoingContext(ctx, md)
246}
247