1// Copyright 2017 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package datastore
16
17import (
18	"context"
19	"fmt"
20	"time"
21
22	"cloud.google.com/go/internal"
23	"cloud.google.com/go/internal/trace"
24	"cloud.google.com/go/internal/version"
25	gax "github.com/googleapis/gax-go/v2"
26	pb "google.golang.org/genproto/googleapis/datastore/v1"
27	"google.golang.org/grpc"
28	"google.golang.org/grpc/codes"
29	"google.golang.org/grpc/metadata"
30	"google.golang.org/grpc/status"
31)
32
33// datastoreClient is a wrapper for the pb.DatastoreClient that includes gRPC
34// metadata to be sent in each request for server-side traffic management.
35type datastoreClient struct {
36	// Embed so we still implement the DatastoreClient interface,
37	// if the interface adds more methods.
38	pb.DatastoreClient
39
40	c  pb.DatastoreClient
41	md metadata.MD
42}
43
44func newDatastoreClient(conn grpc.ClientConnInterface, projectID string) pb.DatastoreClient {
45	return &datastoreClient{
46		c: pb.NewDatastoreClient(conn),
47		md: metadata.Pairs(
48			resourcePrefixHeader, "projects/"+projectID,
49			"x-goog-api-client", fmt.Sprintf("gl-go/%s gccl/%s grpc/", version.Go(), version.Repo)),
50	}
51}
52
53func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opts ...grpc.CallOption) (res *pb.LookupResponse, err error) {
54	ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.Lookup")
55	defer func() { trace.EndSpan(ctx, err) }()
56
57	err = dc.invoke(ctx, func(ctx context.Context) error {
58		res, err = dc.c.Lookup(ctx, in, opts...)
59		return err
60	})
61	return res, err
62}
63
64func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest, opts ...grpc.CallOption) (res *pb.RunQueryResponse, err error) {
65	ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.RunQuery")
66	defer func() { trace.EndSpan(ctx, err) }()
67
68	err = dc.invoke(ctx, func(ctx context.Context) error {
69		res, err = dc.c.RunQuery(ctx, in, opts...)
70		return err
71	})
72	return res, err
73}
74
75func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTransactionRequest, opts ...grpc.CallOption) (res *pb.BeginTransactionResponse, err error) {
76	ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.BeginTransaction")
77	defer func() { trace.EndSpan(ctx, err) }()
78
79	err = dc.invoke(ctx, func(ctx context.Context) error {
80		res, err = dc.c.BeginTransaction(ctx, in, opts...)
81		return err
82	})
83	return res, err
84}
85
86func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opts ...grpc.CallOption) (res *pb.CommitResponse, err error) {
87	ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.Commit")
88	defer func() { trace.EndSpan(ctx, err) }()
89
90	err = dc.invoke(ctx, func(ctx context.Context) error {
91		res, err = dc.c.Commit(ctx, in, opts...)
92		return err
93	})
94	return res, err
95}
96
97func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest, opts ...grpc.CallOption) (res *pb.RollbackResponse, err error) {
98	ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.Rollback")
99	defer func() { trace.EndSpan(ctx, err) }()
100
101	err = dc.invoke(ctx, func(ctx context.Context) error {
102		res, err = dc.c.Rollback(ctx, in, opts...)
103		return err
104	})
105	return res, err
106}
107
108func (dc *datastoreClient) AllocateIds(ctx context.Context, in *pb.AllocateIdsRequest, opts ...grpc.CallOption) (res *pb.AllocateIdsResponse, err error) {
109	ctx = trace.StartSpan(ctx, "cloud.google.com/go/datastore.datastoreClient.AllocateIds")
110	defer func() { trace.EndSpan(ctx, err) }()
111
112	err = dc.invoke(ctx, func(ctx context.Context) error {
113		res, err = dc.c.AllocateIds(ctx, in, opts...)
114		return err
115	})
116	return res, err
117}
118
119func (dc *datastoreClient) invoke(ctx context.Context, f func(ctx context.Context) error) error {
120	ctx = metadata.NewOutgoingContext(ctx, dc.md)
121	return internal.Retry(ctx, gax.Backoff{Initial: 100 * time.Millisecond}, func() (stop bool, err error) {
122		err = f(ctx)
123		return !shouldRetry(err), err
124	})
125}
126
127func shouldRetry(err error) bool {
128	if err == nil {
129		return false
130	}
131	s, ok := status.FromError(err)
132	if !ok {
133		return false
134	}
135	// Only retry on UNAVAILABLE as per https://aip.dev/194. Other errors from
136	// https://cloud.google.com/datastore/docs/concepts/errors may be retried
137	// by the user if desired, but are not retried by the clientg.
138	return s.Code() == codes.Unavailable
139}
140