1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"fmt"
22	"regexp"
23	"sync/atomic"
24	"time"
25
26	"cloud.google.com/go/internal/trace"
27	"cloud.google.com/go/internal/version"
28	"google.golang.org/api/option"
29	gtransport "google.golang.org/api/transport/grpc"
30	sppb "google.golang.org/genproto/googleapis/spanner/v1"
31	"google.golang.org/grpc"
32	"google.golang.org/grpc/codes"
33	"google.golang.org/grpc/metadata"
34)
35
// endpoint is the default address handed to option.WithEndpoint when dialing
// Cloud Spanner.
const endpoint = "spanner.googleapis.com:443"

// Names of the metadata headers that the client attaches to its requests.
const (
	// resourcePrefixHeader names the header that identifies the resource
	// being operated on.
	resourcePrefixHeader = "google-cloud-resource-prefix"

	// xGoogHeaderKey names the header that carries client information.
	xGoogHeaderKey = "x-goog-api-client"
)
47
// Scope is the scope for the Cloud Spanner Data API.
const Scope = "https://www.googleapis.com/auth/spanner.data"

// AdminScope is the scope for the Cloud Spanner Admin APIs.
const AdminScope = "https://www.googleapis.com/auth/spanner.admin"
55
56var (
57	validDBPattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+/databases/[^/]+$")
58	xGoogHeaderVal = fmt.Sprintf("gl-go/%s gccl/%s grpc/%s", version.Go(), version.Repo, grpc.Version)
59)
60
61func validDatabaseName(db string) error {
62	if matched := validDBPattern.MatchString(db); !matched {
63		return fmt.Errorf("database name %q should conform to pattern %q",
64			db, validDBPattern.String())
65	}
66	return nil
67}
68
// Client is a client for reading and writing data to a Cloud Spanner database.
// A client is safe to use concurrently, except for its Close method.
//
// Clients are built by NewClient or NewClientWithConfig, which dial the gRPC
// connections and create the session pool stored here.
type Client struct {
	// rr is the round-robin counter that rrNext advances to choose the next
	// entry of clients; it must be accessed through atomic operations.
	// conns holds the gRPC connections dialed by NewClientWithConfig (Close
	// closes each of them), and clients holds the Spanner RPC client created
	// on each connection, in the same order.
	rr      uint32
	conns   []*grpc.ClientConn
	clients []sppb.SpannerClient

	// database is the name of the database this client was created for, in
	// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
	database string
	// md is the metadata to be sent with each request: the resource prefix
	// header (set to the database name) and the x-goog-api-client header.
	// idleSessions is the session pool created by NewClientWithConfig; Close
	// tolerates it being nil.
	md           metadata.MD
	idleSessions *sessionPool
	// sessionLabels for the sessions created by this client. It is the
	// client's own copy of ClientConfig.SessionLabels.
	sessionLabels map[string]string
}
84
// ClientConfig has configurations for the client. It is consumed by
// NewClientWithConfig; NewClient passes the zero value.
type ClientConfig struct {
	// NumChannels is the number of gRPC channels.
	// If zero, a reasonable default is used based on the execution environment.
	// NewClientWithConfig dials one connection per channel.
	NumChannels int

	// SessionPoolConfig is the configuration for session pool.
	// When its MaxOpened field is zero NewClientWithConfig sets it to
	// NumChannels * 100, and when its MaxBurst field is zero it is set to 10.
	SessionPoolConfig

	// SessionLabels for the sessions created by this client.
	// See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session
	// for more info.
	// NewClientWithConfig copies the map, so the client keeps its own set of
	// labels.
	SessionLabels map[string]string
}
99
100// errDial returns error for dialing to Cloud Spanner.
101func errDial(ci int, err error) error {
102	e := toSpannerError(err).(*Error)
103	e.decorate(fmt.Sprintf("dialing fails for channel[%v]", ci))
104	return e
105}
106
107func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
108	existing, ok := metadata.FromOutgoingContext(ctx)
109	if ok {
110		md = metadata.Join(existing, md)
111	}
112	return metadata.NewOutgoingContext(ctx, md)
113}
114
115// NewClient creates a client to a database. A valid database name has the
116// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses
117// a default configuration.
118func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) {
119	return NewClientWithConfig(ctx, database, ClientConfig{}, opts...)
120}
121
122// NewClientWithConfig creates a client to a database. A valid database name has
123// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
124func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
125	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient")
126	defer func() { trace.EndSpan(ctx, err) }()
127
128	// Validate database path.
129	if err := validDatabaseName(database); err != nil {
130		return nil, err
131	}
132	c = &Client{
133		database: database,
134		md: metadata.Pairs(
135			resourcePrefixHeader, database,
136			xGoogHeaderKey, xGoogHeaderVal),
137	}
138
139	// Make a copy of labels.
140	c.sessionLabels = make(map[string]string)
141	for k, v := range config.SessionLabels {
142		c.sessionLabels[k] = v
143	}
144
145	// gRPC options.
146	allOpts := []option.ClientOption{
147		option.WithEndpoint(endpoint),
148		option.WithScopes(Scope),
149		option.WithGRPCDialOption(
150			grpc.WithDefaultCallOptions(
151				grpc.MaxCallSendMsgSize(100<<20),
152				grpc.MaxCallRecvMsgSize(100<<20),
153			),
154		),
155	}
156	allOpts = append(allOpts, opts...)
157
158	// Prepare gRPC channels.
159	if config.NumChannels == 0 {
160		config.NumChannels = numChannels
161	}
162
163	// Default configs for session pool.
164	if config.MaxOpened == 0 {
165		config.MaxOpened = uint64(config.NumChannels * 100)
166	}
167	if config.MaxBurst == 0 {
168		config.MaxBurst = 10
169	}
170
171	// TODO(deklerk): This should be replaced with a balancer with
172	// config.NumChannels connections, instead of config.NumChannels
173	// clientconns.
174	for i := 0; i < config.NumChannels; i++ {
175		conn, err := gtransport.Dial(ctx, allOpts...)
176		if err != nil {
177			return nil, errDial(i, err)
178		}
179		c.conns = append(c.conns, conn)
180		c.clients = append(c.clients, sppb.NewSpannerClient(conn))
181	}
182
183	// Prepare session pool.
184	config.SessionPoolConfig.getRPCClient = func() (sppb.SpannerClient, error) {
185		// TODO: support more loadbalancing options.
186		return c.rrNext(), nil
187	}
188	config.SessionPoolConfig.sessionLabels = c.sessionLabels
189	sp, err := newSessionPool(database, config.SessionPoolConfig, c.md)
190	if err != nil {
191		c.Close()
192		return nil, err
193	}
194	c.idleSessions = sp
195	return c, nil
196}
197
198// rrNext returns the next available Cloud Spanner RPC client in a round-robin
199// manner.
200func (c *Client) rrNext() sppb.SpannerClient {
201	return c.clients[atomic.AddUint32(&c.rr, 1)%uint32(len(c.clients))]
202}
203
204// Close closes the client.
205func (c *Client) Close() {
206	if c.idleSessions != nil {
207		c.idleSessions.close()
208	}
209	for _, conn := range c.conns {
210		conn.Close()
211	}
212}
213
214// Single provides a read-only snapshot transaction optimized for the case
215// where only a single read or query is needed.  This is more efficient than
216// using ReadOnlyTransaction() for a single read or query.
217//
218// Single will use a strong TimestampBound by default. Use
219// ReadOnlyTransaction.WithTimestampBound to specify a different
220// TimestampBound. A non-strong bound can be used to reduce latency, or
221// "time-travel" to prior versions of the database, see the documentation of
222// TimestampBound for details.
223func (c *Client) Single() *ReadOnlyTransaction {
224	t := &ReadOnlyTransaction{singleUse: true, sp: c.idleSessions}
225	t.txReadOnly.txReadEnv = t
226	return t
227}
228
229// ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for
230// multiple reads from the database.  You must call Close() when the
231// ReadOnlyTransaction is no longer needed to release resources on the server.
232//
233// ReadOnlyTransaction will use a strong TimestampBound by default.  Use
234// ReadOnlyTransaction.WithTimestampBound to specify a different
235// TimestampBound.  A non-strong bound can be used to reduce latency, or
236// "time-travel" to prior versions of the database, see the documentation of
237// TimestampBound for details.
238func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
239	t := &ReadOnlyTransaction{
240		singleUse:       false,
241		sp:              c.idleSessions,
242		txReadyOrClosed: make(chan struct{}),
243	}
244	t.txReadOnly.txReadEnv = t
245	return t
246}
247
248// BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used
249// for partitioned reads or queries from a snapshot of the database. This is
250// useful in batch processing pipelines where one wants to divide the work of
251// reading from the database across multiple machines.
252//
253// Note: This transaction does not use the underlying session pool but creates a
254// new session each time, and the session is reused across clients.
255//
256// You should call Close() after the txn is no longer needed on local
257// client, and call Cleanup() when the txn is finished for all clients, to free
258// the session.
259func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) {
260	var (
261		tx  transactionID
262		rts time.Time
263		s   *session
264		sh  *sessionHandle
265		err error
266	)
267	defer func() {
268		if err != nil && sh != nil {
269			s.delete(ctx)
270		}
271	}()
272
273	// Create session.
274	sc := c.rrNext()
275	s, err = createSession(ctx, sc, c.database, c.sessionLabels, c.md)
276	if err != nil {
277		return nil, err
278	}
279	sh = &sessionHandle{session: s}
280
281	// Begin transaction.
282	err = runRetryable(contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error {
283		res, e := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
284			Session: sh.getID(),
285			Options: &sppb.TransactionOptions{
286				Mode: &sppb.TransactionOptions_ReadOnly_{
287					ReadOnly: buildTransactionOptionsReadOnly(tb, true),
288				},
289			},
290		})
291		if e != nil {
292			return e
293		}
294		tx = res.Id
295		if res.ReadTimestamp != nil {
296			rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
297		}
298		return nil
299	})
300	if err != nil {
301		return nil, err
302	}
303
304	t := &BatchReadOnlyTransaction{
305		ReadOnlyTransaction: ReadOnlyTransaction{
306			tx:              tx,
307			txReadyOrClosed: make(chan struct{}),
308			state:           txActive,
309			sh:              sh,
310			rts:             rts,
311		},
312		ID: BatchReadOnlyTransactionID{
313			tid: tx,
314			sid: sh.getID(),
315			rts: rts,
316		},
317	}
318	t.txReadOnly.txReadEnv = t
319	return t, nil
320}
321
322// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from
323// BatchReadOnlyTransactionID
324func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction {
325	sc := c.rrNext()
326	s := &session{valid: true, client: sc, id: tid.sid, createTime: time.Now(), md: c.md}
327	sh := &sessionHandle{session: s}
328
329	t := &BatchReadOnlyTransaction{
330		ReadOnlyTransaction: ReadOnlyTransaction{
331			tx:              tid.tid,
332			txReadyOrClosed: make(chan struct{}),
333			state:           txActive,
334			sh:              sh,
335			rts:             tid.rts,
336		},
337		ID: tid,
338	}
339	t.txReadOnly.txReadEnv = t
340	return t
341}
342
343type transactionInProgressKey struct{}
344
345func checkNestedTxn(ctx context.Context) error {
346	if ctx.Value(transactionInProgressKey{}) != nil {
347		return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions")
348	}
349	return nil
350}
351
352// ReadWriteTransaction executes a read-write transaction, with retries as
353// necessary.
354//
355// The function f will be called one or more times. It must not maintain
356// any state between calls.
357//
358// If the transaction cannot be committed or if f returns an ABORTED error,
359// ReadWriteTransaction will call f again. It will continue to call f until the
360// transaction can be committed or the Context times out or is cancelled.  If f
361// returns an error other than ABORTED, ReadWriteTransaction will abort the
362// transaction and return the error.
363//
364// To limit the number of retries, set a deadline on the Context rather than
365// using a fixed limit on the number of attempts. ReadWriteTransaction will
366// retry as needed until that deadline is met.
367//
368// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for
369// more details.
370func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) {
371	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction")
372	defer func() { trace.EndSpan(ctx, err) }()
373	if err := checkNestedTxn(ctx); err != nil {
374		return time.Time{}, err
375	}
376	var (
377		ts time.Time
378		sh *sessionHandle
379	)
380	err = runRetryableNoWrap(ctx, func(ctx context.Context) error {
381		var (
382			err error
383			t   *ReadWriteTransaction
384		)
385		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
386			// Session handle hasn't been allocated or has been destroyed.
387			sh, err = c.idleSessions.takeWriteSession(ctx)
388			if err != nil {
389				// If session retrieval fails, just fail the transaction.
390				return err
391			}
392			t = &ReadWriteTransaction{
393				sh: sh,
394				tx: sh.getTransactionID(),
395			}
396		} else {
397			t = &ReadWriteTransaction{
398				sh: sh,
399			}
400		}
401		t.txReadOnly.txReadEnv = t
402		trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
403			"Starting transaction attempt")
404		if err = t.begin(ctx); err != nil {
405			// Mask error from begin operation as retryable error.
406			return errRetry(err)
407		}
408		ts, err = t.runInTransaction(ctx, f)
409		return err
410	})
411	if sh != nil {
412		sh.recycle()
413	}
414	return ts, err
415}
416
// applyOption controls the behavior of Client.Apply. Client.Apply starts from
// the zero value and lets each ApplyOption it was given modify it.
type applyOption struct {
	// If atLeastOnce == true, Client.Apply will execute the mutations on Cloud
	// Spanner at least once.
	// When false (the zero value), Client.Apply buffers the mutations inside
	// a ReadWriteTransaction instead.
	atLeastOnce bool
}

// An ApplyOption is an optional argument to Apply.
// It is a function that adjusts the applyOption settings Apply works with.
type ApplyOption func(*applyOption)
426
427// ApplyAtLeastOnce returns an ApplyOption that removes replay protection.
428//
429// With this option, Apply may attempt to apply mutations more than once; if
430// the mutations are not idempotent, this may lead to a failure being reported
431// when the mutation was applied more than once. For example, an insert may
432// fail with ALREADY_EXISTS even though the row did not exist before Apply was
433// called. For this reason, most users of the library will prefer not to use
434// this option.  However, ApplyAtLeastOnce requires only a single RPC, whereas
435// Apply's default replay protection may require an additional RPC.  So this
436// option may be appropriate for latency sensitive and/or high throughput blind
437// writing.
438func ApplyAtLeastOnce() ApplyOption {
439	return func(ao *applyOption) {
440		ao.atLeastOnce = true
441	}
442}
443
444// Apply applies a list of mutations atomically to the database.
445func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
446	ao := &applyOption{}
447	for _, opt := range opts {
448		opt(ao)
449	}
450	if !ao.atLeastOnce {
451		return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
452			return t.BufferWrite(ms)
453		})
454	}
455
456	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Apply")
457	defer func() { trace.EndSpan(ctx, err) }()
458	t := &writeOnlyTransaction{c.idleSessions}
459	return t.applyAtLeastOnce(ctx, ms...)
460}
461