1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"fmt"
22	"log"
23	"os"
24	"regexp"
25	"time"
26
27	"cloud.google.com/go/internal/trace"
28	vkit "cloud.google.com/go/spanner/apiv1"
29	"google.golang.org/api/option"
30	"google.golang.org/api/option/internaloption"
31	gtransport "google.golang.org/api/transport/grpc"
32	sppb "google.golang.org/genproto/googleapis/spanner/v1"
33	"google.golang.org/grpc"
34	"google.golang.org/grpc/codes"
35	"google.golang.org/grpc/metadata"
36)
37
const (
	// resourcePrefixHeader is the name of the metadata header used to indicate
	// the resource being operated on. NewClientWithConfig pairs it with the
	// database name when building the session client's metadata.
	resourcePrefixHeader = "google-cloud-resource-prefix"

	// numChannels is the default value for NumChannels of client. It is
	// applied when ClientConfig.NumChannels is not set (zero).
	numChannels = 4
)
46
const (
	// Scope is the auth scope URL for the Cloud Spanner Data API.
	Scope = "https://www.googleapis.com/auth/spanner.data"

	// AdminScope is the auth scope URL for the Cloud Spanner Admin APIs.
	AdminScope = "https://www.googleapis.com/auth/spanner.admin"
)
54
// validDBPattern matches a fully qualified database name of the form
// projects/<project>/instances/<instance>/databases/<database> and exposes the
// three path segments as the named groups project, instance and database.
var validDBPattern = regexp.MustCompile(`^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)/databases/(?P<database>[^/]+)$`)
58
59func validDatabaseName(db string) error {
60	if matched := validDBPattern.MatchString(db); !matched {
61		return fmt.Errorf("database name %q should conform to pattern %q",
62			db, validDBPattern.String())
63	}
64	return nil
65}
66
67func parseDatabaseName(db string) (project, instance, database string, err error) {
68	matches := validDBPattern.FindStringSubmatch(db)
69	if len(matches) == 0 {
70		return "", "", "", fmt.Errorf("Failed to parse database name from %q according to pattern %q",
71			db, validDBPattern.String())
72	}
73	return matches[1], matches[2], matches[3], nil
74}
75
// Client is a client for reading and writing data to a Cloud Spanner database.
// A client is safe to use concurrently, except for its Close method.
type Client struct {
	// sc is the session client; it holds the database name and is used to
	// create or look up sessions directly (e.g. for batch transactions).
	sc *sessionClient
	// idleSessions is the session pool handed to the transactions created by
	// this client.
	idleSessions *sessionPool
	// logger receives log output from this client; a nil logger means the
	// standard logger is used (see logf).
	logger *log.Logger
	// qo holds the query options that are copied into every transaction
	// created by this client.
	qo QueryOptions
}
84
85// DatabaseName returns the full name of a database, e.g.,
86// "projects/spanner-cloud-test/instances/foo/databases/foodb".
87func (c *Client) DatabaseName() string {
88	return c.sc.database
89}
90
// ClientConfig has configurations for the client.
type ClientConfig struct {
	// NumChannels is the number of gRPC channels.
	// If zero, a default number of channels is used. If a positive value is
	// set and the dialed connection pool ends up with a different size (for
	// example because option.WithGRPCConnectionPool was also supplied with
	// another value), client creation fails with an InvalidArgument error.
	//
	// Deprecated: The Spanner client now uses a pool of gRPC connections. Use
	// option.WithGRPCConnectionPool(numConns) instead to specify the number of
	// connections the client should use. The client will default to a
	// reasonable default if this option is not specified.
	NumChannels int

	// SessionPoolConfig is the configuration for session pool. Zero-valued
	// MaxOpened and MaxBurst are replaced with defaults when the client is
	// created.
	SessionPoolConfig

	// SessionLabels for the sessions created by this client. The map is copied
	// when the client is created.
	// See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session
	// for more info.
	SessionLabels map[string]string

	// QueryOptions is the configuration for executing a sql query. When the
	// SPANNER_OPTIMIZER_VERSION or SPANNER_OPTIMIZER_STATISTICS_PACKAGE
	// environment variables are set, their values override the corresponding
	// fields of these options.
	QueryOptions QueryOptions

	// CallOptions is the configuration for providing custom retry settings that
	// override the default values.
	CallOptions *vkit.CallOptions

	// logger is the logger to use for this client. If it is nil, all logging
	// will be directed to the standard logger.
	logger *log.Logger
}
121
122func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
123	existing, ok := metadata.FromOutgoingContext(ctx)
124	if ok {
125		md = metadata.Join(existing, md)
126	}
127	return metadata.NewOutgoingContext(ctx, md)
128}
129
130// NewClient creates a client to a database. A valid database name has the
131// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses
132// a default configuration.
133func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) {
134	return NewClientWithConfig(ctx, database, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, opts...)
135}
136
137// NewClientWithConfig creates a client to a database. A valid database name has
138// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
139func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
140	// Validate database path.
141	if err := validDatabaseName(database); err != nil {
142		return nil, err
143	}
144
145	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient")
146	defer func() { trace.EndSpan(ctx, err) }()
147
148	// Append emulator options if SPANNER_EMULATOR_HOST has been set.
149	if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" {
150		emulatorOpts := []option.ClientOption{
151			option.WithEndpoint(emulatorAddr),
152			option.WithGRPCDialOption(grpc.WithInsecure()),
153			option.WithoutAuthentication(),
154			internaloption.SkipDialSettingsValidation(),
155		}
156		opts = append(emulatorOpts, opts...)
157	}
158
159	// Prepare gRPC channels.
160	hasNumChannelsConfig := config.NumChannels > 0
161	if config.NumChannels == 0 {
162		config.NumChannels = numChannels
163	}
164	// gRPC options.
165	allOpts := allClientOpts(config.NumChannels, opts...)
166	pool, err := gtransport.DialPool(ctx, allOpts...)
167	if err != nil {
168		return nil, err
169	}
170	if hasNumChannelsConfig && pool.Num() != config.NumChannels {
171		pool.Close()
172		return nil, spannerErrorf(codes.InvalidArgument, "Connection pool mismatch: NumChannels=%v, WithGRPCConnectionPool=%v. Only set one of these options, or set both to the same value.", config.NumChannels, pool.Num())
173	}
174
175	// TODO(loite): Remove as the original map cannot be changed by the user
176	// anyways, and the client library is also not changing it.
177	// Make a copy of labels.
178	sessionLabels := make(map[string]string)
179	for k, v := range config.SessionLabels {
180		sessionLabels[k] = v
181	}
182
183	// Default configs for session pool.
184	if config.MaxOpened == 0 {
185		config.MaxOpened = uint64(pool.Num() * 100)
186	}
187	if config.MaxBurst == 0 {
188		config.MaxBurst = DefaultSessionPoolConfig.MaxBurst
189	}
190	if config.incStep == 0 {
191		config.incStep = DefaultSessionPoolConfig.incStep
192	}
193	// Create a session client.
194	sc := newSessionClient(pool, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database), config.logger, config.CallOptions)
195	// Create a session pool.
196	config.SessionPoolConfig.sessionLabels = sessionLabels
197	sp, err := newSessionPool(sc, config.SessionPoolConfig)
198	if err != nil {
199		sc.close()
200		return nil, err
201	}
202	c = &Client{
203		sc:           sc,
204		idleSessions: sp,
205		logger:       config.logger,
206		qo:           getQueryOptions(config.QueryOptions),
207	}
208	return c, nil
209}
210
211// Combines the default options from the generated client, the default options
212// of the hand-written client and the user options to one list of options.
213// Precedence: userOpts > clientDefaultOpts > generatedDefaultOpts
214func allClientOpts(numChannels int, userOpts ...option.ClientOption) []option.ClientOption {
215	generatedDefaultOpts := vkit.DefaultClientOptions()
216	clientDefaultOpts := []option.ClientOption{
217		option.WithGRPCConnectionPool(numChannels),
218		option.WithUserAgent(clientUserAgent),
219		internaloption.EnableDirectPath(true),
220	}
221	allDefaultOpts := append(generatedDefaultOpts, clientDefaultOpts...)
222	return append(allDefaultOpts, userOpts...)
223}
224
225// getQueryOptions returns the query options overwritten by the environment
226// variables if exist. The input parameter is the query options set by users
227// via application-level configuration. If the environment variables are set,
228// this will return the overwritten query options.
229func getQueryOptions(opts QueryOptions) QueryOptions {
230	if opts.Options == nil {
231		opts.Options = &sppb.ExecuteSqlRequest_QueryOptions{}
232	}
233	opv := os.Getenv("SPANNER_OPTIMIZER_VERSION")
234	if opv != "" {
235		opts.Options.OptimizerVersion = opv
236	}
237	opsp := os.Getenv("SPANNER_OPTIMIZER_STATISTICS_PACKAGE")
238	if opsp != "" {
239		opts.Options.OptimizerStatisticsPackage = opsp
240	}
241	return opts
242}
243
244// Close closes the client.
245func (c *Client) Close() {
246	if c.idleSessions != nil {
247		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
248		defer cancel()
249		c.idleSessions.close(ctx)
250	}
251	c.sc.close()
252}
253
254// Single provides a read-only snapshot transaction optimized for the case
255// where only a single read or query is needed.  This is more efficient than
256// using ReadOnlyTransaction() for a single read or query.
257//
258// Single will use a strong TimestampBound by default. Use
259// ReadOnlyTransaction.WithTimestampBound to specify a different
260// TimestampBound. A non-strong bound can be used to reduce latency, or
261// "time-travel" to prior versions of the database, see the documentation of
262// TimestampBound for details.
263func (c *Client) Single() *ReadOnlyTransaction {
264	t := &ReadOnlyTransaction{singleUse: true}
265	t.txReadOnly.sp = c.idleSessions
266	t.txReadOnly.txReadEnv = t
267	t.txReadOnly.qo = c.qo
268	t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error {
269		if t.sh == nil {
270			return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction")
271		}
272		// Remove the session that returned 'Session not found' from the pool.
273		t.sh.destroy()
274		// Reset the transaction, acquire a new session and retry.
275		t.state = txNew
276		sh, _, err := t.acquire(ctx)
277		if err != nil {
278			return err
279		}
280		t.sh = sh
281		return nil
282	}
283	return t
284}
285
286// ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for
287// multiple reads from the database.  You must call Close() when the
288// ReadOnlyTransaction is no longer needed to release resources on the server.
289//
290// ReadOnlyTransaction will use a strong TimestampBound by default.  Use
291// ReadOnlyTransaction.WithTimestampBound to specify a different
292// TimestampBound.  A non-strong bound can be used to reduce latency, or
293// "time-travel" to prior versions of the database, see the documentation of
294// TimestampBound for details.
295func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
296	t := &ReadOnlyTransaction{
297		singleUse:       false,
298		txReadyOrClosed: make(chan struct{}),
299	}
300	t.txReadOnly.sp = c.idleSessions
301	t.txReadOnly.txReadEnv = t
302	t.txReadOnly.qo = c.qo
303	return t
304}
305
306// BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used
307// for partitioned reads or queries from a snapshot of the database. This is
308// useful in batch processing pipelines where one wants to divide the work of
309// reading from the database across multiple machines.
310//
311// Note: This transaction does not use the underlying session pool but creates a
312// new session each time, and the session is reused across clients.
313//
314// You should call Close() after the txn is no longer needed on local
315// client, and call Cleanup() when the txn is finished for all clients, to free
316// the session.
317func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) {
318	var (
319		tx  transactionID
320		rts time.Time
321		s   *session
322		sh  *sessionHandle
323		err error
324	)
325	defer func() {
326		if err != nil && sh != nil {
327			s.delete(ctx)
328		}
329	}()
330
331	// Create session.
332	s, err = c.sc.createSession(ctx)
333	if err != nil {
334		return nil, err
335	}
336	sh = &sessionHandle{session: s}
337
338	// Begin transaction.
339	res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
340		Session: sh.getID(),
341		Options: &sppb.TransactionOptions{
342			Mode: &sppb.TransactionOptions_ReadOnly_{
343				ReadOnly: buildTransactionOptionsReadOnly(tb, true),
344			},
345		},
346	})
347	if err != nil {
348		return nil, ToSpannerError(err)
349	}
350	tx = res.Id
351	if res.ReadTimestamp != nil {
352		rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
353	}
354
355	t := &BatchReadOnlyTransaction{
356		ReadOnlyTransaction: ReadOnlyTransaction{
357			tx:              tx,
358			txReadyOrClosed: make(chan struct{}),
359			state:           txActive,
360			rts:             rts,
361		},
362		ID: BatchReadOnlyTransactionID{
363			tid: tx,
364			sid: sh.getID(),
365			rts: rts,
366		},
367	}
368	t.txReadOnly.sh = sh
369	t.txReadOnly.txReadEnv = t
370	t.txReadOnly.qo = c.qo
371	return t, nil
372}
373
374// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from
375// BatchReadOnlyTransactionID
376func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction {
377	s, err := c.sc.sessionWithID(tid.sid)
378	if err != nil {
379		logf(c.logger, "unexpected error: %v\nThis is an indication of an internal error in the Spanner client library.", err)
380		// Use an invalid session. Preferably, this method should just return
381		// the error instead of this, but that would mean an API change.
382		s = &session{}
383	}
384	sh := &sessionHandle{session: s}
385
386	t := &BatchReadOnlyTransaction{
387		ReadOnlyTransaction: ReadOnlyTransaction{
388			tx:              tid.tid,
389			txReadyOrClosed: make(chan struct{}),
390			state:           txActive,
391			rts:             tid.rts,
392		},
393		ID: tid,
394	}
395	t.txReadOnly.sh = sh
396	t.txReadOnly.txReadEnv = t
397	t.txReadOnly.qo = c.qo
398	return t
399}
400
// transactionInProgressKey is the context key type that checkNestedTxn looks
// up to detect that a transaction is already in progress on a context.
type transactionInProgressKey struct{}
402
403func checkNestedTxn(ctx context.Context) error {
404	if ctx.Value(transactionInProgressKey{}) != nil {
405		return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions")
406	}
407	return nil
408}
409
410// ReadWriteTransaction executes a read-write transaction, with retries as
411// necessary.
412//
413// The function f will be called one or more times. It must not maintain
414// any state between calls.
415//
416// If the transaction cannot be committed or if f returns an ABORTED error,
417// ReadWriteTransaction will call f again. It will continue to call f until the
418// transaction can be committed or the Context times out or is cancelled.  If f
419// returns an error other than ABORTED, ReadWriteTransaction will abort the
420// transaction and return the error.
421//
422// To limit the number of retries, set a deadline on the Context rather than
423// using a fixed limit on the number of attempts. ReadWriteTransaction will
424// retry as needed until that deadline is met.
425//
426// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for
427// more details.
428func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) {
429	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction")
430	defer func() { trace.EndSpan(ctx, err) }()
431	resp, err := c.rwTransaction(ctx, f, TransactionOptions{})
432	return resp.CommitTs, err
433}
434
435// ReadWriteTransactionWithOptions executes a read-write transaction with
436// configurable options, with retries as necessary.
437//
438// ReadWriteTransactionWithOptions is a configurable ReadWriteTransaction.
439//
440// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for
441// more details.
442func (c *Client) ReadWriteTransactionWithOptions(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error, options TransactionOptions) (resp CommitResponse, err error) {
443	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransactionWithOptions")
444	defer func() { trace.EndSpan(ctx, err) }()
445	resp, err = c.rwTransaction(ctx, f, options)
446	return resp, err
447}
448
449func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error, options TransactionOptions) (resp CommitResponse, err error) {
450	if err := checkNestedTxn(ctx); err != nil {
451		return resp, err
452	}
453	var (
454		sh *sessionHandle
455	)
456	defer func() {
457		if sh != nil {
458			sh.recycle()
459		}
460	}()
461	err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error {
462		var (
463			err error
464			t   *ReadWriteTransaction
465		)
466		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
467			// Session handle hasn't been allocated or has been destroyed.
468			sh, err = c.idleSessions.takeWriteSession(ctx)
469			if err != nil {
470				// If session retrieval fails, just fail the transaction.
471				return err
472			}
473			t = &ReadWriteTransaction{
474				tx: sh.getTransactionID(),
475			}
476		} else {
477			t = &ReadWriteTransaction{}
478		}
479		t.txReadOnly.sh = sh
480		t.txReadOnly.txReadEnv = t
481		t.txReadOnly.qo = c.qo
482		t.txOpts = options
483
484		trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
485			"Starting transaction attempt")
486		if err = t.begin(ctx); err != nil {
487			return err
488		}
489		resp, err = t.runInTransaction(ctx, f)
490		return err
491	})
492	return resp, err
493}
494
// applyOption controls the behavior of Client.Apply. Its fields are set by
// the ApplyOption functions passed to Apply.
type applyOption struct {
	// If atLeastOnce == true, Client.Apply will execute the mutations on Cloud
	// Spanner at least once, using a write-only transaction instead of a
	// read-write transaction.
	atLeastOnce bool
	// transactionTag will be included with the CommitRequest.
	transactionTag string
	// priority is the RPC priority that is used for the commit operation.
	priority sppb.RequestOptions_Priority
}
505
// An ApplyOption is an optional argument to Apply. Each option is a function
// that Apply invokes on its settings before the mutations are written.
type ApplyOption func(*applyOption)
508
509// ApplyAtLeastOnce returns an ApplyOption that removes replay protection.
510//
511// With this option, Apply may attempt to apply mutations more than once; if
512// the mutations are not idempotent, this may lead to a failure being reported
513// when the mutation was applied more than once. For example, an insert may
514// fail with ALREADY_EXISTS even though the row did not exist before Apply was
515// called. For this reason, most users of the library will prefer not to use
516// this option.  However, ApplyAtLeastOnce requires only a single RPC, whereas
517// Apply's default replay protection may require an additional RPC.  So this
518// option may be appropriate for latency sensitive and/or high throughput blind
519// writing.
520func ApplyAtLeastOnce() ApplyOption {
521	return func(ao *applyOption) {
522		ao.atLeastOnce = true
523	}
524}
525
526// TransactionTag returns an ApplyOption that will include the given tag as a
527// transaction tag for a write-only transaction.
528func TransactionTag(tag string) ApplyOption {
529	return func(ao *applyOption) {
530		ao.transactionTag = tag
531	}
532}
533
534// Priority returns an ApplyOptions that sets the RPC priority to use for the
535// commit operation.
536func Priority(priority sppb.RequestOptions_Priority) ApplyOption {
537	return func(ao *applyOption) {
538		ao.priority = priority
539	}
540}
541
542// Apply applies a list of mutations atomically to the database.
543func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
544	ao := &applyOption{}
545	for _, opt := range opts {
546		opt(ao)
547	}
548
549	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Apply")
550	defer func() { trace.EndSpan(ctx, err) }()
551
552	if !ao.atLeastOnce {
553		resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
554			return t.BufferWrite(ms)
555		}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag})
556		return resp.CommitTs, err
557	}
558	t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag}
559	return t.applyAtLeastOnce(ctx, ms...)
560}
561
// logf formats a message with format and v and writes it to logger. A nil
// logger sends the message to the standard logger instead.
func logf(logger *log.Logger, format string, v ...interface{}) {
	if logger != nil {
		logger.Printf(format, v...)
		return
	}
	log.Printf(format, v...)
}
571