1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13 14package wire 15 16import ( 17 "context" 18 "encoding/base64" 19 "fmt" 20 "net/url" 21 "time" 22 23 "google.golang.org/api/option" 24 "google.golang.org/api/option/internaloption" 25 "google.golang.org/grpc/codes" 26 "google.golang.org/grpc/metadata" 27 "google.golang.org/grpc/status" 28 "google.golang.org/protobuf/proto" 29 "google.golang.org/protobuf/types/known/structpb" 30 31 vkit "cloud.google.com/go/pubsublite/apiv1" 32 gax "github.com/googleapis/gax-go/v2" 33) 34 35// streamRetryer implements the retry policy for establishing gRPC stream 36// connections. 37type streamRetryer struct { 38 bo gax.Backoff 39 deadline time.Time 40} 41 42func newStreamRetryer(timeout time.Duration) *streamRetryer { 43 return &streamRetryer{ 44 bo: gax.Backoff{ 45 Initial: 10 * time.Millisecond, 46 Max: 10 * time.Second, 47 Multiplier: 2, 48 }, 49 deadline: time.Now().Add(timeout), 50 } 51} 52 53func (r *streamRetryer) RetrySend(err error) (time.Duration, bool) { 54 if time.Now().After(r.deadline) { 55 return 0, false 56 } 57 if isRetryableSendError(err) { 58 return r.bo.Pause(), true 59 } 60 return 0, false 61} 62 63func (r *streamRetryer) RetryRecv(err error) (time.Duration, bool) { 64 if time.Now().After(r.deadline) { 65 return 0, false 66 } 67 if isRetryableRecvError(err) { 68 return r.bo.Pause(), true 69 } 70 return 0, false 71} 72 73func isRetryableSendCode(code codes.Code) bool { 74 switch code { 75 // Client-side errors that occur during grpc.ClientStream.SendMsg() have a 76 // smaller set of retryable codes. 77 case codes.DeadlineExceeded, codes.Unavailable: 78 return true 79 default: 80 return false 81 } 82} 83 84func isRetryableRecvCode(code codes.Code) bool { 85 switch code { 86 // Consistent with https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/ErrorCodes.java 87 case codes.Aborted, codes.DeadlineExceeded, codes.Internal, codes.ResourceExhausted, codes.Unavailable, codes.Unknown: 88 return true 89 default: 90 return false 91 } 92} 93 94func isRetryableSendError(err error) bool { 95 return isRetryableStreamError(err, isRetryableSendCode) 96} 97 98func isRetryableRecvError(err error) bool { 99 return isRetryableStreamError(err, isRetryableRecvCode) 100} 101 102func isRetryableStreamError(err error, isEligible func(codes.Code) bool) bool { 103 s, ok := status.FromError(err) 104 if !ok { 105 // Includes io.EOF, normal stream close. 106 // Consistent with https://github.com/googleapis/google-cloud-go/blob/master/pubsub/service.go 107 return true 108 } 109 return isEligible(s.Code()) 110} 111 112// retryableReadOnlyCallOption returns a call option that retries with backoff 113// for ResourceExhausted in addition to other default retryable codes for 114// Pub/Sub. Suitable for read-only operations which are subject to only QPS 115// quota limits. 116func retryableReadOnlyCallOption() gax.CallOption { 117 return gax.WithRetry(func() gax.Retryer { 118 return gax.OnCodes([]codes.Code{ 119 codes.Aborted, 120 codes.DeadlineExceeded, 121 codes.Internal, 122 codes.ResourceExhausted, 123 codes.Unavailable, 124 codes.Unknown, 125 }, gax.Backoff{ 126 Initial: 100 * time.Millisecond, 127 Max: 60 * time.Second, 128 Multiplier: 1.3, 129 }) 130 }) 131} 132 133const pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443" 134 135func defaultClientOptions(region string) []option.ClientOption { 136 return []option.ClientOption{ 137 internaloption.WithDefaultEndpoint(region + pubsubLiteDefaultEndpoint), 138 } 139} 140 141type apiClient interface { 142 Close() error 143} 144 145type apiClients []apiClient 146 147func (ac apiClients) Close() (retErr error) { 148 for _, c := range ac { 149 if err := c.Close(); retErr == nil { 150 retErr = err 151 } 152 } 153 return 154} 155 156// NewAdminClient creates a new gapic AdminClient for a region. 157func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) { 158 options := append(defaultClientOptions(region), opts...) 159 return vkit.NewAdminClient(ctx, options...) 160} 161 162func newPublisherClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PublisherClient, error) { 163 options := append(defaultClientOptions(region), opts...) 164 return vkit.NewPublisherClient(ctx, options...) 165} 166 167func newSubscriberClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.SubscriberClient, error) { 168 options := append(defaultClientOptions(region), opts...) 169 return vkit.NewSubscriberClient(ctx, options...) 170} 171 172func newCursorClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.CursorClient, error) { 173 options := append(defaultClientOptions(region), opts...) 174 return vkit.NewCursorClient(ctx, options...) 175} 176 177func newPartitionAssignmentClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PartitionAssignmentClient, error) { 178 options := append(defaultClientOptions(region), opts...) 179 return vkit.NewPartitionAssignmentClient(ctx, options...) 180} 181 182const ( 183 routingMetadataHeader = "x-goog-request-params" 184 clientInfoMetadataHeader = "x-goog-pubsub-context" 185 186 languageKey = "language" 187 languageValue = "GOLANG" 188 frameworkKey = "framework" 189 majorVersionKey = "major_version" 190 minorVersionKey = "minor_version" 191) 192 193func stringValue(str string) *structpb.Value { 194 return &structpb.Value{ 195 Kind: &structpb.Value_StringValue{StringValue: str}, 196 } 197} 198 199// pubsubMetadata stores key/value pairs that should be added to gRPC metadata. 200type pubsubMetadata map[string]string 201 202func newPubsubMetadata() pubsubMetadata { 203 return make(map[string]string) 204} 205 206func (pm pubsubMetadata) AddTopicRoutingMetadata(topic topicPartition) { 207 pm[routingMetadataHeader] = fmt.Sprintf("partition=%d&topic=%s", topic.Partition, url.QueryEscape(topic.Path)) 208} 209 210func (pm pubsubMetadata) AddSubscriptionRoutingMetadata(subscription subscriptionPartition) { 211 pm[routingMetadataHeader] = fmt.Sprintf("partition=%d&subscription=%s", subscription.Partition, url.QueryEscape(subscription.Path)) 212} 213 214func (pm pubsubMetadata) AddClientInfo(framework FrameworkType) { 215 pm.doAddClientInfo(framework, libraryVersion) 216} 217 218func (pm pubsubMetadata) doAddClientInfo(framework FrameworkType, getVersion func() (version, bool)) { 219 s := &structpb.Struct{ 220 Fields: make(map[string]*structpb.Value), 221 } 222 s.Fields[languageKey] = stringValue(languageValue) 223 if len(framework) > 0 { 224 s.Fields[frameworkKey] = stringValue(string(framework)) 225 } 226 if version, ok := getVersion(); ok { 227 s.Fields[majorVersionKey] = stringValue(version.Major) 228 s.Fields[minorVersionKey] = stringValue(version.Minor) 229 } 230 if bytes, err := proto.Marshal(s); err == nil { 231 pm[clientInfoMetadataHeader] = base64.StdEncoding.EncodeToString(bytes) 232 } 233} 234 235func (pm pubsubMetadata) AddToContext(ctx context.Context) context.Context { 236 md, _ := metadata.FromOutgoingContext(ctx) 237 md = md.Copy() 238 for key, val := range pm { 239 md[key] = append(md[key], val) 240 } 241 return metadata.NewOutgoingContext(ctx, md) 242} 243