1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"fmt"
22	"log"
23	"os"
24	"regexp"
25	"time"
26
27	"cloud.google.com/go/internal/trace"
28	vkit "cloud.google.com/go/spanner/apiv1"
29	"google.golang.org/api/option"
30	"google.golang.org/api/option/internaloption"
31	gtransport "google.golang.org/api/transport/grpc"
32	sppb "google.golang.org/genproto/googleapis/spanner/v1"
33	"google.golang.org/grpc"
34	"google.golang.org/grpc/codes"
35	"google.golang.org/grpc/metadata"
36)
37
const (
	// numChannels is the number of gRPC channels a client opens when
	// ClientConfig.NumChannels is left at zero.
	numChannels = 4

	// endpoint is the default host:port the client dials.
	endpoint = "spanner.googleapis.com:443"

	// resourcePrefixHeader names the gRPC metadata header that identifies the
	// resource (database) a request operates on.
	resourcePrefixHeader = "google-cloud-resource-prefix"
)

// OAuth2 scopes used by Cloud Spanner clients.
const (
	// AdminScope is the scope for Cloud Spanner Admin APIs.
	AdminScope = "https://www.googleapis.com/auth/spanner.admin"

	// Scope is the scope for Cloud Spanner Data API.
	Scope = "https://www.googleapis.com/auth/spanner.data"
)
56
// validDBPattern matches a fully qualified database name and captures its
// project, instance and database IDs (submatches 1, 2 and 3 respectively).
var validDBPattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)/databases/(?P<database>[^/]+)$")

// validDatabaseName returns a non-nil error if db is not of the form
// projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
func validDatabaseName(db string) error {
	if !validDBPattern.MatchString(db) {
		return fmt.Errorf("database name %q should conform to pattern %q",
			db, validDBPattern.String())
	}
	return nil
}

// parseDatabaseName splits a fully qualified database name into its project,
// instance and database IDs. It returns an error if db does not match
// validDBPattern.
func parseDatabaseName(db string) (project, instance, database string, err error) {
	matches := validDBPattern.FindStringSubmatch(db)
	if len(matches) == 0 {
		// Go error strings start lowercase so they compose when wrapped,
		// matching validDatabaseName above.
		return "", "", "", fmt.Errorf("failed to parse database name from %q according to pattern %q",
			db, validDBPattern.String())
	}
	return matches[1], matches[2], matches[3], nil
}
77
// Client is a client for reading and writing data to a Cloud Spanner database.
// A client is safe to use concurrently, except for its Close method.
//
// Create one with NewClient or NewClientWithConfig. A Client holds the
// session client (sc) used to create sessions directly, the session pool
// (idleSessions) that its transactions take sessions from, an optional logger
// (nil means messages go to the standard logger via logf), and the query
// options (qo) that Single, ReadOnlyTransaction, the batch read-only
// transactions and ReadWriteTransaction copy into every transaction they
// create.
type Client struct {
	sc           *sessionClient
	idleSessions *sessionPool
	logger       *log.Logger
	qo           QueryOptions
}
86
// ClientConfig has configurations for the client.
type ClientConfig struct {
	// NumChannels is the number of gRPC channels.
	// If zero, a fixed default of 4 channels is used unless a pool size is
	// supplied with option.WithGRPCConnectionPool.
	//
	// Deprecated: The Spanner client now uses a pool of gRPC connections. Use
	// option.WithGRPCConnectionPool(numConns) instead to specify the number of
	// connections the client should use. If neither option is specified, the
	// client uses a reasonable default pool size.
	NumChannels int

	// SessionPoolConfig is the configuration for the session pool.
	// NewClientWithConfig replaces a zero MaxOpened with 100 sessions per gRPC
	// connection in the pool, and a zero MaxBurst with
	// DefaultSessionPoolConfig.MaxBurst.
	SessionPoolConfig

	// SessionLabels are the labels for the sessions created by this client.
	// The map is copied when the client is created, so later changes to it
	// do not affect the client.
	// See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session
	// for more info.
	SessionLabels map[string]string

	// QueryOptions is the configuration for executing a sql query. If the
	// SPANNER_OPTIMIZER_VERSION environment variable is set, its value
	// overrides the optimizer version configured here.
	QueryOptions QueryOptions

	// CallOptions is the configuration for providing custom retry settings that
	// override the default values.
	CallOptions *vkit.CallOptions

	// logger is the logger to use for this client. If it is nil, all logging
	// will be directed to the standard logger. It is unexported, so it can
	// only be set from within this package.
	logger *log.Logger
}
117
118// errDial returns error for dialing to Cloud Spanner.
119func errDial(ci int, err error) error {
120	e := toSpannerError(err).(*Error)
121	e.decorate(fmt.Sprintf("dialing fails for channel[%v]", ci))
122	return e
123}
124
125func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
126	existing, ok := metadata.FromOutgoingContext(ctx)
127	if ok {
128		md = metadata.Join(existing, md)
129	}
130	return metadata.NewOutgoingContext(ctx, md)
131}
132
133// NewClient creates a client to a database. A valid database name has the
134// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses
135// a default configuration.
136func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) {
137	return NewClientWithConfig(ctx, database, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, opts...)
138}
139
140// NewClientWithConfig creates a client to a database. A valid database name has
141// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
142func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
143	// Validate database path.
144	if err := validDatabaseName(database); err != nil {
145		return nil, err
146	}
147
148	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient")
149	defer func() { trace.EndSpan(ctx, err) }()
150
151	// Append emulator options if SPANNER_EMULATOR_HOST has been set.
152	if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" {
153		emulatorOpts := []option.ClientOption{
154			option.WithEndpoint(emulatorAddr),
155			option.WithGRPCDialOption(grpc.WithInsecure()),
156			option.WithoutAuthentication(),
157			internaloption.SkipDialSettingsValidation(),
158		}
159		opts = append(emulatorOpts, opts...)
160	}
161
162	// Prepare gRPC channels.
163	hasNumChannelsConfig := config.NumChannels > 0
164	if config.NumChannels == 0 {
165		config.NumChannels = numChannels
166	}
167	// gRPC options.
168	allOpts := []option.ClientOption{
169		option.WithEndpoint(endpoint),
170		option.WithScopes(Scope),
171		option.WithGRPCDialOption(
172			grpc.WithDefaultCallOptions(
173				grpc.MaxCallSendMsgSize(100<<20),
174				grpc.MaxCallRecvMsgSize(100<<20),
175			),
176		),
177		option.WithGRPCConnectionPool(config.NumChannels),
178	}
179	// opts will take precedence above allOpts, as the values in opts will be
180	// applied after the values in allOpts.
181	allOpts = append(allOpts, opts...)
182	pool, err := gtransport.DialPool(ctx, allOpts...)
183	if err != nil {
184		return nil, err
185	}
186	if hasNumChannelsConfig && pool.Num() != config.NumChannels {
187		pool.Close()
188		return nil, spannerErrorf(codes.InvalidArgument, "Connection pool mismatch: NumChannels=%v, WithGRPCConnectionPool=%v. Only set one of these options, or set both to the same value.", config.NumChannels, pool.Num())
189	}
190
191	// TODO(loite): Remove as the original map cannot be changed by the user
192	// anyways, and the client library is also not changing it.
193	// Make a copy of labels.
194	sessionLabels := make(map[string]string)
195	for k, v := range config.SessionLabels {
196		sessionLabels[k] = v
197	}
198
199	// Default configs for session pool.
200	if config.MaxOpened == 0 {
201		config.MaxOpened = uint64(pool.Num() * 100)
202	}
203	if config.MaxBurst == 0 {
204		config.MaxBurst = DefaultSessionPoolConfig.MaxBurst
205	}
206	if config.incStep == 0 {
207		config.incStep = DefaultSessionPoolConfig.incStep
208	}
209	// Create a session client.
210	sc := newSessionClient(pool, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database), config.logger, config.CallOptions)
211	// Create a session pool.
212	config.SessionPoolConfig.sessionLabels = sessionLabels
213	sp, err := newSessionPool(sc, config.SessionPoolConfig)
214	if err != nil {
215		sc.close()
216		return nil, err
217	}
218	c = &Client{
219		sc:           sc,
220		idleSessions: sp,
221		logger:       config.logger,
222		qo:           getQueryOptions(config.QueryOptions),
223	}
224	return c, nil
225}
226
227// getQueryOptions returns the query options overwritten by the environment
228// variables if exist. The input parameter is the query options set by users
229// via application-level configuration. If the environment variables are set,
230// this will return the overwritten query options.
231func getQueryOptions(opts QueryOptions) QueryOptions {
232	opv := os.Getenv("SPANNER_OPTIMIZER_VERSION")
233	if opv != "" {
234		if opts.Options == nil {
235			opts.Options = &sppb.ExecuteSqlRequest_QueryOptions{}
236		}
237		opts.Options.OptimizerVersion = opv
238	}
239	return opts
240}
241
242// Close closes the client.
243func (c *Client) Close() {
244	if c.idleSessions != nil {
245		c.idleSessions.close()
246	}
247	c.sc.close()
248}
249
250// Single provides a read-only snapshot transaction optimized for the case
251// where only a single read or query is needed.  This is more efficient than
252// using ReadOnlyTransaction() for a single read or query.
253//
254// Single will use a strong TimestampBound by default. Use
255// ReadOnlyTransaction.WithTimestampBound to specify a different
256// TimestampBound. A non-strong bound can be used to reduce latency, or
257// "time-travel" to prior versions of the database, see the documentation of
258// TimestampBound for details.
259func (c *Client) Single() *ReadOnlyTransaction {
260	t := &ReadOnlyTransaction{singleUse: true}
261	t.txReadOnly.sp = c.idleSessions
262	t.txReadOnly.txReadEnv = t
263	t.txReadOnly.qo = c.qo
264	t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error {
265		if t.sh == nil {
266			return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction")
267		}
268		// Remove the session that returned 'Session not found' from the pool.
269		t.sh.destroy()
270		// Reset the transaction, acquire a new session and retry.
271		t.state = txNew
272		sh, _, err := t.acquire(ctx)
273		if err != nil {
274			return err
275		}
276		t.sh = sh
277		return nil
278	}
279	return t
280}
281
282// ReadOnlyTransaction returns a ReadOnlyTransaction that can be used for
283// multiple reads from the database.  You must call Close() when the
284// ReadOnlyTransaction is no longer needed to release resources on the server.
285//
286// ReadOnlyTransaction will use a strong TimestampBound by default.  Use
287// ReadOnlyTransaction.WithTimestampBound to specify a different
288// TimestampBound.  A non-strong bound can be used to reduce latency, or
289// "time-travel" to prior versions of the database, see the documentation of
290// TimestampBound for details.
291func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
292	t := &ReadOnlyTransaction{
293		singleUse:       false,
294		txReadyOrClosed: make(chan struct{}),
295	}
296	t.txReadOnly.sp = c.idleSessions
297	t.txReadOnly.txReadEnv = t
298	t.txReadOnly.qo = c.qo
299	return t
300}
301
// BatchReadOnlyTransaction returns a BatchReadOnlyTransaction that can be used
// for partitioned reads or queries from a snapshot of the database. This is
// useful in batch processing pipelines where one wants to divide the work of
// reading from the database across multiple machines.
//
// Note: This transaction does not use the underlying session pool but creates a
// new session each time, and the session is reused across clients.
//
// You should call Close() after the txn is no longer needed on local
// client, and call Cleanup() when the txn is finished for all clients, to free
// the session.
//
// If beginning the transaction fails, the session created for it is deleted
// (best effort) before the error is returned.
func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound) (*BatchReadOnlyTransaction, error) {
	var (
		tx  transactionID
		rts time.Time
		s   *session
		sh  *sessionHandle
		err error
	)
	// Cleanup on failure. sh is only non-nil once createSession has succeeded.
	// err is the local variable declared above, and the `res, err :=` below
	// reuses it because res is the only new name there, so a BeginTransaction
	// failure is seen here and the freshly created session is deleted.
	defer func() {
		if err != nil && sh != nil {
			s.delete(ctx)
		}
	}()

	// Create session directly through the session client (not the pool).
	s, err = c.sc.createSession(ctx)
	if err != nil {
		return nil, err
	}
	sh = &sessionHandle{session: s}

	// Begin a read-only transaction with the requested timestamp bound. The
	// `true` argument presumably asks the server to report the read timestamp
	// it chose (read back below); confirm in buildTransactionOptionsReadOnly.
	res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
		Session: sh.getID(),
		Options: &sppb.TransactionOptions{
			Mode: &sppb.TransactionOptions_ReadOnly_{
				ReadOnly: buildTransactionOptionsReadOnly(tb, true),
			},
		},
	})
	if err != nil {
		return nil, toSpannerError(err)
	}
	tx = res.Id
	// Convert the protobuf read timestamp, if present; otherwise rts stays the
	// zero time.Time.
	if res.ReadTimestamp != nil {
		rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
	}

	// The transaction starts out active. Its ID records the transaction,
	// session and read timestamp, which is everything
	// BatchReadOnlyTransactionFromID needs to rebuild it on another client.
	t := &BatchReadOnlyTransaction{
		ReadOnlyTransaction: ReadOnlyTransaction{
			tx:              tx,
			txReadyOrClosed: make(chan struct{}),
			state:           txActive,
			rts:             rts,
		},
		ID: BatchReadOnlyTransactionID{
			tid: tx,
			sid: sh.getID(),
			rts: rts,
		},
	}
	t.txReadOnly.sh = sh
	t.txReadOnly.txReadEnv = t
	t.txReadOnly.qo = c.qo
	return t, nil
}
369
370// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from
371// BatchReadOnlyTransactionID
372func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction {
373	s, err := c.sc.sessionWithID(tid.sid)
374	if err != nil {
375		logf(c.logger, "unexpected error: %v\nThis is an indication of an internal error in the Spanner client library.", err)
376		// Use an invalid session. Preferably, this method should just return
377		// the error instead of this, but that would mean an API change.
378		s = &session{}
379	}
380	sh := &sessionHandle{session: s}
381
382	t := &BatchReadOnlyTransaction{
383		ReadOnlyTransaction: ReadOnlyTransaction{
384			tx:              tid.tid,
385			txReadyOrClosed: make(chan struct{}),
386			state:           txActive,
387			rts:             tid.rts,
388		},
389		ID: tid,
390	}
391	t.txReadOnly.sh = sh
392	t.txReadOnly.txReadEnv = t
393	t.txReadOnly.qo = c.qo
394	return t
395}
396
397type transactionInProgressKey struct{}
398
399func checkNestedTxn(ctx context.Context) error {
400	if ctx.Value(transactionInProgressKey{}) != nil {
401		return spannerErrorf(codes.FailedPrecondition, "Cloud Spanner does not support nested transactions")
402	}
403	return nil
404}
405
// ReadWriteTransaction executes a read-write transaction, with retries as
// necessary.
//
// The function f will be called one or more times. It must not maintain
// any state between calls.
//
// If the transaction cannot be committed or if f returns an ABORTED error,
// ReadWriteTransaction will call f again. It will continue to call f until the
// transaction can be committed or the Context times out or is cancelled.  If f
// returns an error other than ABORTED, ReadWriteTransaction will abort the
// transaction and return the error.
//
// To limit the number of retries, set a deadline on the Context rather than
// using a fixed limit on the number of attempts. ReadWriteTransaction will
// retry as needed until that deadline is met.
//
// It returns an error immediately if ctx is already inside a transaction
// (see checkNestedTxn). On success it returns the commit timestamp reported by
// the final attempt.
//
// See https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction for
// more details.
func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (commitTimestamp time.Time, err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.ReadWriteTransaction")
	// The span is ended with the named result err, so it records whatever
	// error the function finally returns.
	defer func() { trace.EndSpan(ctx, err) }()
	if err := checkNestedTxn(ctx); err != nil {
		return time.Time{}, err
	}
	// ts and sh live outside the retry closure: ts carries the commit
	// timestamp of the last attempt out, and sh lets later attempts reuse the
	// same session until it is found to be destroyed.
	var (
		ts time.Time
		sh *sessionHandle
	)
	err = runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error {
		// This err shadows the named result; each attempt reports its outcome
		// to the retry helper through the closure's return value instead.
		var (
			err error
			t   *ReadWriteTransaction
		)
		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
			// Session handle hasn't been allocated or has been destroyed.
			sh, err = c.idleSessions.takeWriteSession(ctx)
			if err != nil {
				// If session retrieval fails, just fail the transaction.
				return err
			}
			// Reuse the transaction ID the write session carries, if any
			// (presumably a transaction prepared by the pool; verify in
			// takeWriteSession).
			t = &ReadWriteTransaction{
				tx: sh.getTransactionID(),
			}
		} else {
			// Retrying on the same session: start with no transaction ID.
			t = &ReadWriteTransaction{}
		}
		t.txReadOnly.sh = sh
		t.txReadOnly.txReadEnv = t
		t.txReadOnly.qo = c.qo
		trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
			"Starting transaction attempt")
		if err = t.begin(ctx); err != nil {
			return err
		}
		ts, err = t.runInTransaction(ctx, f)
		return err
	})
	// Hand the session back to the pool once, after all attempts are done.
	if sh != nil {
		sh.recycle()
	}
	return ts, err
}
468
// applyOption collects the settings that ApplyOption values adjust for
// Client.Apply.
type applyOption struct {
	// atLeastOnce makes Client.Apply write the mutations through a
	// write-only transaction without replay protection, instead of through a
	// read-write transaction, so they may be applied more than once.
	atLeastOnce bool
}

// ApplyOption is an optional argument to Apply that adjusts its behavior.
type ApplyOption func(*applyOption)

// ApplyAtLeastOnce returns an ApplyOption that removes replay protection.
//
// With this option, Apply may attempt to apply mutations more than once. If
// the mutations are not idempotent, a failure may be reported even though
// the mutation was applied: for example, an insert may fail with
// ALREADY_EXISTS although the row did not exist before Apply was called.
// For this reason most users of the library will prefer not to use this
// option. However, ApplyAtLeastOnce needs only a single RPC, whereas Apply's
// default replay protection may need an additional one, so this option may
// suit latency-sensitive and/or high-throughput blind writes.
func ApplyAtLeastOnce() ApplyOption {
	return func(o *applyOption) { o.atLeastOnce = true }
}
495
496// Apply applies a list of mutations atomically to the database.
497func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
498	ao := &applyOption{}
499	for _, opt := range opts {
500		opt(ao)
501	}
502
503	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Apply")
504	defer func() { trace.EndSpan(ctx, err) }()
505
506	if !ao.atLeastOnce {
507		return c.ReadWriteTransaction(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
508			return t.BufferWrite(ms)
509		})
510	}
511	t := &writeOnlyTransaction{c.idleSessions}
512	return t.applyAtLeastOnce(ctx, ms...)
513}
514
// logf formats a message Printf-style and writes it to logger. When logger is
// nil, the message goes to the standard logger of the log package instead.
func logf(logger *log.Logger, format string, v ...interface{}) {
	if logger != nil {
		logger.Printf(format, v...)
		return
	}
	log.Printf(format, v...)
}
524