// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package wire

import (
	"context"
	"encoding/base64"
	"fmt"
	"net/url"
	"time"

	"google.golang.org/api/option"
	"google.golang.org/api/option/internaloption"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/structpb"

	vkit "cloud.google.com/go/pubsublite/apiv1"
	gax "github.com/googleapis/gax-go/v2"
)

// streamRetryer implements the retry policy for establishing gRPC stream
// connections.
39type streamRetryer struct { 40 bo gax.Backoff 41 deadline time.Time 42} 43 44func newStreamRetryer(timeout time.Duration) *streamRetryer { 45 return &streamRetryer{ 46 bo: gax.Backoff{ 47 Initial: 10 * time.Millisecond, 48 Max: 10 * time.Second, 49 Multiplier: 2, 50 }, 51 deadline: time.Now().Add(timeout), 52 } 53} 54 55func (r *streamRetryer) RetrySend(err error) (backoff time.Duration, shouldRetry bool) { 56 if isRetryableSendError(err) { 57 return r.bo.Pause(), true 58 } 59 return 0, false 60} 61 62func (r *streamRetryer) RetryRecv(err error) (backoff time.Duration, shouldRetry bool) { 63 if isRetryableRecvError(err) { 64 return r.bo.Pause(), true 65 } 66 return 0, false 67} 68 69func (r *streamRetryer) ExceededDeadline() bool { 70 return time.Now().After(r.deadline) 71} 72 73func isRetryableSendCode(code codes.Code) bool { 74 switch code { 75 // Client-side errors that occur during grpc.ClientStream.SendMsg() have a 76 // smaller set of retryable codes. 77 case codes.DeadlineExceeded, codes.Unavailable: 78 return true 79 default: 80 return false 81 } 82} 83 84func isRetryableRecvCode(code codes.Code) bool { 85 switch code { 86 // Consistent with https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/ErrorCodes.java 87 case codes.Aborted, codes.DeadlineExceeded, codes.Internal, codes.ResourceExhausted, codes.Unavailable, codes.Unknown: 88 return true 89 default: 90 return false 91 } 92} 93 94func isRetryableSendError(err error) bool { 95 return isRetryableStreamError(err, isRetryableSendCode) 96} 97 98func isRetryableRecvError(err error) bool { 99 return isRetryableStreamError(err, isRetryableRecvCode) 100} 101 102func isRetryableStreamError(err error, isEligible func(codes.Code) bool) bool { 103 s, ok := status.FromError(err) 104 if !ok { 105 // Includes io.EOF, normal stream close. 
106 // Consistent with https://github.com/googleapis/google-cloud-go/blob/master/pubsub/service.go 107 return true 108 } 109 return isEligible(s.Code()) 110} 111 112// retryableReadOnlyCallOption returns a call option that retries with backoff 113// for ResourceExhausted in addition to other default retryable codes for 114// Pub/Sub. Suitable for read-only operations which are subject to only QPS 115// quota limits. 116func retryableReadOnlyCallOption() gax.CallOption { 117 return gax.WithRetry(func() gax.Retryer { 118 return gax.OnCodes([]codes.Code{ 119 codes.Aborted, 120 codes.DeadlineExceeded, 121 codes.Internal, 122 codes.ResourceExhausted, 123 codes.Unavailable, 124 codes.Unknown, 125 }, gax.Backoff{ 126 Initial: 100 * time.Millisecond, 127 Max: 60 * time.Second, 128 Multiplier: 1.3, 129 }) 130 }) 131} 132 133const pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443" 134 135func defaultClientOptions(region string) []option.ClientOption { 136 return []option.ClientOption{ 137 internaloption.WithDefaultEndpoint(region + pubsubLiteDefaultEndpoint), 138 // Keep inactive connections alive. 139 option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{ 140 Time: 5 * time.Minute, 141 })), 142 } 143} 144 145type apiClient interface { 146 Close() error 147} 148 149type apiClients []apiClient 150 151func (ac apiClients) Close() (retErr error) { 152 for _, c := range ac { 153 if err := c.Close(); retErr == nil { 154 retErr = err 155 } 156 } 157 return 158} 159 160// NewAdminClient creates a new gapic AdminClient for a region. 161func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) { 162 options := append(defaultClientOptions(region), opts...) 163 return vkit.NewAdminClient(ctx, options...) 164} 165 166func newPublisherClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PublisherClient, error) { 167 options := append(defaultClientOptions(region), opts...) 
168 return vkit.NewPublisherClient(ctx, options...) 169} 170 171func newSubscriberClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.SubscriberClient, error) { 172 options := append(defaultClientOptions(region), opts...) 173 return vkit.NewSubscriberClient(ctx, options...) 174} 175 176func newCursorClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.CursorClient, error) { 177 options := append(defaultClientOptions(region), opts...) 178 return vkit.NewCursorClient(ctx, options...) 179} 180 181func newPartitionAssignmentClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PartitionAssignmentClient, error) { 182 options := append(defaultClientOptions(region), opts...) 183 return vkit.NewPartitionAssignmentClient(ctx, options...) 184} 185 186const ( 187 routingMetadataHeader = "x-goog-request-params" 188 clientInfoMetadataHeader = "x-goog-pubsub-context" 189 190 languageKey = "language" 191 languageValue = "GOLANG" 192 frameworkKey = "framework" 193 majorVersionKey = "major_version" 194 minorVersionKey = "minor_version" 195) 196 197func stringValue(str string) *structpb.Value { 198 return &structpb.Value{ 199 Kind: &structpb.Value_StringValue{StringValue: str}, 200 } 201} 202 203// pubsubMetadata stores key/value pairs that should be added to gRPC metadata. 
204type pubsubMetadata map[string]string 205 206func newPubsubMetadata() pubsubMetadata { 207 return make(map[string]string) 208} 209 210func (pm pubsubMetadata) AddTopicRoutingMetadata(topic topicPartition) { 211 pm[routingMetadataHeader] = fmt.Sprintf("partition=%d&topic=%s", topic.Partition, url.QueryEscape(topic.Path)) 212} 213 214func (pm pubsubMetadata) AddSubscriptionRoutingMetadata(subscription subscriptionPartition) { 215 pm[routingMetadataHeader] = fmt.Sprintf("partition=%d&subscription=%s", subscription.Partition, url.QueryEscape(subscription.Path)) 216} 217 218func (pm pubsubMetadata) AddClientInfo(framework FrameworkType) { 219 pm.doAddClientInfo(framework, libraryVersion) 220} 221 222func (pm pubsubMetadata) doAddClientInfo(framework FrameworkType, getVersion func() (version, bool)) { 223 s := &structpb.Struct{ 224 Fields: make(map[string]*structpb.Value), 225 } 226 s.Fields[languageKey] = stringValue(languageValue) 227 if len(framework) > 0 { 228 s.Fields[frameworkKey] = stringValue(string(framework)) 229 } 230 if version, ok := getVersion(); ok { 231 s.Fields[majorVersionKey] = stringValue(version.Major) 232 s.Fields[minorVersionKey] = stringValue(version.Minor) 233 } 234 if bytes, err := proto.Marshal(s); err == nil { 235 pm[clientInfoMetadataHeader] = base64.StdEncoding.EncodeToString(bytes) 236 } 237} 238 239func (pm pubsubMetadata) AddToContext(ctx context.Context) context.Context { 240 md, _ := metadata.FromOutgoingContext(ctx) 241 md = md.Copy() 242 for key, val := range pm { 243 md[key] = append(md[key], val) 244 } 245 return metadata.NewOutgoingContext(ctx, md) 246} 247