1// Copyright 2017 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package datastore 16 17import ( 18 "context" 19 "fmt" 20 "time" 21 22 "cloud.google.com/go/internal" 23 "cloud.google.com/go/internal/trace" 24 "cloud.google.com/go/internal/version" 25 gax "github.com/googleapis/gax-go/v2" 26 pb "google.golang.org/genproto/googleapis/datastore/v1" 27 "google.golang.org/grpc" 28 "google.golang.org/grpc/codes" 29 "google.golang.org/grpc/metadata" 30 "google.golang.org/grpc/status" 31) 32 33// datastoreClient is a wrapper for the pb.DatastoreClient that includes gRPC 34// metadata to be sent in each request for server-side traffic management. 35type datastoreClient struct { 36 // Embed so we still implement the DatastoreClient interface, 37 // if the interface adds more methods. 38 pb.DatastoreClient 39 40 c pb.DatastoreClient 41 md metadata.MD 42} 43 44func newDatastoreClient(conn grpc.ClientConnInterface, projectID string) pb.DatastoreClient { 45 return &datastoreClient{ 46 c: pb.NewDatastoreClient(conn), 47 md: metadata.Pairs( 48 resourcePrefixHeader, "projects/"+projectID, 49 "x-goog-api-client", fmt.Sprintf("gl-go/%s gccl/%s grpc/", version.Go(), version.Repo)), 50 } 51} 52 53func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opts ...grpc.CallOption) (res *pb.LookupResponse, err error) { 54 ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.Lookup") 55 defer func() { trace.EndSpan(ctx, err) }() 56 57 err = dc.invoke(ctx, func(ctx context.Context) error { 58 res, err = dc.c.Lookup(ctx, in, opts...) 59 return err 60 }) 61 return res, err 62} 63 64func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest, opts ...grpc.CallOption) (res *pb.RunQueryResponse, err error) { 65 ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.RunQuery") 66 defer func() { trace.EndSpan(ctx, err) }() 67 68 err = dc.invoke(ctx, func(ctx context.Context) error { 69 res, err = dc.c.RunQuery(ctx, in, opts...) 70 return err 71 }) 72 return res, err 73} 74 75func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTransactionRequest, opts ...grpc.CallOption) (res *pb.BeginTransactionResponse, err error) { 76 ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.BeginTransaction") 77 defer func() { trace.EndSpan(ctx, err) }() 78 79 err = dc.invoke(ctx, func(ctx context.Context) error { 80 res, err = dc.c.BeginTransaction(ctx, in, opts...) 81 return err 82 }) 83 return res, err 84} 85 86func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opts ...grpc.CallOption) (res *pb.CommitResponse, err error) { 87 ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.Commit") 88 defer func() { trace.EndSpan(ctx, err) }() 89 90 err = dc.invoke(ctx, func(ctx context.Context) error { 91 res, err = dc.c.Commit(ctx, in, opts...) 92 return err 93 }) 94 return res, err 95} 96 97func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest, opts ...grpc.CallOption) (res *pb.RollbackResponse, err error) { 98 ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.Rollback") 99 defer func() { trace.EndSpan(ctx, err) }() 100 101 err = dc.invoke(ctx, func(ctx context.Context) error { 102 res, err = dc.c.Rollback(ctx, in, opts...) 103 return err 104 }) 105 return res, err 106} 107 108func (dc *datastoreClient) AllocateIds(ctx context.Context, in *pb.AllocateIdsRequest, opts ...grpc.CallOption) (res *pb.AllocateIdsResponse, err error) { 109 ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.AllocateIds") 110 defer func() { trace.EndSpan(ctx, err) }() 111 112 err = dc.invoke(ctx, func(ctx context.Context) error { 113 res, err = dc.c.AllocateIds(ctx, in, opts...) 114 return err 115 }) 116 return res, err 117} 118 119func (dc *datastoreClient) invoke(ctx context.Context, f func(ctx context.Context) error) error { 120 ctx = metadata.NewOutgoingContext(ctx, dc.md) 121 return internal.Retry(ctx, gax.Backoff{Initial: 100 * time.Millisecond}, func() (stop bool, err error) { 122 err = f(ctx) 123 return !shouldRetry(err), err 124 }) 125} 126 127func shouldRetry(err error) bool { 128 if err == nil { 129 return false 130 } 131 s, ok := status.FromError(err) 132 if !ok { 133 return false 134 } 135 // Only retry on UNAVAILABLE as per https://aip.dev/194. Other errors from 136 // https://cloud.google.com/datastore/docs/concepts/errors may be retried 137 // by the user if desired, but are not retried by the clientg. 138 return s.Code() == codes.Unavailable 139} 140